Python Guide

slck-mediator is an asyncio-native mediator kernel for Python 3.11+. All handler functions are async coroutines; streaming handlers are async generators.

Table of contents

  1. Python Guide
    1. Request Lifecycle
    2. Quick Start
      1. Define messages
      2. Define handlers
      3. Wire the mediator
      4. Add middleware
    3. Streaming
    4. Notification Fan-out
    5. Code Generation
    6. Full API Reference

Request Lifecycle

sequenceDiagram
    participant C as Client
    participant M as Mediator
    participant P as Middleware Pipeline
    participant H as Handler

    C->>M: await m.send(CreateOrderCommand)
    M->>P: logging, validation, retry, timeout
    P->>H: create_order_handler(ctx, cmd)
    H-->>P: order_id
    P-->>M: order_id
    M-->>C: order_id

Quick Start

pip install slck-mediator

1. Define messages

Subclass Request[TResponse] for commands and queries, where TResponse is the type the handler returns. Subclass Notification for domain events that are dispatched to multiple handlers.

from mediator import Request, Notification

class CreateOrderCommand(Request[str]):
    def __init__(self, user_id: str, amount: float) -> None:
        self.user_id = user_id
        self.amount  = amount

class OrderCreatedEvent(Notification):
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id

2. Define handlers

Handler functions receive a RequestContext as the first argument and the message as the second. The context provides access to the mediator, logger, metrics, and tracer.

from mediator.contracts import RequestContext

async def create_order_handler(ctx: RequestContext, cmd: CreateOrderCommand) -> str:
    order_id = f"order-{cmd.user_id}"
    ctx.logger.info("order.created", {"order_id": order_id})
    await ctx.mediator.publish(OrderCreatedEvent(order_id))
    return order_id

async def order_audit_handler(ctx: RequestContext, event: OrderCreatedEvent) -> None:
    ctx.logger.info("audit.order_created", {"order_id": event.order_id})
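Because handlers are plain coroutines that take the context as an argument, they can be exercised without a real mediator. The following is a minimal sketch of that idea. StubLogger, StubMediator, and the simplified message classes are hypothetical test doubles, not part of slck-mediator, and the sketch assumes the handler only touches ctx.logger.info and ctx.mediator.publish, as in the example above.

```python
import asyncio
from types import SimpleNamespace


# Simplified stand-ins for the message classes (no library base classes).
class CreateOrderCommand:
    def __init__(self, user_id: str, amount: float) -> None:
        self.user_id = user_id
        self.amount = amount


class OrderCreatedEvent:
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id


class StubLogger:
    """Hypothetical test double that records log calls instead of printing."""

    def __init__(self) -> None:
        self.records: list[tuple[str, dict]] = []

    def info(self, message: str, fields: dict) -> None:
        self.records.append((message, fields))


class StubMediator:
    """Hypothetical test double that records published notifications."""

    def __init__(self) -> None:
        self.published: list[object] = []

    async def publish(self, notification: object) -> None:
        self.published.append(notification)


# Same body as the handler in this guide; ctx is duck-typed here.
async def create_order_handler(ctx, cmd: CreateOrderCommand) -> str:
    order_id = f"order-{cmd.user_id}"
    ctx.logger.info("order.created", {"order_id": order_id})
    await ctx.mediator.publish(OrderCreatedEvent(order_id))
    return order_id


ctx = SimpleNamespace(logger=StubLogger(), mediator=StubMediator())
order_id = asyncio.run(create_order_handler(ctx, CreateOrderCommand("user-1", 99.99)))
```

After the call, the stubs hold everything the handler did, so a test can assert on the returned ID, the log record, and the published event without any I/O.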

3. Wire the mediator

import asyncio
from mediator import Mediator, ConsoleLogger, InMemoryMetrics, InMemoryTracer

async def main() -> None:
    logger  = ConsoleLogger()
    metrics = InMemoryMetrics()
    tracer  = InMemoryTracer("my-service", logger, metrics)

    m = Mediator(logger=logger, metrics=metrics, tracer=tracer)
    m.register_request_handler(CreateOrderCommand, create_order_handler)
    m.register_notification_handler(OrderCreatedEvent, order_audit_handler)

    order_id = await m.send(CreateOrderCommand("user-1", 99.99))
    print(order_id)

asyncio.run(main())

4. Add middleware

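Register middleware in the order you want it applied. The first middleware registered is the outermost layer, so in the order below RetryMiddleware wraps TimeoutMiddleware and the timeout should apply to each attempt individually: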

from mediator import (
    LoggingMiddleware,
    ValidationMiddleware,
    RetryMiddleware,
    TimeoutMiddleware,
)

m.register_pipeline(LoggingMiddleware(logger))
m.register_pipeline(ValidationMiddleware())
m.register_pipeline(RetryMiddleware(max_attempts=3, delay_ms=100))
m.register_pipeline(TimeoutMiddleware(timeout_ms=5_000))
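To make the "outermost first" ordering concrete, here is a library-independent sketch of how a pipeline of this kind can be composed. It is not slck-mediator's implementation: compose, tag, and the (request, next) middleware signature are assumptions chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]
# Assumed middleware shape: receives the request and the next layer to call.
Middleware = Callable[[Any, Handler], Awaitable[Any]]


def compose(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Wrap handler so that middlewares[0] ends up as the outermost layer."""
    wrapped = handler
    # Wrap from the inside out: the last middleware sits closest to the handler.
    for mw in reversed(middlewares):
        def layer(request: Any, mw: Middleware = mw, nxt: Handler = wrapped) -> Awaitable[Any]:
            return mw(request, nxt)
        wrapped = layer
    return wrapped


def tag(name: str, trace: list[str]) -> Middleware:
    """Hypothetical middleware that records when it is entered and exited."""
    async def middleware(request: Any, nxt: Handler) -> Any:
        trace.append(f"{name}:enter")
        try:
            return await nxt(request)
        finally:
            trace.append(f"{name}:exit")
    return middleware


async def handler(request: Any) -> str:
    return f"handled {request}"


async def demo() -> tuple[str, list[str]]:
    trace: list[str] = []
    pipeline = compose([tag("logging", trace), tag("timeout", trace)], handler)
    result = await pipeline("cmd")
    return result, trace


result, trace = asyncio.run(demo())
print(trace)
```

The trace shows the first-registered layer entering first and exiting last, which is why a logging layer registered first can observe the outcome of everything inside it.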

Streaming

Use StreamRequest[TItem] for requests that produce a sequence of values. The handler is an async generator; the caller consumes an AsyncIterable.

sequenceDiagram
    participant C as Client
    participant M as Mediator
    participant H as StreamHandler

    C->>M: m.stream(CounterStream(10))
    M->>H: counter_handler(ctx, req)
    loop yield items 0..9
        H-->>C: item
    end

from mediator import StreamRequest

class CounterStream(StreamRequest[int]):
    def __init__(self, count: int) -> None:
        self.count = count

async def counter_handler(ctx, req: CounterStream):
    for i in range(req.count):
        yield i

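# Run this inside an async function such as main(); m is the Mediator from the Quick Start.
m.register_stream_handler(CounterStream, counter_handler)

async for item in m.stream(CounterStream(10)):
    print(item)  # prints 0 through 9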
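The consumption side relies only on standard async-generator semantics: items are produced lazily, and the consumer can stop early. A library-independent sketch of that behavior follows. Here counter stands in for a stream handler and take is a hypothetical helper; neither is part of slck-mediator.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable

produced: list[int] = []  # records which items the generator actually computed


async def counter(count: int) -> AsyncGenerator[int, None]:
    for i in range(count):
        produced.append(i)
        yield i


async def take(stream: AsyncIterable[int], limit: int) -> list[int]:
    """Collect items from the stream, stopping once limit items have arrived."""
    items: list[int] = []
    async for item in stream:
        items.append(item)
        if len(items) == limit:
            break
    return items


async def demo() -> list[int]:
    stream = counter(10)
    try:
        return await take(stream, 3)
    finally:
        # Close the generator explicitly so it does not linger half-consumed.
        await stream.aclose()


first_three = asyncio.run(demo())
print(first_three, produced)
```

Only the items that were requested are ever computed, which is the main reason to prefer a stream over returning a fully built list.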

Notification Fan-out

publish() dispatches a notification to all registered handlers. By default, handlers run sequentially and failures are isolated per handler, so an exception in one handler does not stop the rest. Use TaskWhenAllPublisher to run handlers in parallel.

flowchart LR
    P["publish(OrderCreatedEvent)"] --> H1["AuditHandler"]
    P --> H2["EmailHandler"]
    P --> H3["AnalyticsHandler"]
    style P fill:#4a90d9,color:#fff

from mediator.publishers import TaskWhenAllPublisher

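# Run this inside an async function; logger comes from the Quick Start.
m = Mediator(logger=logger, publisher=TaskWhenAllPublisher())
m.register_notification_handler(OrderCreatedEvent, order_audit_handler)
# order_email_handler and order_analytics_handler are assumed to be defined
# with the same signature as order_audit_handler.
m.register_notification_handler(OrderCreatedEvent, order_email_handler)
m.register_notification_handler(OrderCreatedEvent, order_analytics_handler)

await m.publish(OrderCreatedEvent("order-123"))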
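The difference between the two strategies can be sketched with plain asyncio. This is an illustration of the general technique, not the library's publisher code: publish_sequential, publish_parallel, and the three toy handlers are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable

NotificationHandler = Callable[[Any], Awaitable[None]]


async def publish_sequential(handlers: list[NotificationHandler], event: Any) -> list[Exception]:
    """Run handlers one after another; a failing handler does not stop the rest."""
    errors: list[Exception] = []
    for handler in handlers:
        try:
            await handler(event)
        except Exception as exc:
            errors.append(exc)
    return errors


async def publish_parallel(handlers: list[NotificationHandler], event: Any) -> list[Exception]:
    """Run handlers concurrently; gather collects exceptions instead of raising."""
    results = await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)
    return [r for r in results if isinstance(r, Exception)]


calls: list[tuple[str, Any]] = []


async def audit(event: Any) -> None:
    calls.append(("audit", event))


async def email(event: Any) -> None:
    raise RuntimeError("smtp unavailable")


async def analytics(event: Any) -> None:
    calls.append(("analytics", event))


handlers = [audit, email, analytics]
seq_errors = asyncio.run(publish_sequential(handlers, "order-123"))
par_errors = asyncio.run(publish_parallel(handlers, "order-123"))
```

In both variants the audit and analytics handlers still run even though the email handler raises. The parallel variant finishes in roughly the time of the slowest handler rather than the sum of all of them.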

Code Generation

The build-time generator scans your handler modules and emits a GeneratedMediator class with direct if/isinstance dispatch.

python tools/generate_mediator.py

Never edit the generated file manually. Re-run the generator whenever you add, rename, or remove a handler or message type.
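For intuition, if/isinstance dispatch might resemble the hand-written sketch below. The actual generator output is not shown in this guide, so the class name GeneratedMediatorSketch, the GetOrderQuery message, the LookupError fallback, and the simplified handlers (which omit the ctx argument for brevity) are all assumptions.

```python
import asyncio


class CreateOrderCommand:
    def __init__(self, user_id: str) -> None:
        self.user_id = user_id


class GetOrderQuery:  # hypothetical second message type
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id


async def create_order_handler(cmd: CreateOrderCommand) -> str:
    return f"order-{cmd.user_id}"


async def get_order_handler(query: GetOrderQuery) -> dict:
    return {"order_id": query.order_id}


class GeneratedMediatorSketch:
    """Illustrative only: one isinstance branch per known request type."""

    async def send(self, request: object) -> object:
        if isinstance(request, CreateOrderCommand):
            return await create_order_handler(request)
        if isinstance(request, GetOrderQuery):
            return await get_order_handler(request)
        raise LookupError(f"no handler for {type(request).__name__}")


mediator = GeneratedMediatorSketch()
created = asyncio.run(mediator.send(CreateOrderCommand("user-1")))
fetched = asyncio.run(mediator.send(GetOrderQuery("order-user-1")))
```

Because every branch is spelled out in the emitted file, a new message type is invisible to the dispatcher until the generator is re-run. This is why the file should be regenerated rather than edited.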


Full API Reference

See the README for the complete API, including the outbox, inbox, saga, and event bus features and all middleware options.