Events
Asynchronous messaging for eventual consistency
What Events Do
Events notify other components that something happened. The publisher doesn't wait for subscribers—it's fire-and-forget.
The use case returns immediately. Subscribers process the event later—possibly in a different thread or even a different process.
Defining an Event
Events are frozen dataclasses with primitive data:
@dataclass(frozen=True) class TaskCompleted: """Emitted when a task is marked complete.""" task_id: UUID owner_id: UUID completed_at: datetime title: str # denormalized for subscribers @dataclass(frozen=True) class TaskCreated: task_id: UUID owner_id: UUID title: str container_id: UUID
Include data subscribers need. If they need the task title, include it—don't make them query back. This keeps components decoupled.
Event Envelope
Events are wrapped in an envelope that adds metadata:
@dataclass(frozen=True) class EventEnvelope[T]: id: UUID # Unique event ID (for idempotency) timestamp: datetime # When it was published actor_id: UUID # Who triggered the action correlation_id: UUID # Request tracing event_type: str # "todos.TaskCompleted" payload: T # The actual event
The envelope is immutable—events are facts.
Publishing Events
Use cases publish events after committing state changes:
class CompleteTask: def __init__(self, repo: TaskRepo, event_bus: EventBus, clock: Clock): self.repo = repo self.event_bus = event_bus self.clock = clock def execute(self, task_id: UUID, actor_id: UUID): task = self.repo.get(task_id) task.complete() self.repo.save(task) # commit first # Then publish self.event_bus.publish(TaskCompleted( task_id=task.id, owner_id=task.owner_id, completed_at=self.clock.now(), title=task.title ))
Always commit state before publishing. If the commit fails, no event should be sent.
Subscribing to Events
Components register subscribers in their module:
class BudgetModule(Module): def register_subscribers(self, event_bus: EventBus): event_bus.subscribe(TaskCompleted, self.on_task_completed) def on_task_completed(self, envelope: EventEnvelope[TaskCompleted]): event = envelope.payload # Check if this task has a cost task_cost = self.cost_repo.get_by_task(event.task_id) if not task_cost: return # Debit the budget self.ledger.debit( owner_id=event.owner_id, token_type_id=task_cost.token_type_id, amount=task_cost.amount, reference_id=event.task_id, reason=f"Task completed: {event.title}" )
Idempotency
Events can be delivered multiple times (at-least-once delivery). Subscribers must be idempotent:
def on_task_completed(self, envelope: EventEnvelope): # Use envelope.id for idempotency key = f"budget:task_completed:{envelope.id}" if self.idempotency.exists(key): return # Already processed this exact event # Process the event... self.ledger.debit(...) # Mark as processed self.idempotency.mark(key, ttl=timedelta(days=7))
Fan-Out Pattern
Multiple components can subscribe to the same event:
# All subscribe to TaskCompleted event_bus.subscribe(TaskCompleted, budget.on_task_completed) # debit budget event_bus.subscribe(TaskCompleted, recurrence.on_task_completed) # create next event_bus.subscribe(TaskCompleted, audit.on_task_completed) # log activity event_bus.subscribe(TaskCompleted, search.on_task_completed) # update index
The publisher doesn't know (or care) how many subscribers exist.
Testing Events
Use InMemoryEventBus to capture and inspect events:
def test_emits_event_on_completion(): event_bus = InMemoryEventBus() use_case = CompleteTask(repo, event_bus, clock) use_case.execute(task_id, actor_id) assert len(event_bus.published) == 1 event = event_bus.published[0] assert isinstance(event, TaskCompleted) assert event.task_id == task_id
- Events MUST be immutable (
frozen=True) - Events MUST contain only primitive data (no domain objects)
- Publish AFTER committing state changes
- Subscribers MUST be idempotent
- Include denormalized data subscribers need
- Don't use events for validation — Events are eventual. Use hooks to block invalid operations.
- Don't expect order — Events may arrive out of order. Design subscribers to handle this.
- Don't query back to the publisher — Include needed data in the event payload.