Platform primitives are domain-agnostic building blocks. Components inject them as dependencies—they never import component code.

Primitive Layer
%%{init: {"flowchart": {"useMaxWidth": false}}}%% graph TB COMPONENTS["Components
(Todos, Budget)"] PRIMITIVES["Platform Primitives
(Clock, EventBus, Policy)"] IDENTITY["Identity Kernel"] COMPONENTS -->|inject| PRIMITIVES COMPONENTS -->|owner_id| IDENTITY style COMPONENTS fill:#ecfeff,stroke:#0891b2 style PRIMITIVES fill:#fef3c7,stroke:#d97706 style IDENTITY fill:#f3e8ff,stroke:#9333ea

Clock

Time abstraction for testability. Never call datetime.now() directly.

SystemClock

Returns actual system time. Used in production.

FixedClock

Returns a fixed time you control. Used in tests.

use_case.py
class CompleteTask:
    def __init__(self, repo: TaskRepo, clock: Clock):
        self.repo = repo
        self.clock = clock

    def execute(self, task_id: UUID):
        task = self.repo.get(task_id)
        task.completed_at = self.clock.now()  # Not datetime.now()
        self.repo.save(task)

EventBus

Publish-subscribe messaging for async communication. See Events for details.

use_case.py
class CompleteTask:
    def __init__(self, repo: TaskRepo, event_bus: EventBus):
        self.repo = repo
        self.event_bus = event_bus

    def execute(self, task_id: UUID, actor_id: UUID):
        task = self.repo.get(task_id)
        task.complete()
        self.repo.save(task)

        # Async: subscribers notified after commit
        self.event_bus.publish(TaskCompleted(
            task_id=task.id,
            owner_id=task.owner_id
        ))

HookDispatcher

Synchronous interception for veto/patch patterns. See Hooks for details.

use_case.py
class CompleteTask:
    def __init__(self, repo: TaskRepo, hooks: HookDispatcher):
        self.repo = repo
        self.hooks = hooks

    def execute(self, task_id: UUID, actor_id: UUID):
        task = self.repo.get(task_id)

        # Sync: blocks until all hooks respond
        result = self.hooks.dispatch("before_complete_task", {
            "task_id": task.id,
            "owner_id": task.owner_id
        })
        if result.vetoed:
            raise OperationVetoed(result.reason)

        task.complete()
        self.repo.save(task)

Policy

Authorization abstraction. Checks if an actor can perform an action on a resource.

AllowAllPolicy

Permits everything. Used in tests or single-user mode.

OwnerOnlyPolicy

Only allows if actor_id matches resource.owner_id.

use_case.py
class DeleteTask:
    def __init__(self, repo: TaskRepo, policy: Policy):
        self.repo = repo
        self.policy = policy

    def execute(self, task_id: UUID, actor_id: UUID):
        task = self.repo.get(task_id)

        # Raises PermissionDeniedError if not allowed
        self.policy.require(actor_id, "delete", task)

        self.repo.delete(task_id)

Envelope

Wrappers that add metadata to commands, queries, and events:

CommandEnvelope

Wraps a command with actor_id, correlation_id, timestamp.

QueryEnvelope

Wraps a query with actor_id for authorization context.

EventEnvelope

Wraps an event with id, timestamp, actor_id, correlation_id.

envelope.py
@dataclass(frozen=True)
class EventEnvelope[T]:
    id: UUID               # Unique event ID
    timestamp: datetime    # When it happened
    actor_id: UUID         # Who triggered it
    correlation_id: UUID   # Request tracing
    payload: T             # The actual event

ActivityLogRepo

Audit trail for entity changes. Records who did what, when.

use_case.py
class UpdateTask:
    def __init__(self, repo: TaskRepo, activity: ActivityLogRepo, clock: Clock):
        self.repo = repo
        self.activity = activity
        self.clock = clock

    def execute(self, task_id: UUID, updates: dict, actor_id: UUID):
        task = self.repo.get(task_id)
        old_values = {k: getattr(task, k) for k in updates}

        for key, value in updates.items():
            setattr(task, key, value)
        self.repo.save(task)

        self.activity.log(ActivityEntry(
            entity_type="Task",
            entity_id=task_id,
            action="updated",
            actor_id=actor_id,
            timestamp=self.clock.now(),
            changes={k: [old_values[k], v] for k, v in updates.items()}
        ))

IdempotencyStore

Prevents duplicate processing of events or commands.

subscriber.py
class RecurrenceSubscriber:
    def __init__(self, idempotency: IdempotencyStore):
        self.idempotency = idempotency

    def on_task_completed(self, envelope: EventEnvelope):
        key = f"recurrence:{envelope.id}"

        if self.idempotency.exists(key):
            return  # Already processed

        self.create_next_occurrence(envelope.payload)
        self.idempotency.mark(key, ttl=timedelta(days=7))

QuerySpec

Standard query structure for filtering, sorting, pagination. See Data Discovery.

query.py
spec = QuerySpec(
    filters=[
        Filter("owner_id", FilterOp.EQ, user_id),
        Filter("status", FilterOp.NE, "completed"),
        Filter("due_date", FilterOp.LTE, today)
    ],
    sort_by="due_date",
    sort_order="asc",
    limit=50
)

tasks = task_repo.query(spec)
Rules
  • Primitives MUST NOT import component code
  • Components MUST inject primitives, not instantiate them
  • Use Clock, never datetime.now()
  • Use Policy for authorization, not inline checks