IdempotencyStore is a platform port for ensuring exactly-once semantics for operations that clients may retry. Clients send an Idempotency-Key header, and the store caches the result of each operation under that key. A repeated request with the same key returns the cached result without re-executing the operation. Cached results can be stored with a TTL so that they expire automatically.

Ports: 1, Schemas: 1, Hooks: 4, Events: 4

01 Ports

Required
  • Clock
    Calculate TTL expiration times
Optional
  • EventBus
    Publish cache hit/miss events for observability
Adapters Provided
  • InMemoryIdempotencyStore
    implements IdempotencyStore
    In-memory store for testing and development
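To illustrate how the required Clock port could feed TTL expiry, here is a minimal sketch of an in-memory store. `FakeClock`, `SketchStore`, and the `now()` method are hypothetical names chosen for illustration; they are not the shipped `InMemoryIdempotencyStore` or the platform's Clock interface. The get/put/delete signatures mirror the Redis example in the Examples section.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any


class FakeClock:
    """Hypothetical clock with a controllable current time (assumed interface)."""

    def __init__(self, start: datetime) -> None:
        self._now = start

    def now(self) -> datetime:
        return self._now

    def advance(self, seconds: int) -> None:
        self._now += timedelta(seconds=seconds)


@dataclass
class _Entry:
    result: Any
    expires_at: datetime | None  # None means the entry never expires


class SketchStore:
    """Illustrative store: expiry times are computed from the injected clock."""

    def __init__(self, clock: FakeClock) -> None:
        self._clock = clock
        self._entries: dict[str, _Entry] = {}

    def put(self, key: str, result: Any, ttl_seconds: int | None = None) -> None:
        expires_at = None
        if ttl_seconds is not None:
            expires_at = self._clock.now() + timedelta(seconds=ttl_seconds)
        self._entries[key] = _Entry(result, expires_at)

    def get(self, key: str) -> Any | None:
        entry = self._entries.get(key)
        if entry is None:
            return None
        if entry.expires_at is not None and self._clock.now() >= entry.expires_at:
            del self._entries[key]  # lazily drop expired entries on read
            return None
        return entry.result

    def delete(self, key: str) -> bool:
        return self._entries.pop(key, None) is not None


clock = FakeClock(datetime(2024, 1, 1, tzinfo=timezone.utc))
store = SketchStore(clock)
store.put("key-1", {"id": "123"}, ttl_seconds=60)
fresh = store.get("key-1")    # still within the TTL
clock.advance(61)
expired = store.get("key-1")  # past expires_at, treated as a miss
```

Injecting the clock rather than reading the system time directly keeps expiry deterministic in tests, which is presumably why Clock is listed as a required port.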
02 Schemas

Defines
Uses

No external schemas used

03 Hooks

  • before_check
    Before checking the cache for an existing result
    outputs: Patch
  • on_cache_hit
    When a cached result is found (before returning)
    outputs: SideEffectRequest
  • on_cache_miss
    When no cached result exists (before executing)
    outputs: SideEffectRequest
  • before_store
    Before storing a result in the cache
    outputs: Patch
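The four hook points bracket the check and store steps. The sketch below shows only that ordering, as implied by the descriptions above. `run_with_hooks` and the `fire` callback are hypothetical; how the platform registers hooks, and what `Patch` and `SideEffectRequest` contain, is not specified here, so the sketch just records which hook point is reached when.

```python
from typing import Any, Callable


def run_with_hooks(
    cache: dict[str, Any],
    key: str,
    operation: Callable[[], Any],
    fire: Callable[[str], None],
) -> Any:
    """Hypothetical flow showing when each hook point would be reached."""
    fire("before_check")
    if key in cache:
        fire("on_cache_hit")   # cached result found, before returning it
        return cache[key]
    fire("on_cache_miss")      # nothing cached, before executing
    result = operation()
    fire("before_store")       # result computed, before caching it
    cache[key] = result
    return result


fired: list[str] = []
cache: dict[str, Any] = {}

run_with_hooks(cache, "key-1", lambda: {"id": "123"}, fired.append)
first_call = list(fired)

fired.clear()
run_with_hooks(cache, "key-1", lambda: {"id": "123"}, fired.append)
second_call = list(fired)
```

Under these assumptions a first request passes through `before_check`, `on_cache_miss`, and `before_store`, while a repeat passes through `before_check` and `on_cache_hit` only.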
04 Events

  • IdempotencyCacheHit v1
    When a cached result is returned
    payload: {key, cached_at, age_seconds}
  • IdempotencyCacheMiss v1
    When no cached result exists
    payload: {key}
  • IdempotencyResultStored v1
    After a result is stored in the cache
    payload: {key, ttl_seconds, expires_at}
  • IdempotencyKeyExpired v1
    When a cached result expires (TTL)
    payload: {key, cached_at, expired_at}
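As an illustration of how the payload fields relate to each other, the sketch below derives `age_seconds` and `expires_at` from timestamps. The builder functions and the choice of ISO-8601 strings for timestamps are assumptions; only the field names come from the list above.

```python
from datetime import datetime, timedelta, timezone


def cache_hit_payload(key: str, cached_at: datetime, now: datetime) -> dict:
    """Hypothetical builder for an IdempotencyCacheHit v1 payload."""
    return {
        "key": key,
        "cached_at": cached_at.isoformat(),
        "age_seconds": int((now - cached_at).total_seconds()),
    }


def result_stored_payload(key: str, ttl_seconds: int, now: datetime) -> dict:
    """Hypothetical builder for an IdempotencyResultStored v1 payload."""
    return {
        "key": key,
        "ttl_seconds": ttl_seconds,
        "expires_at": (now + timedelta(seconds=ttl_seconds)).isoformat(),
    }


stored_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
stored = result_stored_payload("order-abc-123", 86400, stored_at)
hit = cache_hit_payload(
    "order-abc-123", stored_at, stored_at + timedelta(seconds=90)
)
```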
05 Stories

06 Examples

Use check_idempotency helper
from psp.platform.idempotency import IdempotencyStore, check_idempotency

def create_order(store: IdempotencyStore, key: str | None, data: dict):
    def operation():
        order = Order.create(data)
        repo.add(order)
        return {"order_id": str(order.id)}

    return check_idempotency(store, key, operation)

# First call executes the operation
result1 = create_order(store, "order-abc-123", data)

# Second call returns cached result (no duplicate order)
result2 = create_order(store, "order-abc-123", data)
assert result1 == result2

Wrap operations with automatic idempotency.
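For orientation, a helper of this kind can be approximated as a get, execute, put sequence. The sketch below is not the source of `check_idempotency`. `check_idempotency_sketch`, `DictStore`, the pass-through when the key is `None`, and the `ttl_seconds` parameter are all assumptions made for illustration.

```python
from __future__ import annotations

from typing import Any, Callable


class DictStore:
    """Tiny stand-in store (hypothetical) with get/put and no expiry."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def get(self, key: str) -> Any | None:
        return self._data.get(key)

    def put(self, key: str, result: Any, ttl_seconds: int | None = None) -> None:
        self._data[key] = result  # ttl_seconds is ignored in this sketch


def check_idempotency_sketch(
    store: DictStore,
    key: str | None,
    operation: Callable[[], Any],
    ttl_seconds: int | None = None,
) -> Any:
    if key is None:
        return operation()  # assumed behavior: no key means no caching
    cached = store.get(key)
    if cached is not None:
        return cached       # repeat request: skip the operation
    result = operation()
    store.put(key, result, ttl_seconds=ttl_seconds)
    return result


calls: list[str] = []


def create() -> dict[str, str]:
    calls.append("create")
    return {"order_id": "o-1"}


store = DictStore()
first = check_idempotency_sketch(store, "order-abc-123", create)
second = check_idempotency_sketch(store, "order-abc-123", create)
unkeyed = check_idempotency_sketch(store, None, create)
```

One consequence of using `None` as the miss signal: an operation that legitimately returns `None` cannot be told apart from a miss and would run again on a repeat request.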

Manual get/put pattern
async def process_payment(store: IdempotencyStore, key: str, amount: int):
    # Check for existing result
    existing = store.get(key)
    if existing is not None:
        return existing

    # Execute payment (reached only on a cache miss; two concurrent
    # requests with the same key can both miss and both get here)
    result = await payment_gateway.charge(amount)

    # Cache result with 24-hour TTL
    store.put(key, result, ttl_seconds=86400)
    return result

Direct store access for complex flows.
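The get/put pattern leaves a window between the lookup and the store in which a second request with the same key can also miss the cache. One way to close that window inside a single process is a per-key lock, sketched below. `run_once`, the dict-based cache, and the `charge` coroutine are hypothetical; this does not coordinate across processes, where an atomic operation in the backing store would be needed instead.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

# One lock per idempotency key. Locks are never evicted in this sketch,
# so a long-running service would need some cleanup strategy.
_key_locks: defaultdict = defaultdict(asyncio.Lock)


async def run_once(
    cache: dict,
    key: str,
    operation: Callable[[], Awaitable[Any]],
) -> Any:
    """Run operation at most once per key within this process."""
    async with _key_locks[key]:
        if key in cache:
            return cache[key]
        result = await operation()
        cache[key] = result
        return result


async def _demo() -> tuple:
    calls = 0

    async def charge() -> dict:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate a slow gateway call
        return {"charge_id": "ch_1"}

    cache: dict = {}
    results = await asyncio.gather(
        run_once(cache, "pay-1", charge),
        run_once(cache, "pay-1", charge),
    )
    return calls, results


calls, results = asyncio.run(_demo())
```

The second caller waits on the lock while the first one charges, then finds the cached result and returns it without charging again.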

Extract key from HTTP header
from fastapi import Header
from psp.platform.idempotency import IDEMPOTENCY_KEY_HEADER, check_idempotency

@router.post("/orders")
async def create_order(
    data: OrderCreate,
    idempotency_key: str | None = Header(
        None, alias=IDEMPOTENCY_KEY_HEADER
    ),
):
    return check_idempotency(
        store,
        idempotency_key,
        lambda: order_service.create(data),
    )

Use standard header for idempotency keys.
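On the client side, the key only helps if every retry of the same logical request carries the same value. A minimal sketch, assuming the header is named `Idempotency-Key` as in the overview; `post_with_retry` and the `send` callable are hypothetical stand-ins for a real HTTP client.

```python
import uuid
from typing import Any, Callable, Dict, List, Optional

HEADER_NAME = "Idempotency-Key"  # header name as described in the overview


def post_with_retry(
    send: Callable[[Dict[str, str], Dict[str, Any]], Any],
    payload: Dict[str, Any],
    max_attempts: int = 3,
) -> Any:
    """Generate one key per logical request and reuse it on every retry."""
    headers = {HEADER_NAME: str(uuid.uuid4())}
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return send(headers, payload)
        except ConnectionError as exc:  # retry only on transport failures
            last_error = exc
    raise RuntimeError("all attempts failed") from last_error


seen_keys: List[str] = []


def flaky_send(headers: Dict[str, str], payload: Dict[str, Any]) -> Dict[str, str]:
    """Fake transport: fails on the first attempt, succeeds on the second."""
    seen_keys.append(headers[HEADER_NAME])
    if len(seen_keys) == 1:
        raise ConnectionError("simulated timeout")
    return {"order_id": "o-1"}


response = post_with_retry(flaky_send, {"sku": "abc"})
```

Generating the key once, outside the retry loop, is the important design choice: a fresh key per attempt would make each retry look like a new request to the server.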

Test with InMemoryIdempotencyStore
from psp.platform.idempotency import InMemoryIdempotencyStore, check_idempotency

def test_idempotent_create():
    store = InMemoryIdempotencyStore()
    call_count = 0

    def operation():
        nonlocal call_count
        call_count += 1
        return {"id": "123"}

    # First call executes
    result1 = check_idempotency(store, "key-1", operation)
    assert call_count == 1

    # Second call returns cached result
    result2 = check_idempotency(store, "key-1", operation)
    assert call_count == 1  # Not incremented
    assert result1 == result2

Use in-memory store for testing.

Implement a Redis-backed store
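import json
from typing import Any

from redis import Redis

# Assumed to be exported alongside InMemoryIdempotencyStore
from psp.platform.idempotency import IdempotencyStore


class RedisIdempotencyStore(IdempotencyStore):
    def __init__(self, redis: Redis) -> None:
        self._redis = redis

    def get(self, key: str) -> Any | None:
        # A missing or expired key comes back from Redis as None
        data = self._redis.get(f"idem:{key}")
        return json.loads(data) if data is not None else None

    def put(self, key: str, result: Any, ttl_seconds: int | None = None) -> None:
        # Results must be JSON-serializable
        data = json.dumps(result)
        if ttl_seconds:
            # SETEX writes the value and its expiry in a single command
            self._redis.setex(f"idem:{key}", ttl_seconds, data)
        else:
            # No TTL (None or 0): the entry is kept until it is deleted
            self._redis.set(f"idem:{key}", data)

    def delete(self, key: str) -> bool:
        # DEL returns the number of keys removed
        return self._redis.delete(f"idem:{key}") > 0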

Production implementation with Redis.

API Reference

This component mounts routes under /v1/idempotency.