Envelopes wrap message payloads with standard metadata. CommandEnvelope adds idempotency_key for duplicate prevention. QueryEnvelope wraps read operations. EventEnvelope adds aggregate context for domain events. All envelopes include message_id, correlation_id, schema_version, occurred_at, and actor_id.

0
Ports
4
Schemas
3
Hooks
2
Events
01

Ports

Required

No required ports

Optional
  • Clock optional
    Provide occurred_at timestamp for envelopes
  • IdGenerator optional
    Generate unique message_id for envelopes
  • CorrelationContext optional
    Provide correlation_id from current request context
Adapters Provided

No adapters provided

02

Schemas

Defines
Uses

No external schemas used

03

Hooks

  • before_create_command
    Before a CommandEnvelope is created
    outputs: Patch
  • before_create_query
    Before a QueryEnvelope is created
    outputs: Patch
  • after_create
    After any envelope is created
    outputs: SideEffectRequest
04

Events

  • CommandEnvelopeCreated v1
    After a CommandEnvelope is instantiated
    payload: {message_id, idempotency_key, actor_id}
  • QueryEnvelopeCreated v1
    After a QueryEnvelope is instantiated
    payload: {message_id, correlation_id, actor_id}
05

Examples

Wrap a command with CommandEnvelope
from psp.platform.contracts import CommandEnvelope

# Create command envelope with payload
cmd = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Buy groceries"},
    actor_id="user-123",
    correlation_id="req-abc",
    idempotency_key="create-task-xyz",
)

# Access envelope metadata
print(cmd.message_id)       # Auto-generated UUID
print(cmd.occurred_at)      # Auto-generated timestamp
print(cmd.schema_version)   # "1.0"
print(cmd.idempotency_key)  # "create-task-xyz"

Add metadata and idempotency to command payloads.

Propagate correlation across operations
# HTTP handler extracts or generates correlation_id
correlation_id = request.headers.get("X-Correlation-ID") or str(uuid4())

# Wrap incoming command
cmd = CommandEnvelope(
    payload=input_data,
    correlation_id=correlation_id,
    actor_id=current_user.id,
)

# Use same correlation_id for events
event = EventEnvelope(
    payload={"type": "TaskCreated", "task_id": str(task.id)},
    correlation_id=correlation_id,  # Same ID links request to event
    aggregate_id=str(task.id),
    aggregate_type="Task",
)

Thread correlation_id through related messages.

Wrap events with EventEnvelope
from psp.platform.contracts import EventEnvelope

event = EventEnvelope(
    payload={
        "type": "TaskCompleted",
        "task_id": str(task.id),
        "completed_at": now.isoformat(),
    },
    aggregate_id=str(task.id),
    aggregate_type="Task",
    sequence=task.version,  # For optimistic concurrency
    correlation_id=correlation_id,
    actor_id=str(actor_id),
)

event_bus.publish(event)

Add aggregate context to domain events.

Use QueryEnvelope for reads
from psp.platform.contracts import QueryEnvelope

query = QueryEnvelope(
    payload={
        "type": "GetTasksByOwner",
        "owner_id": str(user_id),
        "status": "pending",
    },
    correlation_id=correlation_id,
    actor_id=str(current_user.id),
)

# Query handler can log/trace using envelope metadata
result = query_handler.execute(query)

Wrap query parameters with metadata.

Schema versioning for evolution
# V1 payload structure
cmd_v1 = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Test"},
    schema_version="1.0",
)

# V2 adds new fields
cmd_v2 = CommandEnvelope(
    payload={"type": "CreateTask", "title": "Test", "priority": "high"},
    schema_version="2.0",
)

# Handler can check version
def handle(cmd: CommandEnvelope) -> None:
    if cmd.schema_version == "1.0":
        # Apply defaults for missing fields
        priority = "normal"
    else:
        priority = cmd.payload.get("priority", "normal")

Handle multiple payload versions.

API Reference

This component mounts routes under /v1/envelope. View OpenAPI specification