QuerySpec is a platform contract for list/search operations. Components build queries using a fluent builder API, and repositories translate QuerySpec into storage-specific queries. Supports filter operators (eq, neq, in, lt, gt, contains, exists), multi-field sorting, cursor/offset pagination, and text search.

0
Ports
5
Schemas
3
Hooks
3
Events
01

Ports

Required

No required ports

Optional
  • Clock optional
    Time query execution for performance monitoring
  • EventBus optional
    Publish query execution events
Adapters Provided

No adapters provided

02

Schemas

Defines
  • QuerySpec
    Generic query spec for filtering, sorting, pagination
  • Filter
    Filter clause with field, operator, and value
  • Sort
    Sort clause with field and direction
  • Page
    Pagination parameters for query results
  • QueryResult
    Query result with pagination metadata
Uses

No external schemas used

03

Hooks

  • before_execute
    Before a query is executed against a repository
    outputs: Patch
  • after_execute
    After a query completes (success or failure)
    outputs: SideEffectRequest
  • on_slow_query
    When query execution exceeds threshold
    outputs: SideEffectRequest
04

Events

  • QueryExecuted v1
    After a query is executed
    payload: {resource_type, filters_count, duration_ms, rows}
  • QuerySlowThreshold v1
    When query exceeds slow query threshold
    payload: {resource_type, duration_ms, threshold_ms, query}
  • QueryFailed v1
    When a query fails with an error
    payload: {resource_type, error, query}
05

Examples

Build a query with filters
from psp.platform.query import QuerySpec, FilterOp

# Find pending tasks for a user
query = (
    QuerySpec()
    .with_filter("owner_id", FilterOp.EQ, user_id)
    .with_filter("status", FilterOp.EQ, "pending")
    .with_sort("due_date", direction="ASC")
    .with_page(limit=20)
)

tasks = await repo.query(query)

Use fluent API to construct queries.

Use filter operators
from psp.platform.query import QuerySpec, FilterOp

# Equality and inequality
query.with_filter("status", FilterOp.EQ, "active")
query.with_filter("status", FilterOp.NEQ, "archived")

# Comparison
query.with_filter("amount", FilterOp.GT, 100)
query.with_filter("amount", FilterOp.LTE, 1000)

# Set membership
query.with_filter("status", FilterOp.IN, ["pending", "active"])

# String matching
query.with_filter("title", FilterOp.CONTAINS, "urgent")

# Existence check
query.with_filter("due_date", FilterOp.EXISTS, True)

QuerySpec supports multiple filter operators.

Pagination with cursor or offset
# Offset pagination (simple, for small datasets)
page_1 = QuerySpec().with_page(limit=20, offset=0)
page_2 = QuerySpec().with_page(limit=20, offset=20)

# Cursor pagination (efficient, for large datasets)
first_page = QuerySpec().with_page(limit=20)
results = await repo.query(first_page)

# Use last item's cursor for next page
next_page = QuerySpec().with_page(limit=20, cursor=results.next_cursor)

Two pagination strategies for different use cases.

Full-text search
# Search within filtered results
query = (
    QuerySpec()
    .with_text("quarterly report")
    .with_filter("owner_id", FilterOp.EQ, user_id)
    .with_filter("status", FilterOp.NEQ, "archived")
    .with_sort("_score", direction="DESC")  # Relevance sort
    .with_page(limit=10)
)

results = await repo.query(query)

Combine text search with filters.

Implement query() in a repository
class SQLTaskRepository(TaskRepository):
    async def query(self, spec: QuerySpec) -> QueryResult[Task]:
        stmt = select(TaskModel)

        # Apply filters
        for f in spec.filters:
            column = getattr(TaskModel, f.field)
            if f.op == FilterOp.EQ:
                stmt = stmt.where(column == f.value)
            elif f.op == FilterOp.IN:
                stmt = stmt.where(column.in_(f.value))
            # ... other operators

        # Apply sorting
        for s in spec.sorts:
            column = getattr(TaskModel, s.field)
            stmt = stmt.order_by(column.desc() if s.desc else column)

        # Apply pagination
        stmt = stmt.limit(spec.limit).offset(spec.offset)

        rows = await self._session.execute(stmt)
        return QueryResult(items=[Task.from_orm(r) for r in rows])

Translate QuerySpec to storage-specific queries.

API Reference

This component mounts routes under /v1/query. View OpenAPI specification