Platform Primitives
Shared infrastructure that components compose
Platform primitives are domain-agnostic building blocks. Components inject them as dependencies—they never import component code.
(Todos, Budget)"] PRIMITIVES["Platform Primitives
(Clock, EventBus, Policy)"] IDENTITY["Identity Kernel"] COMPONENTS -->|inject| PRIMITIVES COMPONENTS -->|owner_id| IDENTITY style COMPONENTS fill:#ecfeff,stroke:#0891b2 style PRIMITIVES fill:#fef3c7,stroke:#d97706 style IDENTITY fill:#f3e8ff,stroke:#9333ea
Clock
Time abstraction for testability. Never call datetime.now() directly.
Returns actual system time. Used in production.
Returns a fixed time you control. Used in tests.
class CompleteTask: def __init__(self, repo: TaskRepo, clock: Clock): self.repo = repo self.clock = clock def execute(self, task_id: UUID): task = self.repo.get(task_id) task.completed_at = self.clock.now() # Not datetime.now() self.repo.save(task)
EventBus
Publish-subscribe messaging for async communication. See Events for details.
class CompleteTask: def __init__(self, repo: TaskRepo, event_bus: EventBus): self.repo = repo self.event_bus = event_bus def execute(self, task_id: UUID, actor_id: UUID): task = self.repo.get(task_id) task.complete() self.repo.save(task) # Async: subscribers notified after commit self.event_bus.publish(TaskCompleted( task_id=task.id, owner_id=task.owner_id ))
HookDispatcher
Synchronous interception for veto/patch patterns. See Hooks for details.
class CompleteTask: def __init__(self, repo: TaskRepo, hooks: HookDispatcher): self.repo = repo self.hooks = hooks def execute(self, task_id: UUID, actor_id: UUID): task = self.repo.get(task_id) # Sync: blocks until all hooks respond result = self.hooks.dispatch("before_complete_task", { "task_id": task.id, "owner_id": task.owner_id }) if result.vetoed: raise OperationVetoed(result.reason) task.complete() self.repo.save(task)
Policy
Authorization abstraction. Checks if an actor can perform an action on a resource.
Permits everything. Used in tests or single-user mode.
Only allows if actor_id matches resource.owner_id.
class DeleteTask: def __init__(self, repo: TaskRepo, policy: Policy): self.repo = repo self.policy = policy def execute(self, task_id: UUID, actor_id: UUID): task = self.repo.get(task_id) # Raises PermissionDeniedError if not allowed self.policy.require(actor_id, "delete", task) self.repo.delete(task_id)
Envelope
Wrappers that add metadata to commands, queries, and events:
Wraps a command with actor_id, correlation_id, timestamp.
Wraps a query with actor_id for authorization context.
Wraps an event with id, timestamp, actor_id, correlation_id.
@dataclass(frozen=True) class EventEnvelope[T]: id: UUID # Unique event ID timestamp: datetime # When it happened actor_id: UUID # Who triggered it correlation_id: UUID # Request tracing payload: T # The actual event
ActivityLogRepo
Audit trail for entity changes. Records who did what, when.
class UpdateTask: def __init__(self, repo: TaskRepo, activity: ActivityLogRepo, clock: Clock): self.repo = repo self.activity = activity self.clock = clock def execute(self, task_id: UUID, updates: dict, actor_id: UUID): task = self.repo.get(task_id) old_values = {k: getattr(task, k) for k in updates} for key, value in updates.items(): setattr(task, key, value) self.repo.save(task) self.activity.log(ActivityEntry( entity_type="Task", entity_id=task_id, action="updated", actor_id=actor_id, timestamp=self.clock.now(), changes={k: [old_values[k], v] for k, v in updates.items()} ))
IdempotencyStore
Prevents duplicate processing of events or commands.
class RecurrenceSubscriber: def __init__(self, idempotency: IdempotencyStore): self.idempotency = idempotency def on_task_completed(self, envelope: EventEnvelope): key = f"recurrence:{envelope.id}" if self.idempotency.exists(key): return # Already processed self.create_next_occurrence(envelope.payload) self.idempotency.mark(key, ttl=timedelta(days=7))
QuerySpec
Standard query structure for filtering, sorting, pagination. See Data Discovery.
spec = QuerySpec(
filters=[
Filter("owner_id", FilterOp.EQ, user_id),
Filter("status", FilterOp.NE, "completed"),
Filter("due_date", FilterOp.LTE, today)
],
sort_by="due_date",
sort_order="asc",
limit=50
)
tasks = task_repo.query(spec)
- Primitives MUST NOT import component code
- Components MUST inject primitives, not instantiate them
- Use
Clock, neverdatetime.now() - Use
Policyfor authorization, not inline checks