EventBus is a platform port for asynchronous communication between components. Publishers emit events without knowing subscribers. Subscribers register handlers for specific event types. Events are wrapped in EventEnvelope with correlation ID, timestamps, and aggregate context. Use '*' to subscribe to all events.

0
Ports
0
Schemas
4
Hooks
3
Events
01

Ports

Required

No required ports

Optional
  • Clock optional
    Timestamp events on publish
  • ActivityLogRepo optional
    Audit event publishing and delivery
Adapters Provided
  • InMemoryEventBus
    implements EventBus
    In-memory event bus for testing
02

Schemas

Defines

No schemas defined

Uses

No external schemas used

03

Hooks

  • before_publish
    Before an event is published to subscribers
    outputs: Veto, Patch
  • after_publish
    After an event is published (all handlers called)
    outputs: SideEffectRequest
  • before_deliver
    Before delivering an event to a specific handler
    outputs: Veto
  • on_delivery_error
    When a handler throws an exception
    outputs: SideEffectRequest
04

Events

  • SubscriberAdded v1
    After a handler subscribes to an event type
    payload: {event_type, handler_name}
  • SubscriberRemoved v1
    After a handler unsubscribes
    payload: {event_type, handler_name}
  • EventDeliveryFailed v1
    When a handler raises an exception
    payload: {event_type, handler_name, error}
05

Stories

06

Examples

Publish a domain event
from psp.platform.contracts import EventEnvelope

# Create event envelope
event = EventEnvelope(
    payload={"type": "TaskCreated", "task_id": str(task.id), "title": task.title},
    aggregate_id=str(task.id),
    aggregate_type="Task",
    correlation_id=correlation_id,
)

# Publish to all subscribers
event_bus.publish(event)

Wrap event payload in EventEnvelope and publish.

Subscribe to specific events
def on_task_created(event: EventEnvelope) -> None:
    task_id = event.payload["task_id"]
    # Send notification, update search index, etc.
    notification_service.notify(f"New task: {task_id}")

# Subscribe to TaskCreated events
event_bus.subscribe("TaskCreated", on_task_created)

# Subscribe to multiple event types
event_bus.subscribe("TaskCompleted", on_task_completed)
event_bus.subscribe("TaskDeleted", on_task_deleted)

Register handlers for event types.

Subscribe to all events
def audit_logger(event: EventEnvelope) -> None:
    print(f"[{event.occurred_at}] {event.payload.get('type')}")
    print(f"  correlation_id: {event.correlation_id}")
    print(f"  aggregate: {event.aggregate_type}#{event.aggregate_id}")

# Subscribe to all events with wildcard
event_bus.subscribe("*", audit_logger)

Use wildcard for audit logging or debugging.

Test with InMemoryEventBus
from psp.platform.eventbus import InMemoryEventBus

def test_task_creation_publishes_event():
    event_bus = InMemoryEventBus()
    use_case = CreateTaskUseCase(repo=repo, event_bus=event_bus)

    use_case.execute(CreateTaskInput(title="Test"))

    # Inspect published events
    assert len(event_bus.events) == 1
    event = event_bus.events[0]
    assert event.payload["type"] == "TaskCreated"

    # Or filter by type
    created = event_bus.get_events_of_type("TaskCreated")
    assert len(created) == 1

Inspect published events in tests.

Publish from use case
class CompleteTaskUseCase:
    def __init__(self, repo: TaskRepository, bus: EventBus) -> None:
        self._repo = repo
        self._bus = bus

    async def execute(self, task_id: UUID, correlation_id: str) -> None:
        task = await self._repo.get(task_id)
        task.complete()
        await self._repo.update(task)

        # Emit domain event
        self._bus.publish(EventEnvelope(
            payload={"type": "TaskCompleted", "task_id": str(task_id)},
            aggregate_id=str(task_id),
            aggregate_type="Task",
            correlation_id=correlation_id,
        ))

Standard pattern for domain event emission.

API Reference

This component mounts routes under /v1/eventbus. View OpenAPI specification