Hooks
Synchronous interception for veto, patch, and observe patterns
What Hooks Do
Hooks let components intercept operations in other components synchronously. The calling code waits for all hooks to complete before proceeding.
Hook Responses
Hooks return one of three response types:
- Allow: Permit the operation to continue. This is the default if a hook doesn't return anything.
- Veto: Block the operation with a reason. Any veto stops the operation.
- Patch: Modify the input data before processing. Multiple patches are merged.
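The examples on this page construct responses as `Allow()`, `Veto(reason)`, and `Patch(data)`, but the types themselves are not shown. A minimal sketch of how they could be modeled follows, assuming plain dataclasses; the `normalize` helper is hypothetical and only illustrates the "no return value means Allow" default.

```python
from dataclasses import dataclass, field
from typing import Optional, Union


@dataclass
class Allow:
    """Permit the operation to continue."""


@dataclass
class Veto:
    """Block the operation, carrying a human-readable reason."""
    reason: str


@dataclass
class Patch:
    """Request changes to the operation's input data."""
    data: dict = field(default_factory=dict)


HookResponse = Union[Allow, Veto, Patch]


def normalize(response: Optional[HookResponse]) -> HookResponse:
    """Hypothetical helper: treat a handler that returns nothing as Allow."""
    return Allow() if response is None else response
```

With this shape, a dispatcher can use `isinstance` checks to tell the three responses apart.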
Defining a Hook Point
The component that owns the operation defines hook points:

    from uuid import UUID

    # Hook point definitions for Todos component
    HOOK_BEFORE_COMPLETE_TASK = "todos.before_complete_task"
    HOOK_AFTER_COMPLETE_TASK = "todos.after_complete_task"
    HOOK_BEFORE_CREATE_TASK = "todos.before_create_task"

    class CompleteTask:
        def execute(self, task_id: UUID, actor_id: UUID):
            task = self.repo.get(task_id)

            # Before hook: can veto or patch
            result = self.hooks.dispatch(HOOK_BEFORE_COMPLETE_TASK, {
                "task_id": task.id,
                "owner_id": task.owner_id,
                "actor_id": actor_id,
            })
            if result.vetoed:
                raise OperationVetoed(result.reason)

            task.complete()
            self.repo.save(task)

            # After hook: observe only (no veto)
            self.hooks.dispatch(HOOK_AFTER_COMPLETE_TASK, {
                "task_id": task.id,
                "owner_id": task.owner_id,
            })

Registering a Hook
Other components register handlers for hook points:

    class BudgetModule(Module):
        def register_hooks(self, dispatcher: HookDispatcher):
            dispatcher.register(
                "todos.before_complete_task",
                self.budget_check_hook
            )

        def budget_check_hook(self, context: dict) -> HookResponse:
            owner_id = context["owner_id"]
            task_cost = self.get_task_cost(context["task_id"])
            if task_cost and self.is_over_budget(owner_id):
                return Veto("Cannot complete: over budget this week")
            return Allow()

Patch Example
Patches modify input before the operation runs:

    def auto_tag_hook(self, context: dict) -> HookResponse:
        """Auto-add 'work' tag if task title contains project code."""
        title = context.get("title", "")
        if "[PROJ-" in title:
            existing_tags = context.get("tags", [])
            return Patch({"tags": existing_tags + ["work"]})
        return Allow()

The use case applies patches to its input:

    def execute(self, input_data: dict):
        result = self.hooks.dispatch(HOOK_BEFORE_CREATE_TASK, input_data)
        if result.vetoed:
            raise OperationVetoed(result.reason)

        # Apply patches from hooks
        final_data = {**input_data, **result.patches}
        task = Task(**final_data)
        self.repo.save(task)

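The dispatcher shown under Error Handling merges patches with `dict.update` and passes the original context to every handler, so the merge is shallow: when two hooks patch the same key, the later value replaces the earlier one. The sketch below illustrates that behavior with a hypothetical `merge_patches` helper and made-up sample data.

```python
def merge_patches(patches: list) -> dict:
    """Merge patch dicts in hook order; later keys overwrite earlier ones."""
    merged = {}
    for patch in patches:
        merged.update(patch)
    return merged


input_data = {"title": "[PROJ-12] Write report", "tags": ["q3"]}

# Two hooks patch "tags"; each built its list from the original context,
# and dict.update keeps only the last value.
merged = merge_patches([
    {"tags": ["q3", "work"]},
    {"priority": "high"},
    {"tags": ["q3", "urgent"]},
])
final_data = {**input_data, **merged}
# final_data["tags"] is ["q3", "urgent"]; the "work" tag from the first hook is lost.
```

If several hooks are expected to contribute to the same list-valued key, one option is to give them distinct priorities and have the dispatcher feed earlier patches into later handlers; whether that is worth the extra coupling is a design choice this page does not settle.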
Hook Ordering
Hooks run in registration order by default. If order matters, register with an explicit priority:

    # Lower priority runs first
    dispatcher.register("todos.before_complete_task", validation_hook, priority=10)
    dispatcher.register("todos.before_complete_task", budget_hook, priority=20)
    dispatcher.register("todos.before_complete_task", notification_hook, priority=100)

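How the dispatcher stores priorities is not specified here. One possible sketch keeps each handler with its priority and a registration sequence number, so that lower priorities run first and ties fall back to registration order. The default priority of 50 and the `handlers_for` method are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable


class HookDispatcher:
    def __init__(self):
        # hook name -> list of (priority, sequence number, handler)
        self._handlers = defaultdict(list)
        self._seq = 0

    def register(self, hook_name: str, handler: Callable, priority: int = 50):
        """Register a handler; priority=50 is an assumed default."""
        self._seq += 1
        self._handlers[hook_name].append((priority, self._seq, handler))
        # Sort by priority, then by registration order for equal priorities
        self._handlers[hook_name].sort(key=lambda entry: entry[:2])

    def handlers_for(self, hook_name: str) -> list:
        """Return handlers in the order they should run (hypothetical helper)."""
        return [handler for _, _, handler in self._handlers.get(hook_name, [])]
```

Sorting on `(priority, sequence)` rather than on the handler itself avoids comparing callables, which Python cannot order.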
Error Handling
If a hook raises an exception, the dispatcher catches it and treats it as a veto:
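
    def dispatch(self, hook_name: str, context: dict) -> DispatchResult:
        patches = {}
        # .get() avoids a KeyError when no handlers are registered for this hook
        for handler in self.handlers.get(hook_name, []):
            try:
                response = handler(context)
                if isinstance(response, Veto):
                    # First veto wins: stop immediately, skip remaining handlers
                    return DispatchResult(vetoed=True, reason=response.reason)
                if isinstance(response, Patch):
                    # Later patches overwrite earlier ones on the same key
                    patches.update(response.data)
            except Exception as e:
                # A failing hook is treated as a veto
                return DispatchResult(vetoed=True, reason=str(e))
        return DispatchResult(vetoed=False, patches=patches)
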
Use hooks when you need to block or modify an operation. Use events when you just need to react after the fact. Hooks are synchronous; events are asynchronous.
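A rough sketch of the difference follows. It assumes a hypothetical in-process queue standing in for the event system, and it simplifies hooks to functions that return a reason string to veto or `None` to allow; neither is part of the API described on this page.

```python
import queue

# Hypothetical stand-in for an event bus; consumers would drain it later.
event_queue = queue.Queue()


def complete_task(task_id, before_hooks):
    # Hooks: run inline, the caller waits, and any handler can stop the operation.
    for hook in before_hooks:
        reason = hook(task_id)
        if reason is not None:
            return f"vetoed: {reason}"

    # ... perform the operation here ...

    # Events: enqueue and return immediately; nothing downstream can block this call.
    event_queue.put(("task_completed", task_id))
    return "completed"
```

Work such as sending a notification belongs on the event side, since it neither needs to block the operation nor should be able to fail it.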
- Don't do slow work in hooks — Hooks block the request. Send an email? Use an event.
- Don't call external APIs — Network failures become operation failures.
- Don't mutate state in before hooks — The operation might still fail after your hook.
- Don't create circular dependencies — If A hooks into B and B hooks into A, you have a problem.
- Hook handlers MUST be fast (< 100ms)
- Hook handlers MUST NOT have external dependencies
- Hook contexts MUST contain only primitive data
- After hooks SHOULD NOT return Veto (operation already complete)
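Two of these rules can be checked mechanically. The sketch below is one possible approach, not part of the documented dispatcher: a hypothetical `timed` wrapper logs a warning when a handler exceeds the 100 ms budget, and a hypothetical `is_primitive_context` check accepts only string keys with `str`, `int`, `float`, `bool`, or `None` values. Under that strict reading, IDs would need to be passed as strings rather than `UUID` objects.

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("hooks")

SLOW_HOOK_SECONDS = 0.1  # the 100 ms budget from the rules above
PRIMITIVES = (str, int, float, bool, type(None))  # assumed definition of "primitive"


def is_primitive_context(context: dict) -> bool:
    """True if every key is a string and every value is a primitive."""
    return all(
        isinstance(key, str) and isinstance(value, PRIMITIVES)
        for key, value in context.items()
    )


def timed(handler):
    """Wrap a handler and log a warning when it exceeds the time budget."""
    @wraps(handler)
    def wrapper(context):
        start = time.perf_counter()
        try:
            return handler(context)
        finally:
            elapsed = time.perf_counter() - start
            if elapsed > SLOW_HOOK_SECONDS:
                logger.warning(
                    "hook %s took %.0f ms", handler.__name__, elapsed * 1000
                )
    return wrapper
```

The wrapper only reports slow handlers; it does not interrupt them, so the operation still waits for the handler to finish.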