event-envelope
Domain event wrapper with aggregate context
EventEnvelope wraps domain event payloads with metadata. Extends base Envelope with aggregate_id, aggregate_type, and sequence for event sourcing. Use for domain events published via EventBus. Immutable (frozen dataclass) for safe sharing.
0
Ports
0
Schemas
3
Hooks
2
Events
01
Ports
Adapters Provided
No adapters provided
02
Schemas
Defines
No schemas defined
Uses
No external schemas used
03
Hooks
- before_create
- after_create
- before_publish
04
Events
- EventEnvelopeCreated v1
- EventEnvelopePublished v1
05
Stories
06
Examples
from psp.platform.contracts import EventEnvelope
event = EventEnvelope(
payload={
"type": "TaskCreated",
"task_id": str(task.id),
"title": task.title,
"owner_id": str(task.owner_id),
},
aggregate_id=str(task.id),
aggregate_type="Task",
correlation_id=correlation_id,
actor_id=str(actor_id),
)
event_bus.publish(event)
Wrap event payload with aggregate context.
# Each event increments the sequence
event = EventEnvelope(
payload={"type": "TaskUpdated", "changes": {"title": "New title"}},
aggregate_id=str(task.id),
aggregate_type="Task",
sequence=task.version, # Current version before change
)
# Event store can reject if sequence doesn't match
# This prevents concurrent modifications
await event_store.append(event, expected_sequence=task.version)
Track aggregate version for conflict detection.
def rebuild_task(events: list[EventEnvelope]) -> Task:
task = None
for event in events:
payload = event.payload
event_type = payload["type"]
if event_type == "TaskCreated":
task = Task(
id=UUID(event.aggregate_id),
title=payload["title"],
version=event.sequence,
)
elif event_type == "TaskCompleted":
task.status = "completed"
task.version = event.sequence
return task
Event sourcing pattern using EventEnvelope.
# Get all events for a task
task_events = [
e for e in event_bus.events
if e.aggregate_type == "Task" and e.aggregate_id == str(task_id)
]
# Get events across aggregate types
all_user_events = [
e for e in event_bus.events
if e.actor_id == str(user_id)
]
# Group by aggregate for replay
from itertools import groupby
by_aggregate = groupby(events, key=lambda e: (e.aggregate_type, e.aggregate_id))
Query events for a specific entity.
class CompleteTaskUseCase:
def __init__(self, repo: TaskRepository, bus: EventBus) -> None:
self._repo = repo
self._bus = bus
async def execute(
self, task_id: UUID, actor_id: UUID, correlation_id: str
) -> None:
task = await self._repo.get(task_id)
task.complete()
await self._repo.update(task)
self._bus.publish(EventEnvelope(
payload={
"type": "TaskCompleted",
"task_id": str(task_id),
"completed_at": task.completed_at.isoformat(),
},
aggregate_id=str(task_id),
aggregate_type="Task",
sequence=task.version,
correlation_id=correlation_id, # Links to originating request
actor_id=str(actor_id),
))
Standard pattern linking command to events.
API Reference
This component mounts routes under /v1/event-envelope.
View OpenAPI specification