Publish Event Sequence
Create and publish multiple ordered events for a single aggregate, all sharing one correlation ID
01
Sample Code
from psp.platform.contracts import EventEnvelope
from psp.platform.eventbus import InMemoryEventBus
from uuid import uuid4
# Shared state for the demo: one bus, one correlation ID, one aggregate ID.
event_bus = InMemoryEventBus()
correlation_id = str(uuid4())  # Links all events in this flow
task_id = str(uuid4())

# Payloads listed in publication order; an event's position in this list
# (1-based) is used as its sequence number:
#   1 -> TaskCreated, 2 -> TaskUpdated, 3 -> TaskCompleted
task_payloads = [
    {"type": "TaskCreated", "task_id": task_id, "title": "Demo"},
    {"type": "TaskUpdated", "task_id": task_id, "title": "Updated"},
    {"type": "TaskCompleted", "task_id": task_id},
]

for sequence_number, task_payload in enumerate(task_payloads, start=1):
    # Every envelope targets the same Task aggregate and carries the same
    # correlation ID; only the payload and sequence differ.
    envelope = EventEnvelope(
        payload=task_payload,
        aggregate_id=task_id,
        aggregate_type="Task",
        sequence=sequence_number,
        correlation_id=correlation_id,
    )
    event_bus.publish(envelope)

# All events share correlation_id for tracing
assert all(
    published.correlation_id == correlation_id
    for published in event_bus.events
)