What Events Do

Events notify other components that something happened. The publisher doesn't wait for subscribers—it's fire-and-forget.

Event Flow
%%{init: {"sequence": {"useMaxWidth": false}}}%% sequenceDiagram participant UC as Use Case participant EB as EventBus participant S1 as Subscriber A participant S2 as Subscriber B UC->>EB: publish(TaskCompleted) UC->>UC: return success Note over EB: async delivery EB->>S1: notify EB->>S2: notify

The use case returns immediately. Subscribers process the event later—possibly in a different thread or even a different process.

Defining an Event

Events are frozen dataclasses with primitive data:

todos/domain/events.py
@dataclass(frozen=True)
class TaskCompleted:
    """Emitted when a task is marked complete."""
    task_id: UUID
    owner_id: UUID
    completed_at: datetime
    title: str  # denormalized for subscribers

@dataclass(frozen=True)
class TaskCreated:
    task_id: UUID
    owner_id: UUID
    title: str
    container_id: UUID
Denormalization

Include data subscribers need. If they need the task title, include it—don't make them query back. This keeps components decoupled.

Event Envelope

Events are wrapped in an envelope that adds metadata:

platform/contracts/envelope.py
@dataclass(frozen=True)
class EventEnvelope[T]:
    id: UUID               # Unique event ID (for idempotency)
    timestamp: datetime    # When it was published
    actor_id: UUID         # Who triggered the action
    correlation_id: UUID   # Request tracing
    event_type: str        # "todos.TaskCompleted"
    payload: T             # The actual event

The envelope is immutable—events are facts.

Publishing Events

Use cases publish events after committing state changes:

todos/application/use_cases/complete_task.py
class CompleteTask:
    def __init__(self, repo: TaskRepo, event_bus: EventBus, clock: Clock):
        self.repo = repo
        self.event_bus = event_bus
        self.clock = clock

    def execute(self, task_id: UUID, actor_id: UUID):
        task = self.repo.get(task_id)
        task.complete()
        self.repo.save(task)  # commit first

        # Then publish
        self.event_bus.publish(TaskCompleted(
            task_id=task.id,
            owner_id=task.owner_id,
            completed_at=self.clock.now(),
            title=task.title
        ))
Order Matters

Always commit state before publishing. If the commit fails, no event should be sent.

Subscribing to Events

Components register subscribers in their module:

budget/module.py
class BudgetModule(Module):
    def register_subscribers(self, event_bus: EventBus):
        event_bus.subscribe(TaskCompleted, self.on_task_completed)

    def on_task_completed(self, envelope: EventEnvelope[TaskCompleted]):
        event = envelope.payload

        # Check if this task has a cost
        task_cost = self.cost_repo.get_by_task(event.task_id)
        if not task_cost:
            return

        # Debit the budget
        self.ledger.debit(
            owner_id=event.owner_id,
            token_type_id=task_cost.token_type_id,
            amount=task_cost.amount,
            reference_id=event.task_id,
            reason=f"Task completed: {event.title}"
        )

Idempotency

Events can be delivered multiple times (at-least-once delivery). Subscribers must be idempotent:

subscriber.py
def on_task_completed(self, envelope: EventEnvelope):
    # Use envelope.id for idempotency
    key = f"budget:task_completed:{envelope.id}"

    if self.idempotency.exists(key):
        return  # Already processed this exact event

    # Process the event...
    self.ledger.debit(...)

    # Mark as processed
    self.idempotency.mark(key, ttl=timedelta(days=7))

Fan-Out Pattern

Multiple components can subscribe to the same event:

subscriptions.py
# All subscribe to TaskCompleted
event_bus.subscribe(TaskCompleted, budget.on_task_completed)     # debit budget
event_bus.subscribe(TaskCompleted, recurrence.on_task_completed) # create next
event_bus.subscribe(TaskCompleted, audit.on_task_completed)      # log activity
event_bus.subscribe(TaskCompleted, search.on_task_completed)     # update index

The publisher doesn't know (or care) how many subscribers exist.

Testing Events

Use InMemoryEventBus to capture and inspect events:

test_complete_task.py
def test_emits_event_on_completion():
    event_bus = InMemoryEventBus()
    use_case = CompleteTask(repo, event_bus, clock)

    use_case.execute(task_id, actor_id)

    assert len(event_bus.published) == 1
    event = event_bus.published[0]
    assert isinstance(event, TaskCompleted)
    assert event.task_id == task_id
Rules
  • Events MUST be immutable (frozen=True)
  • Events MUST contain only primitive data (no domain objects)
  • Publish AFTER committing state changes
  • Subscribers MUST be idempotent
  • Include denormalized data subscribers need
Pitfalls
  • Don't use events for validation — Events are eventual. Use hooks to block invalid operations.
  • Don't expect order — Events may arrive out of order. Design subscribers to handle this.
  • Don't query back to the publisher — Include needed data in the event payload.