eventbus
Publish/subscribe messaging for domain events
EventBus is a platform port for asynchronous communication between components. Publishers emit events without knowing who subscribes; subscribers register handlers for specific event types. Each event is wrapped in an EventEnvelope that carries a correlation ID, timestamps, and aggregate context. Subscribe to '*' to receive every event.
Ports: 0 | Schemas: 0 | Hooks: 4 | Events: 3
01
Ports
Adapters Provided
- InMemoryEventBus
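To make the adapter's role concrete, here is a minimal sketch of how an in-memory bus could behave. It is not the InMemoryEventBus source: SketchEnvelope and SketchEventBus are hypothetical stand-ins, and the sketch assumes synchronous delivery, routing on payload["type"], and method and field names borrowed from the examples on this page.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable
from uuid import uuid4


@dataclass(frozen=True)
class SketchEnvelope:
    """Hypothetical stand-in for EventEnvelope (field names taken from the page examples)."""
    payload: dict[str, Any]
    aggregate_id: str
    aggregate_type: str
    correlation_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


Handler = Callable[[SketchEnvelope], None]


class SketchEventBus:
    """Hypothetical in-memory bus: synchronous delivery, '*' matches every event type."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)
        self.events: list[SketchEnvelope] = []  # every published event, for inspection

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: SketchEnvelope) -> None:
        self.events.append(event)
        event_type = event.payload.get("type", "")
        # Type-specific handlers run first, then wildcard handlers (an assumed order).
        typed = self._handlers.get(event_type, [])
        wildcard = self._handlers.get("*", [])
        for handler in [*typed, *wildcard]:
            handler(event)

    def get_events_of_type(self, event_type: str) -> list[SketchEnvelope]:
        return [e for e in self.events if e.payload.get("type") == event_type]
```

Keeping every published event in a list is what makes an adapter like this convenient in tests: assertions can inspect the list instead of registering a spy handler.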
02
Schemas
Defines
No schemas defined
Uses
No external schemas used
03
Hooks
- before_publish
- after_publish
- before_deliver
- on_delivery_error
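The page lists the hook names but not their signatures or firing order. The sketch below shows one plausible arrangement and is an assumption throughout: HookedBus, add_hook, and the hook arguments are hypothetical, and events are plain dicts rather than EventEnvelope instances to keep the example short.

```python
from collections import defaultdict
from typing import Any, Callable

Event = dict[str, Any]  # simplified stand-in for EventEnvelope
Handler = Callable[[Event], None]


class HookedBus:
    """Hypothetical bus showing one possible firing order for the four hooks."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)
        self._hooks: dict[str, list[Callable[..., None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def add_hook(self, name: str, fn: Callable[..., None]) -> None:
        self._hooks[name].append(fn)

    def _fire(self, name: str, *args: Any) -> None:
        for fn in self._hooks.get(name, []):
            fn(*args)

    def publish(self, event: Event) -> None:
        self._fire("before_publish", event)
        for handler in self._handlers.get(event["type"], []):
            self._fire("before_deliver", event, handler)
            try:
                handler(event)
            except Exception as exc:
                # A failing subscriber is reported and does not block the others.
                self._fire("on_delivery_error", event, handler, exc)
        self._fire("after_publish", event)
```

In this arrangement before_publish and after_publish run once per event, while before_deliver and on_delivery_error run once per handler, so a single bad subscriber cannot prevent delivery to the rest.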
04
Events
- SubscriberAdded v1
- SubscriberRemoved v1
- EventDeliveryFailed v1
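The payloads of these events are not documented here. As a rough illustration only, the sketch below shows how a bus might emit them about its own activity. LifecycleBus, unsubscribe, and every payload field (version, event_type, failed_type, error) are hypothetical.

```python
from typing import Any, Callable

Event = dict[str, Any]  # simplified stand-in for EventEnvelope
Handler = Callable[[Event], None]


class LifecycleBus:
    """Hypothetical bus that reports subscription changes and failed deliveries as events."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)
        self._emit({"type": "SubscriberAdded", "version": 1, "event_type": event_type})

    def unsubscribe(self, event_type: str, handler: Handler) -> None:
        handlers = self._handlers.get(event_type, [])
        if handler in handlers:
            handlers.remove(handler)
            self._emit({"type": "SubscriberRemoved", "version": 1, "event_type": event_type})

    def publish(self, event: Event) -> None:
        self._emit(event)

    def _emit(self, event: Event) -> None:
        # Iterate over a copy so handlers may subscribe or unsubscribe during delivery.
        for handler in list(self._handlers.get(event["type"], [])):
            try:
                handler(event)
            except Exception as exc:
                # Do not report failures of failure handlers, to avoid an endless loop.
                if event["type"] != "EventDeliveryFailed":
                    self._emit({
                        "type": "EventDeliveryFailed",
                        "version": 1,
                        "failed_type": event["type"],
                        "error": repr(exc),
                    })
```

Subscribing a handler to "EventDeliveryFailed" then gives a single place to log or alert on subscriber errors without wrapping every handler in its own try/except.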
05
Stories
06
Examples
from psp.platform.contracts import EventEnvelope

# Create event envelope
event = EventEnvelope(
    payload={"type": "TaskCreated", "task_id": str(task.id), "title": task.title},
    aggregate_id=str(task.id),
    aggregate_type="Task",
    correlation_id=correlation_id,
)

# Publish to all subscribers
event_bus.publish(event)
Wrap event payload in EventEnvelope and publish.
def on_task_created(event: EventEnvelope) -> None:
    task_id = event.payload["task_id"]
    # Send notification, update search index, etc.
    notification_service.notify(f"New task: {task_id}")

# Subscribe to TaskCreated events
event_bus.subscribe("TaskCreated", on_task_created)

# Subscribe to multiple event types
event_bus.subscribe("TaskCompleted", on_task_completed)
event_bus.subscribe("TaskDeleted", on_task_deleted)
Register handlers for event types.
def audit_logger(event: EventEnvelope) -> None:
    print(f"[{event.occurred_at}] {event.payload.get('type')}")
    print(f"  correlation_id: {event.correlation_id}")
    print(f"  aggregate: {event.aggregate_type}#{event.aggregate_id}")

# Subscribe to all events with wildcard
event_bus.subscribe("*", audit_logger)
Use wildcard for audit logging or debugging.
from psp.platform.eventbus import InMemoryEventBus

def test_task_creation_publishes_event():
    event_bus = InMemoryEventBus()
    # `repo` is assumed to be a TaskRepository test double provided by the test setup
    use_case = CreateTaskUseCase(repo=repo, event_bus=event_bus)
    use_case.execute(CreateTaskInput(title="Test"))

    # Inspect published events
    assert len(event_bus.events) == 1
    event = event_bus.events[0]
    assert event.payload["type"] == "TaskCreated"

    # Or filter by type
    created = event_bus.get_events_of_type("TaskCreated")
    assert len(created) == 1
Inspect published events in tests.
from uuid import UUID

class CompleteTaskUseCase:
    def __init__(self, repo: TaskRepository, bus: EventBus) -> None:
        self._repo = repo
        self._bus = bus

    async def execute(self, task_id: UUID, correlation_id: str) -> None:
        task = await self._repo.get(task_id)
        task.complete()
        await self._repo.update(task)

        # Emit the domain event only after the state change has been persisted
        self._bus.publish(EventEnvelope(
            payload={"type": "TaskCompleted", "task_id": str(task_id)},
            aggregate_id=str(task_id),
            aggregate_type="Task",
            correlation_id=correlation_id,
        ))
Standard pattern for domain event emission.
API Reference
This component mounts routes under /v1/eventbus.