Python Guide
slck-mediator is an asyncio-native mediator kernel for Python 3.11+. All handler functions are async coroutines; streaming handlers are async generators.
Request Lifecycle
sequenceDiagram
participant C as Client
participant M as Mediator
participant P as Middleware Pipeline
participant H as Handler
C->>M: await m.send(CreateOrderCommand)
M->>P: logging, validation, retry, timeout
P->>H: create_order_handler(ctx, cmd)
H-->>P: order_id
P-->>M: order_id
M-->>C: order_id
Quick Start
pip install slck-mediator
1. Define messages
Subclass Request[TResponse] for commands and queries. Subclass Notification for domain events dispatched to multiple handlers.
from mediator import Request, Notification


class CreateOrderCommand(Request[str]):
    def __init__(self, user_id: str, amount: float) -> None:
        self.user_id = user_id
        self.amount = amount


class OrderCreatedEvent(Notification):
    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
2. Define handlers
Handler functions receive a RequestContext as the first argument, providing access to the mediator, logger, metrics, and tracer.
from mediator.contracts import RequestContext


async def create_order_handler(ctx: RequestContext, cmd: CreateOrderCommand) -> str:
    order_id = f"order-{cmd.user_id}"
    ctx.logger.info("order.created", {"order_id": order_id})
    await ctx.mediator.publish(OrderCreatedEvent(order_id))
    return order_id


async def order_audit_handler(ctx: RequestContext, event: OrderCreatedEvent) -> None:
    ctx.logger.info("audit.order_created", {"order_id": event.order_id})
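Because a handler is a plain coroutine that only touches its context, it can be exercised without a real mediator. The sketch below is one possible way to do that, not part of the library: `RecordingLogger`, `RecordingMediator`, and `make_stub_context` are hypothetical test doubles, the message classes are dataclass stand-ins for the `Request` and `Notification` subclasses, and the stub assumes the handler uses nothing beyond `ctx.logger.info` and `ctx.mediator.publish`.

```python
import asyncio
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class CreateOrderCommand:  # stand-in for the Request[str] subclass
    user_id: str
    amount: float


@dataclass
class OrderCreatedEvent:  # stand-in for the Notification subclass
    order_id: str


class RecordingLogger:
    """Hypothetical test double that stores log calls instead of printing."""

    def __init__(self) -> None:
        self.entries: list[tuple[str, dict]] = []

    def info(self, message: str, fields: dict) -> None:
        self.entries.append((message, fields))


class RecordingMediator:
    """Hypothetical test double that stores published events."""

    def __init__(self) -> None:
        self.published: list[object] = []

    async def publish(self, event: object) -> None:
        self.published.append(event)


async def create_order_handler(ctx, cmd: CreateOrderCommand) -> str:
    # Same body as the handler above.
    order_id = f"order-{cmd.user_id}"
    ctx.logger.info("order.created", {"order_id": order_id})
    await ctx.mediator.publish(OrderCreatedEvent(order_id))
    return order_id


def make_stub_context() -> SimpleNamespace:
    # Only the attributes the handler reads are provided.
    return SimpleNamespace(logger=RecordingLogger(), mediator=RecordingMediator())


async def exercise_handler():
    ctx = make_stub_context()
    order_id = await create_order_handler(ctx, CreateOrderCommand("user-1", 99.99))
    return order_id, ctx
```

Running `asyncio.run(exercise_handler())` returns the order id together with the stub context, so a test can inspect what was logged and published.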
3. Wire the mediator
import asyncio

from mediator import Mediator, ConsoleLogger, InMemoryMetrics, InMemoryTracer


async def main() -> None:
    logger = ConsoleLogger()
    metrics = InMemoryMetrics()
    tracer = InMemoryTracer("my-service", logger, metrics)
    m = Mediator(logger=logger, metrics=metrics, tracer=tracer)
    m.register_request_handler(CreateOrderCommand, create_order_handler)
    m.register_notification_handler(OrderCreatedEvent, order_audit_handler)
    order_id = await m.send(CreateOrderCommand("user-1", 99.99))
    print(order_id)


asyncio.run(main())
4. Add middleware
Register middleware in the order it should be applied, outermost first:
from mediator import (
    LoggingMiddleware,
    ValidationMiddleware,
    RetryMiddleware,
    TimeoutMiddleware,
)

m.register_pipeline(LoggingMiddleware(logger))
m.register_pipeline(ValidationMiddleware())
m.register_pipeline(RetryMiddleware(max_attempts=3, delay_ms=100))
m.register_pipeline(TimeoutMiddleware(timeout_ms=5_000))
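To make "outermost first" concrete, here is a minimal sketch of how a pipeline of this kind can be composed. It does not use or reproduce slck-mediator's internals: `tag`, `compose`, and the `(request, call_next)` middleware shape are assumptions chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical shapes for illustration; not the slck-mediator API.
Handler = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Handler], Awaitable[Any]]


def tag(name: str, trace: list[str]) -> Middleware:
    """Build a middleware that records when it is entered and exited."""

    async def middleware(request: Any, call_next: Handler) -> Any:
        trace.append(f"{name}:enter")
        try:
            return await call_next(request)
        finally:
            trace.append(f"{name}:exit")

    return middleware


def compose(middlewares: list[Middleware], handler: Handler) -> Handler:
    """Wrap the handler so the first middleware in the list is outermost."""
    wrapped = handler
    for mw in reversed(middlewares):
        # Default arguments bind this layer's middleware and inner callable.
        def layer(request: Any, mw: Middleware = mw, inner: Handler = wrapped) -> Awaitable[Any]:
            return mw(request, inner)

        wrapped = layer
    return wrapped


async def run_demo() -> tuple[Any, list[str]]:
    trace: list[str] = []

    async def handler(request: Any) -> str:
        trace.append("handler")
        return f"order-{request}"

    pipeline = compose(
        [tag("logging", trace), tag("validation", trace), tag("timeout", trace)],
        handler,
    )
    result = await pipeline("user-1")
    return result, trace


if __name__ == "__main__":
    result, trace = asyncio.run(run_demo())
    print(result)
    print(trace)
```

In this sketch the first registered middleware is entered first and exited last, so a logging layer placed at the front observes the total time of everything inside it, including any retries.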
Streaming
Use StreamRequest[TItem] for requests that produce a sequence of values. The handler is an async generator; the caller consumes an AsyncIterable.
sequenceDiagram
participant C as Client
participant M as Mediator
participant H as StreamHandler
C->>M: m.stream(CounterStream(10))
M->>H: counter_handler(ctx, req)
loop yield items 0..9
H-->>C: item
end
from mediator import StreamRequest


class CounterStream(StreamRequest[int]):
    def __init__(self, count: int) -> None:
        self.count = count


async def counter_handler(ctx, req: CounterStream):
    # Async generator: each yield delivers one item to the caller.
    for i in range(req.count):
        yield i


m.register_stream_handler(CounterStream, counter_handler)

# Run inside a coroutine, for example main() from the Quick Start.
async for item in m.stream(CounterStream(10)):
    print(item)
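Async generators are lazy: an item is produced only when the consumer asks for it, and a consumer that stops early should close the stream so the producer can clean up. The sketch below shows this with plain asyncio and no mediator involved; `counter` and `take_first` are hypothetical names, and whether `m.stream()` forwards closure to the handler in the same way is an assumption to verify against the library.

```python
import asyncio
from collections.abc import AsyncGenerator


async def counter(count: int, log: list[str]) -> AsyncGenerator[int, None]:
    """Stand-in for a stream handler: yields 0..count-1 on demand."""
    try:
        for i in range(count):
            log.append(f"produce {i}")
            yield i
    finally:
        # Runs when the stream is exhausted or closed early.
        log.append("cleanup")


async def take_first(n: int, count: int) -> tuple[list[int], list[str]]:
    """Consume at most n items, then close the stream explicitly."""
    log: list[str] = []
    items: list[int] = []
    stream = counter(count, log)
    try:
        async for item in stream:
            items.append(item)
            if len(items) == n:
                break
    finally:
        # Close now so cleanup does not wait for garbage collection.
        await stream.aclose()
    return items, log


if __name__ == "__main__":
    items, log = asyncio.run(take_first(3, 10))
    print(items)
    print(log)
```

Taking three items from a ten-item stream produces only three items before cleanup runs, which is the property that makes streaming handlers suitable for large or open-ended result sets.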
Notification Fan-out
publish() dispatches a notification to every handler registered for its type. By default, handlers run sequentially and failures are isolated per handler, so an exception in one handler does not prevent the others from running. Use TaskWhenAllPublisher to run handlers in parallel.
flowchart LR
P["publish(OrderCreatedEvent)"] --> H1["AuditHandler"]
P --> H2["EmailHandler"]
P --> H3["AnalyticsHandler"]
style P fill:#4a90d9,color:#fff
from mediator.publishers import TaskWhenAllPublisher

m = Mediator(logger=logger, publisher=TaskWhenAllPublisher())
# order_email_handler and order_analytics_handler are assumed to be defined
# elsewhere with the same signature as order_audit_handler.
m.register_notification_handler(OrderCreatedEvent, order_audit_handler)
m.register_notification_handler(OrderCreatedEvent, order_email_handler)
m.register_notification_handler(OrderCreatedEvent, order_analytics_handler)
# Run inside a coroutine.
await m.publish(OrderCreatedEvent("order-123"))
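The difference between the two strategies can be sketched with plain asyncio. This is not the implementation of the default publisher or of TaskWhenAllPublisher; `publish_sequential` and `publish_parallel` are hypothetical helpers that only illustrate sequential dispatch with isolated failures versus concurrent dispatch with `asyncio.gather`.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical handler shape for illustration (no context argument).
NotificationHandler = Callable[[Any], Awaitable[None]]


async def publish_sequential(event: Any, handlers: list[NotificationHandler]) -> list[Exception]:
    """Run handlers one after another; a failure does not stop the rest."""
    errors: list[Exception] = []
    for handler in handlers:
        try:
            await handler(event)
        except Exception as exc:
            errors.append(exc)
    return errors


async def publish_parallel(event: Any, handlers: list[NotificationHandler]) -> list[Exception]:
    """Run handlers concurrently and collect any failures."""
    results = await asyncio.gather(
        *(handler(event) for handler in handlers),
        return_exceptions=True,  # keep one failure from cancelling the others
    )
    return [r for r in results if isinstance(r, Exception)]


async def run_demo():
    seen: list[tuple[str, str]] = []

    async def audit(event: str) -> None:
        seen.append(("audit", event))

    async def email(event: str) -> None:
        raise RuntimeError("smtp down")

    async def analytics(event: str) -> None:
        seen.append(("analytics", event))

    handlers = [audit, email, analytics]
    seq_errors = await publish_sequential("order-123", handlers)
    par_errors = await publish_parallel("order-123", handlers)
    return seen, seq_errors, par_errors
```

In both variants the failing email handler is reported while the audit and analytics handlers still run. Sequential dispatch takes roughly the sum of the handler durations, and concurrent dispatch roughly the duration of the slowest handler.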
Code Generation
The build-time generator scans your handler modules and emits a GeneratedMediator class with direct if/isinstance dispatch.
python tools/generate_mediator.py
Never edit the generated file manually. Re-run the generator whenever you add, rename, or remove a handler or message type.
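For orientation, direct if/isinstance dispatch might look roughly like the sketch below. The real generator output is not shown in this guide, so everything here is hypothetical: `GeneratedDispatcher`, the two message classes, and the handlers (which omit the context argument for brevity) exist only to illustrate the dispatch style.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class CreateOrder:  # hypothetical message type
    user_id: str


@dataclass
class CancelOrder:  # hypothetical message type
    order_id: str


async def create_order(cmd: CreateOrder) -> str:
    return f"order-{cmd.user_id}"


async def cancel_order(cmd: CancelOrder) -> bool:
    return True


class GeneratedDispatcher:
    """Hypothetical shape of generated code: one isinstance branch per message type."""

    async def send(self, request: Any) -> Any:
        if isinstance(request, CreateOrder):
            return await create_order(request)
        if isinstance(request, CancelOrder):
            return await cancel_order(request)
        raise LookupError(f"no handler for {type(request).__name__}")


if __name__ == "__main__":
    print(asyncio.run(GeneratedDispatcher().send(CreateOrder("user-1"))))
```

Because each branch names a concrete message type and handler, a renamed or removed handler leaves the generated file stale, which is why the generator has to be re-run after such changes.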
Full API Reference
See the README for the complete API including outbox, inbox, saga, event bus, and all middleware options.