envelope
Message envelopes for commands, queries, and events
Envelopes wrap message payloads with standard metadata. CommandEnvelope adds idempotency_key for duplicate prevention. QueryEnvelope wraps read operations. EventEnvelope adds aggregate context for domain events. All envelopes include message_id, correlation_id, schema_version, occurred_at, and actor_id.
Ports: 0, Schemas: 4, Hooks: 3, Events: 2
01 Ports
Adapters provided: none
02 Schemas
Uses: no external schemas
03 Hooks
- before_create_command
- before_create_query
- after_create
04 Events
- CommandEnvelopeCreated v1
- QueryEnvelopeCreated v1
05 Examples
from psp.platform.contracts import CommandEnvelope

# Create command envelope with payload
cmd = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Buy groceries"},
    actor_id="user-123",
    correlation_id="req-abc",
    idempotency_key="create-task-xyz",
)

# Access envelope metadata
print(cmd.message_id)       # Auto-generated UUID
print(cmd.occurred_at)      # Auto-generated timestamp
print(cmd.schema_version)   # "1.0"
print(cmd.idempotency_key)  # "create-task-xyz"
Add metadata and idempotency to command payloads.
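How a consumer uses idempotency_key is not shown on this page. One common approach, sketched here under that assumption, is to remember the result of the first execution per key and return it for any repeat. `InMemoryIdempotencyStore`, `run_once`, and `create_task` are hypothetical names, not part of psp.platform.contracts, and a real deployment would need a shared, persistent store.

```python
from typing import Any, Callable, Dict, List


class InMemoryIdempotencyStore:
    """Hypothetical store mapping an idempotency key to its first result."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def run_once(self, key: str, action: Callable[[], Any]) -> Any:
        # A repeated key returns the stored result without re-running the action.
        if key in self._results:
            return self._results[key]
        result = action()
        self._results[key] = result
        return result


calls: List[str] = []


def create_task() -> str:
    # Stand-in for the real side effect (for example, inserting a row).
    calls.append("created")
    return "task-1"


store = InMemoryIdempotencyStore()
first = store.run_once("create-task-xyz", create_task)
second = store.run_once("create-task-xyz", create_task)  # duplicate delivery
```

Here the second call returns the stored result and `create_task` runs only once.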
from uuid import uuid4

from psp.platform.contracts import CommandEnvelope, EventEnvelope

# request, input_data, current_user, and task come from the surrounding HTTP handler.
# HTTP handler extracts or generates correlation_id
correlation_id = request.headers.get("X-Correlation-ID") or str(uuid4())

# Wrap incoming command
cmd = CommandEnvelope(
    payload=input_data,
    correlation_id=correlation_id,
    actor_id=current_user.id,
)

# Use same correlation_id for events
event = EventEnvelope(
    payload={"type": "TaskCreated", "task_id": str(task.id)},
    correlation_id=correlation_id,  # Same ID links request to event
    aggregate_id=str(task.id),
    aggregate_type="Task",
)
Thread correlation_id through related messages.
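To illustrate what threading buys you, the sketch below resolves a correlation ID from request headers and then groups messages that share it. Plain dicts stand in for envelopes, and `resolve_correlation_id` and `group_by_correlation` are hypothetical helpers written for this example only.

```python
from collections import defaultdict
from typing import Dict, List, Mapping
from uuid import uuid4


def resolve_correlation_id(headers: Mapping[str, str]) -> str:
    """Reuse the caller's correlation ID, or mint a new one if absent."""
    return headers.get("X-Correlation-ID") or str(uuid4())


def group_by_correlation(messages: List[dict]) -> Dict[str, List[str]]:
    """Map each correlation_id to the payload types that share it, in order."""
    trace: Dict[str, List[str]] = defaultdict(list)
    for message in messages:
        trace[message["correlation_id"]].append(message["payload"]["type"])
    return dict(trace)


correlation_id = resolve_correlation_id({"X-Correlation-ID": "req-abc"})
messages = [
    {"correlation_id": correlation_id, "payload": {"type": "CreateTask"}},
    {"correlation_id": correlation_id, "payload": {"type": "TaskCreated"}},
    {"correlation_id": "req-other", "payload": {"type": "GetTasksByOwner"}},
]
trace = group_by_correlation(messages)
```

Because the command and the event carry the same ID, both appear under "req-abc" and can be followed as one request.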
from psp.platform.contracts import EventEnvelope

event = EventEnvelope(
    payload={
        "type": "TaskCompleted",
        "task_id": str(task.id),
        "completed_at": now.isoformat(),
    },
    aggregate_id=str(task.id),
    aggregate_type="Task",
    sequence=task.version,  # For optimistic concurrency
    correlation_id=correlation_id,
    actor_id=str(actor_id),
)
event_bus.publish(event)
Add aggregate context to domain events.
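The example mentions sequence in the context of optimistic concurrency without showing the check itself. A minimal sketch of one way such a check could work follows, assuming sequences start at 1 and increase by exactly 1 per aggregate; `InMemoryEventStream` and `ConcurrencyError` are hypothetical and not part of this component.

```python
from typing import Dict, Tuple


class ConcurrencyError(Exception):
    """Raised when an event arrives with an unexpected sequence number."""


class InMemoryEventStream:
    """Hypothetical store tracking the last sequence seen per aggregate."""

    def __init__(self) -> None:
        self._last_sequence: Dict[Tuple[str, str], int] = {}

    def append(self, aggregate_type: str, aggregate_id: str, sequence: int) -> None:
        key = (aggregate_type, aggregate_id)
        # Assumption: the next valid sequence is always last + 1, starting at 1.
        expected = self._last_sequence.get(key, 0) + 1
        if sequence != expected:
            raise ConcurrencyError(
                f"{aggregate_type} {aggregate_id}: "
                f"expected sequence {expected}, got {sequence}"
            )
        self._last_sequence[key] = sequence


stream = InMemoryEventStream()
stream.append("Task", "task-1", 1)
stream.append("Task", "task-1", 2)
try:
    stream.append("Task", "task-1", 2)  # a stale writer reuses sequence 2
    conflict_detected = False
except ConcurrencyError:
    conflict_detected = True
```

A writer that loaded the aggregate before another writer's update would hit the error and could reload and retry.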
from psp.platform.contracts import QueryEnvelope

query = QueryEnvelope(
    payload={
        "type": "GetTasksByOwner",
        "owner_id": str(user_id),
        "status": "pending",
    },
    correlation_id=correlation_id,
    actor_id=str(current_user.id),
)

# Query handler can log/trace using envelope metadata
result = query_handler.execute(query)
Wrap query parameters with metadata.
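As a rough illustration of logging from envelope metadata, the helper below extracts a flat set of log fields. It relies only on attribute names listed on this page; `log_context` is a hypothetical helper, and a SimpleNamespace stands in for a real QueryEnvelope so the sketch runs on its own.

```python
from types import SimpleNamespace
from typing import Any, Dict


def log_context(envelope: Any) -> Dict[str, str]:
    """Collect the envelope fields a handler would attach to each log line."""
    return {
        "message_id": str(envelope.message_id),
        "correlation_id": str(envelope.correlation_id),
        "actor_id": str(envelope.actor_id),
        "message_type": str(envelope.payload.get("type", "unknown")),
    }


# Stand-in object; a real QueryEnvelope is assumed to expose the same attributes.
query = SimpleNamespace(
    message_id="m-1",
    correlation_id="req-abc",
    actor_id="user-123",
    payload={"type": "GetTasksByOwner", "status": "pending"},
)
context = log_context(query)
```

The resulting dict can be passed to a structured logger, for example through the `extra` argument of the standard logging module.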
from psp.platform.contracts import CommandEnvelope

# V1 payload structure
cmd_v1 = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Test"},
    schema_version="1.0",
)

# V2 adds new fields
cmd_v2 = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Test", "priority": "high"},
    schema_version="2.0",
)

# Handler can check version
def handle(cmd: CommandEnvelope) -> None:
    if cmd.schema_version == "1.0":
        # Apply defaults for missing fields
        priority = "normal"
    else:
        priority = cmd.payload.get("priority", "normal")
Handle multiple payload versions.
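An alternative to branching inside every handler is to normalize older payloads to the newest shape first, often called upcasting. The sketch below mirrors the version check above for the CreateTask payload; `upcast_create_task` is a hypothetical helper, and it assumes "priority" is the only field added in version 2.0.

```python
from typing import Any, Dict


def upcast_create_task(schema_version: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload in the 2.0 shape, leaving the input untouched."""
    upgraded = dict(payload)
    if schema_version == "1.0":
        # 1.0 payloads have no priority field, so apply the default.
        upgraded["priority"] = "normal"
    else:
        # Newer payloads keep their priority; fall back only if it is missing.
        upgraded.setdefault("priority", "normal")
    return upgraded


v1_payload = {"type": "CreateTask", "title": "Test"}
v2_payload = {"type": "CreateTask", "title": "Test", "priority": "high"}

from_v1 = upcast_create_task("1.0", v1_payload)
from_v2 = upcast_create_task("2.0", v2_payload)
```

After upcasting, downstream code can read `priority` without knowing which schema version the sender used.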
API Reference
This component mounts routes under /v1/envelope.