01

Sample Code

publish-event-sequence.py
from psp.platform.contracts import EventEnvelope
from psp.platform.eventbus import InMemoryEventBus
from uuid import uuid4

# Shared fixtures for the demo flow.
event_bus = InMemoryEventBus()
correlation_id = str(uuid4())  # One id ties every event in this flow together
task_id = str(uuid4())

# Payloads in publication order. An entry's 1-based position in this tuple is
# used as its sequence number: TaskCreated=1, TaskUpdated=2, TaskCompleted=3.
payloads = (
    {"type": "TaskCreated", "task_id": task_id, "title": "Demo"},
    {"type": "TaskUpdated", "task_id": task_id, "title": "Updated"},
    {"type": "TaskCompleted", "task_id": task_id},
)

# Wrap each payload in an envelope for the same Task aggregate and publish it.
for sequence, payload in enumerate(payloads, start=1):
    envelope = EventEnvelope(
        payload=payload,
        aggregate_id=task_id,
        aggregate_type="Task",
        sequence=sequence,
        correlation_id=correlation_id,
    )
    event_bus.publish(envelope)

# Tracing check: every event exposed via event_bus.events must carry the
# shared correlation_id.
recorded_ids = [event.correlation_id for event in event_bus.events]
assert all(recorded == correlation_id for recorded in recorded_ids)