EventEnvelope wraps domain event payloads with metadata. Extends base Envelope with aggregate_id, aggregate_type, and sequence for event sourcing. Use for domain events published via EventBus. Immutable (frozen dataclass) for safe sharing.
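
As a rough illustration of the shape described above, the sketch below models the envelope as a pair of frozen dataclasses. Both classes are hypothetical stand-ins, not the real definitions in psp.platform.contracts: the base fields (payload, message_id, correlation_id, actor_id) are inferred from the examples on this page, and the defaults are assumptions.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Any, Optional
from uuid import uuid4


@dataclass(frozen=True)
class Envelope:
    """Hypothetical stand-in for the base Envelope."""

    payload: dict[str, Any]
    message_id: str = field(default_factory=lambda: str(uuid4()))
    correlation_id: Optional[str] = None
    actor_id: Optional[str] = None


@dataclass(frozen=True)
class EventEnvelope(Envelope):
    """Hypothetical stand-in that adds aggregate context (defaults are assumed)."""

    aggregate_id: str = ""
    aggregate_type: str = ""
    sequence: int = 0


event = EventEnvelope(
    payload={"type": "TaskCreated"},
    aggregate_id="task-1",
    aggregate_type="Task",
)

try:
    event.sequence = 5  # assignment on a frozen dataclass raises
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing a dataclass is shallow: attribute assignment is blocked, but the payload dict itself can still be mutated, so consumers should treat it as read-only.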

0 Ports · 0 Schemas · 3 Hooks · 2 Events
01 Ports

Required

No required ports

Optional
  • Clock
    Provides the occurred_at timestamp for events
  • IdGenerator
    Generates a unique message_id for envelopes
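
The two optional ports could be modeled roughly as below. This is a sketch under assumptions: the method names now() and new_id(), the helper envelope_metadata(), and the fallback behavior when a port is absent are all hypothetical and are not taken from the component itself.

```python
from datetime import datetime, timezone
from typing import Optional, Protocol
from uuid import uuid4


class Clock(Protocol):
    def now(self) -> datetime: ...  # hypothetical method name


class IdGenerator(Protocol):
    def new_id(self) -> str: ...  # hypothetical method name


class FixedClock:
    """Test double that always returns the same instant."""

    def __init__(self, instant: datetime) -> None:
        self._instant = instant

    def now(self) -> datetime:
        return self._instant


class CountingIdGenerator:
    """Test double that yields msg-1, msg-2, ..."""

    def __init__(self) -> None:
        self._n = 0

    def new_id(self) -> str:
        self._n += 1
        return f"msg-{self._n}"


def envelope_metadata(
    clock: Optional[Clock] = None, ids: Optional[IdGenerator] = None
) -> dict[str, object]:
    # Assumed fallback: system UTC time and a random UUID when no port is wired.
    occurred_at = clock.now() if clock else datetime.now(timezone.utc)
    message_id = ids.new_id() if ids else str(uuid4())
    return {"occurred_at": occurred_at, "message_id": message_id}


fixed = datetime(2024, 1, 1, tzinfo=timezone.utc)
meta = envelope_metadata(FixedClock(fixed), CountingIdGenerator())
```

Injecting both ports makes timestamps and ids deterministic in tests, which is the usual reason to keep them optional rather than hard-wired.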
Adapters Provided

No adapters provided

02 Schemas

Defines

No schemas defined

Uses

No external schemas used

03 Hooks

  • before_create
    Before an EventEnvelope is created
    outputs: Patch
  • after_create
    After an EventEnvelope is created
    outputs: SideEffectRequest
  • before_publish
    Before an EventEnvelope is published to EventBus
    outputs: Veto, Patch
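
The before_publish contract (a hook may return a Patch or a Veto) can be sketched as a small pipeline. The Patch and Veto classes, the hook signature, and run_before_publish() are hypothetical illustrations; the real hook types are not shown on this page.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union


@dataclass(frozen=True)
class Patch:
    """Hypothetical: field changes to merge into the envelope."""

    changes: dict[str, Any]


@dataclass(frozen=True)
class Veto:
    """Hypothetical: stops publication with a reason."""

    reason: str


HookResult = Optional[Union[Patch, Veto]]
Hook = Callable[[dict[str, Any]], HookResult]


def run_before_publish(
    fields: dict[str, Any], hooks: list[Hook]
) -> tuple[bool, dict[str, Any]]:
    """Apply hooks in order and return (allowed, resulting fields)."""
    current = dict(fields)
    for hook in hooks:
        result = hook(current)
        if isinstance(result, Veto):
            return False, current  # first veto wins; later hooks are skipped
        if isinstance(result, Patch):
            current = {**current, **result.changes}
    return True, current


def add_correlation(fields: dict[str, Any]) -> HookResult:
    return Patch({"correlation_id": "req-1"})


def require_aggregate_type(fields: dict[str, Any]) -> HookResult:
    if not fields.get("aggregate_type"):
        return Veto("aggregate_type is required")
    return None


allowed, patched = run_before_publish(
    {"aggregate_type": "Task"}, [add_correlation, require_aggregate_type]
)
blocked, _ = run_before_publish({}, [require_aggregate_type])
```

Because the envelope is immutable, a patch in this sketch produces a new field mapping rather than modifying the input in place.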

04 Events

  • EventEnvelopeCreated v1
    After an EventEnvelope is instantiated
    payload: {message_id, aggregate_type, aggregate_id}
  • EventEnvelopePublished v1
    After an EventEnvelope is published to EventBus
    payload: {message_id, event_type, aggregate_id}
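
The two payload shapes listed above can be written down as typed dictionaries. This is a sketch: the class names and builder functions are hypothetical, and deriving event_type from the payload's "type" key is an assumption based on the examples on this page.

```python
from typing import Any, TypedDict


class EventEnvelopeCreatedV1(TypedDict):
    message_id: str
    aggregate_type: str
    aggregate_id: str


class EventEnvelopePublishedV1(TypedDict):
    message_id: str
    event_type: str
    aggregate_id: str


def created_payload(
    message_id: str, aggregate_type: str, aggregate_id: str
) -> EventEnvelopeCreatedV1:
    return {
        "message_id": message_id,
        "aggregate_type": aggregate_type,
        "aggregate_id": aggregate_id,
    }


def published_payload(
    message_id: str, domain_payload: dict[str, Any], aggregate_id: str
) -> EventEnvelopePublishedV1:
    return {
        "message_id": message_id,
        # Assumption: event_type mirrors the "type" key of the domain payload.
        "event_type": domain_payload["type"],
        "aggregate_id": aggregate_id,
    }


created = created_payload("msg-1", "Task", "task-1")
published = published_payload("msg-1", {"type": "TaskCreated"}, "task-1")
```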
05 Stories

06 Examples

Create a domain event
from psp.platform.contracts import EventEnvelope

event = EventEnvelope(
    payload={
        "type": "TaskCreated",
        "task_id": str(task.id),
        "title": task.title,
        "owner_id": str(task.owner_id),
    },
    aggregate_id=str(task.id),
    aggregate_type="Task",
    correlation_id=correlation_id,
    actor_id=str(actor_id),
)

event_bus.publish(event)

Wrap event payload with aggregate context.

Use sequence for optimistic concurrency
# Each event increments the sequence
event = EventEnvelope(
    payload={"type": "TaskUpdated", "changes": {"title": "New title"}},
    aggregate_id=str(task.id),
    aggregate_type="Task",
    sequence=task.version,  # Current version before change
)

# The event store can reject the append if expected_sequence does not match
# the stored version, so a conflicting concurrent write is detected
# instead of being silently applied
await event_store.append(event, expected_sequence=task.version)

Track aggregate version for conflict detection.
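
To show what the expected_sequence check might do on the store side, here is a minimal in-memory sketch. InMemoryEventStore and ConcurrencyError are hypothetical names, events are plain dicts to keep the block self-contained, and treating the stream length as the current version is an assumption.

```python
import asyncio
from collections import defaultdict
from typing import Any


class ConcurrencyError(Exception):
    """Raised when the caller's expected version is stale (hypothetical)."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[dict[str, Any]]] = defaultdict(list)

    async def append(self, event: dict[str, Any], expected_sequence: int) -> int:
        stream = self._streams[event["aggregate_id"]]
        current = len(stream)  # assumed: version == number of stored events
        if current != expected_sequence:
            raise ConcurrencyError(
                f"expected version {expected_sequence}, found {current}"
            )
        stream.append(event)
        return current + 1  # new version after the append


async def demo() -> list[str]:
    store = InMemoryEventStore()
    event = {"aggregate_id": "task-1", "type": "TaskUpdated"}
    outcomes = []
    # Two writers both loaded the aggregate at version 0.
    for writer in ("first", "second"):
        try:
            await store.append(event, expected_sequence=0)
            outcomes.append(f"{writer}: appended")
        except ConcurrencyError:
            outcomes.append(f"{writer}: conflict")
    return outcomes


outcomes = asyncio.run(demo())
```

The second writer fails because the stream already advanced to version 1; it would typically reload the aggregate and retry.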

Reconstruct aggregate from events
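from typing import Optional
from uuid import UUID


def rebuild_task(events: list[EventEnvelope]) -> Optional[Task]:
    """Replay events (assumed already ordered by sequence).

    Returns None if the list is empty.
    """
    task: Optional[Task] = None
    for event in events:
        payload = event.payload
        event_type = payload["type"]

        if event_type == "TaskCreated":
            task = Task(
                id=UUID(event.aggregate_id),
                title=payload["title"],
                version=event.sequence,
            )
        elif task is None:
            # Any other event before TaskCreated means the stream is incomplete
            raise ValueError(f"{event_type} received before TaskCreated")
        elif event_type == "TaskCompleted":
            task.status = "completed"
            task.version = event.sequence

    return task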

Event sourcing pattern using EventEnvelope.

Filter events by aggregate
# Get all events for a task
task_events = [
    e for e in event_bus.events
    if e.aggregate_type == "Task" and e.aggregate_id == str(task_id)
]

# Get events across aggregate types
all_user_events = [
    e for e in event_bus.events
    if e.actor_id == str(user_id)
]

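# Group by aggregate for replay.
# groupby only merges adjacent items, so sort by the same key first;
# sorted() is stable, so each aggregate's events keep their original order.
from itertools import groupby

def aggregate_key(e):
    return (e.aggregate_type, e.aggregate_id)

by_aggregate = groupby(sorted(event_bus.events, key=aggregate_key), key=aggregate_key)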

Query events for a specific entity.

Publish from use case with correlation
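from uuid import UUID

from psp.platform.contracts import EventEnvelope

# TaskRepository and EventBus are supplied by the application


class CompleteTaskUseCase: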
    def __init__(self, repo: TaskRepository, bus: EventBus) -> None:
        self._repo = repo
        self._bus = bus

    async def execute(
        self, task_id: UUID, actor_id: UUID, correlation_id: str
    ) -> None:
        task = await self._repo.get(task_id)
        task.complete()
        await self._repo.update(task)

        self._bus.publish(EventEnvelope(
            payload={
                "type": "TaskCompleted",
                "task_id": str(task_id),
                "completed_at": task.completed_at.isoformat(),
            },
            aggregate_id=str(task_id),
            aggregate_type="Task",
            sequence=task.version,
            correlation_id=correlation_id,  # Links to originating request
            actor_id=str(actor_id),
        ))

Standard pattern linking command to events.

API Reference

This component mounts routes under /v1/event-envelope.