Middleware Pipeline
Every send(), stream(), and publish() call passes through the configured middleware stack in Russian Doll order: each middleware wraps the ones registered after it, with the handler at the centre. A middleware calls next() to pass control further down the stack, or short-circuits by throwing or by returning early without calling next(). Middleware is registered once and applies uniformly to all handler types.
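The nesting described above can be sketched in a few lines. This is an illustration only, not the library's implementation: `compose`, `tracing`, `short_circuit`, and the two-argument middleware shape are hypothetical names chosen for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

Handler = Callable[[Any], Awaitable[Any]]
# Hypothetical middleware shape: receives the request and the next callable.
Middleware = Callable[[Any, Handler], Awaitable[Any]]


def compose(middlewares: List[Middleware], handler: Handler) -> Handler:
    """Wrap `handler` so the first middleware in the list is the outermost layer."""
    chain = handler
    for mw in reversed(middlewares):
        # Default arguments bind the current values, so each layer
        # calls the layer directly beneath it.
        def layer(request: Any, mw: Middleware = mw, nxt: Handler = chain) -> Awaitable[Any]:
            return mw(request, nxt)
        chain = layer
    return chain


def tracing(name: str, trace: List[str]) -> Middleware:
    """Build a middleware that records when it runs relative to the inner layers."""
    async def mw(request: Any, nxt: Handler) -> Any:
        trace.append(f"{name}:before")
        result = await nxt(request)
        trace.append(f"{name}:after")
        return result
    return mw


async def short_circuit(request: Any, nxt: Handler) -> Any:
    # Returning without calling nxt() skips every inner layer and the handler.
    return "early"


async def handler(request: Any) -> Any:
    return f"handled {request}"


def run_demo() -> Tuple[Any, List[str]]:
    trace: List[str] = []
    pipeline = compose([tracing("outer", trace), tracing("inner", trace)], handler)
    return asyncio.run(pipeline("ping")), trace
```

Calling `run_demo()` shows the outer layer running before and after the inner one, which is the property the diagram below relies on.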
Pipeline Structure
flowchart LR
Req([Request]) --> L["1 Logging"]
L --> V["2 Validation"]
V --> A["3 Authorization"]
A --> Me["4 Metrics"]
Me --> RL["5 Rate Limit"]
RL --> Bu["6 Bulkhead"]
Bu --> Ca["7 Caching"]
Ca --> Re["8 Retry"]
Re --> Ti["9 Timeout"]
Ti --> CB["10 Circuit Breaker"]
CB --> Tx["11 Transaction"]
Tx --> H([Handler])
style Req fill:#4a90d9,color:#fff
style H fill:#27ae60,color:#fff
style Ca fill:#fff9c4
style Re fill:#fce4ec
style CB fill:#fce4ec
Behaviour Reference
| # | Middleware | Responsibility |
|---|---|---|
| 1 | Logging | Emits a structured log entry before and after dispatch, including request type, duration, and outcome. |
| 2 | Validation | Invokes validate() on the request object before the handler executes. Throws ValidationError on failure. |
| 3 | Authorization | Calls authorize() on the request. Throws AuthorizationError if the caller lacks permission. |
| 4 | Metrics | Records the request_success_total and request_failure_total counters and the request_duration_ms histogram. |
| 5 | Rate Limit | Enforces a token-bucket rate limit per caller key. Throws RateLimitExceededError when the bucket is empty. |
| 6 | Bulkhead | Caps the number of concurrently in-flight requests. Throws BulkheadRejectedError when the limit is reached. |
| 7 | Caching | Returns a cached response for repeated read requests. On a cache hit, dispatch short-circuits and the handler is skipped entirely. |
| 8 | Retry | Re-invokes the inner handler on TransientError with configurable attempt count and exponential back-off delay. |
| 9 | Timeout | Cancels the inner handler if it does not complete within a configured deadline. Throws TimeoutError. |
| 10 | Circuit Breaker | Tracks consecutive handler failures. Opens the circuit once the failure count reaches the threshold, then rejects all requests until the reset timeout elapses, after which a single probe request is allowed through. |
| 11 | Transaction | Wraps the handler in a unit-of-work. Commits on success; rolls back on any unhandled exception. |
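
As an illustration of the token-bucket behaviour in row 5, here is a minimal sketch. It is not the library's RateLimitMiddleware: the `TokenBucket` and `RateLimiter` classes are hypothetical, the refill rate is assumed to be in tokens per second (the source does not state the unit), and the clock is passed in explicitly so the behaviour is deterministic.

```python
from typing import Dict


class TokenBucket:
    """Hypothetical token bucket; `refill_rate` is assumed to be tokens per second."""

    def __init__(self, max_tokens: float, refill_rate: float, now: float = 0.0) -> None:
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.tokens = max_tokens  # start with a full bucket
        self.last = now

    def try_acquire(self, now: float) -> bool:
        # Refill in proportion to elapsed time, capped at the bucket size.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.max_tokens, self.tokens + elapsed * self.refill_rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # a middleware would raise its rate-limit error here


class RateLimiter:
    """Keeps one bucket per caller key, created on first use."""

    def __init__(self, max_tokens: float, refill_rate: float) -> None:
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.buckets: Dict[str, TokenBucket] = {}

    def allow(self, key: str, now: float) -> bool:
        bucket = self.buckets.get(key)
        if bucket is None:
            bucket = TokenBucket(self.max_tokens, self.refill_rate, now)
            self.buckets[key] = bucket
        return bucket.try_acquire(now)
```

With `max_tokens=2` and `refill_rate=1`, a caller can make two requests immediately, is rejected on the third, and is allowed again once a second has passed. Other callers are unaffected because each key has its own bucket.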
Circuit Breaker States
stateDiagram-v2
[*] --> Closed
Closed --> Open : failures >= threshold
Open --> HalfOpen : reset timeout elapsed
HalfOpen --> Closed : probe succeeds
HalfOpen --> Open : probe fails
Closed : Closed - all requests pass through
Open : Open - all requests rejected immediately
HalfOpen : Half-Open - one probe request allowed
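The transitions in the diagram can be sketched as a small state machine. This is an assumption-laden illustration rather than the library's CircuitBreakerMiddleware: the `CircuitBreaker` class, its method names, and the explicit `now_ms` clock argument are all hypothetical.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Hypothetical breaker that follows the state diagram above."""

    def __init__(self, failure_threshold: int, reset_timeout_ms: int) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout_ms = reset_timeout_ms
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at_ms = 0
        self.probe_in_flight = False

    def allow(self, now_ms: int) -> bool:
        """Return True if a request may proceed at time `now_ms`."""
        if self.state is State.OPEN:
            if now_ms - self.opened_at_ms < self.reset_timeout_ms:
                return False  # Open: reject immediately
            self.state = State.HALF_OPEN  # reset timeout elapsed
            self.probe_in_flight = False
        if self.state is State.HALF_OPEN:
            if self.probe_in_flight:
                return False  # only one probe request at a time
            self.probe_in_flight = True
        return True

    def on_success(self) -> None:
        # A success in Closed resets the count; a probe success closes the circuit.
        self.state = State.CLOSED
        self.failures = 0
        self.probe_in_flight = False

    def on_failure(self, now_ms: int) -> None:
        if self.state is State.HALF_OPEN:
            self._open(now_ms)  # probe failed
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self._open(now_ms)

    def _open(self, now_ms: int) -> None:
        self.state = State.OPEN
        self.opened_at_ms = now_ms
        self.failures = 0
        self.probe_in_flight = False
```

A wrapping middleware would call `allow()` before invoking the inner handler and report the outcome through `on_success()` or `on_failure()`.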
Retry Back-off
sequenceDiagram
participant C as Client
participant R as RetryMiddleware
participant H as Handler
C->>R: send(request)
R->>H: attempt 1
H-->>R: TransientError
Note over R: wait 100 ms
R->>H: attempt 2
H-->>R: TransientError
Note over R: wait 200 ms
R->>H: attempt 3
H-->>R: success
R-->>C: response
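The waits in the sequence above (100 ms, then 200 ms) are consistent with a delay that doubles after each failed attempt. The sketch below assumes a doubling factor and uses hypothetical helpers (`backoff_delays`, `retry`) plus a local stand-in for TransientError; the library's actual back-off formula may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class TransientError(Exception):
    """Local stand-in for the library's TransientError."""


def backoff_delays(max_attempts: int, delay_ms: float, factor: float = 2.0) -> List[float]:
    """Delays in ms between attempts; there is no wait after the final attempt."""
    return [delay_ms * factor ** i for i in range(max_attempts - 1)]


async def retry(
    call: Callable[[], Awaitable[Any]],
    max_attempts: int,
    delay_ms: float,
    sleep: Callable[[float], Awaitable[Any]] = asyncio.sleep,
) -> Any:
    """Invoke `call` until it succeeds or `max_attempts` is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts, delay_ms)
    for attempt in range(max_attempts):
        try:
            return await call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            await sleep(delays[attempt] / 1000)  # sleep takes seconds
```

The `sleep` parameter is injectable so the loop can be exercised in tests without real waiting. Only TransientError is retried; any other exception propagates immediately.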
TypeScript Example
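import {
  LoggingMiddleware, ValidationMiddleware, AuthorizationMiddleware,
  MetricsMiddleware, RateLimitMiddleware, BulkheadMiddleware,
  CachingMiddleware, RetryMiddleware, TimeoutMiddleware,
  CircuitBreakerMiddleware, TransactionMiddleware, InMemoryTransactionManager,
} from '@slck/mediator';

// `m`, `logger`, and `metrics` are assumed to be created elsewhere in the application.
// Registration order matches the pipeline diagram: Logging first, Transaction last.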
m.registerPipeline(new LoggingMiddleware(logger));
m.registerPipeline(new ValidationMiddleware());
m.registerPipeline(new AuthorizationMiddleware());
m.registerPipeline(new MetricsMiddleware(metrics));
m.registerPipeline(new RateLimitMiddleware({ maxTokens: 100, refillRate: 10 }));
m.registerPipeline(new BulkheadMiddleware({ maxConcurrent: 20 }));
m.registerPipeline(new CachingMiddleware());
m.registerPipeline(new RetryMiddleware({ maxAttempts: 3, delayMs: 100 }));
m.registerPipeline(new TimeoutMiddleware(5_000));
m.registerPipeline(new CircuitBreakerMiddleware({ failureThreshold: 5, resetTimeoutMs: 30_000 }));
m.registerPipeline(new TransactionMiddleware(new InMemoryTransactionManager()));
Python Example
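from mediator import (
    LoggingMiddleware, ValidationMiddleware, AuthorizationMiddleware,
    MetricsMiddleware, RateLimitMiddleware, BulkheadMiddleware,
    CachingMiddleware, RetryMiddleware, TimeoutMiddleware,
    CircuitBreakerMiddleware, TransactionMiddleware, InMemoryTransactionManager,
)

# `m`, `logger`, and `metrics` are assumed to be created elsewhere in the application.
# Registration order matches the pipeline diagram: Logging first, Transaction last.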
m.register_pipeline(LoggingMiddleware(logger))
m.register_pipeline(ValidationMiddleware())
m.register_pipeline(AuthorizationMiddleware())
m.register_pipeline(MetricsMiddleware(metrics))
m.register_pipeline(RateLimitMiddleware(max_tokens=100, refill_rate=10))
m.register_pipeline(BulkheadMiddleware(max_concurrent=20))
m.register_pipeline(CachingMiddleware())
m.register_pipeline(RetryMiddleware(max_attempts=3, delay_ms=100))
m.register_pipeline(TimeoutMiddleware(timeout_ms=5_000))
m.register_pipeline(CircuitBreakerMiddleware(failure_threshold=5, reset_timeout_ms=30_000))
m.register_pipeline(TransactionMiddleware(InMemoryTransactionManager()))
Custom Middleware
Implement the RequestMiddleware interface to add custom cross-cutting behaviour. The same interface applies to all three dispatch modes.
TypeScript:
import type { RequestMiddleware, RequestContext, NextFn } from '@slck/mediator';

class CorrelationIdMiddleware implements RequestMiddleware {
  async execute<T>(request: unknown, ctx: RequestContext, next: NextFn<T>): Promise<T> {
    ctx.items.set('correlationId', crypto.randomUUID());
    return next(request, ctx);
  }
}

m.registerPipeline(new CorrelationIdMiddleware());
Python:
import uuid
from typing import Any, Awaitable, Callable

from mediator.contracts import RequestContext
from mediator.pipeline import RequestMiddleware


class CorrelationIdMiddleware(RequestMiddleware):
    async def execute(
        self,
        request: Any,
        ctx: RequestContext,
        next: Callable[..., Awaitable[Any]],
    ) -> Any:
        ctx.items["correlation_id"] = str(uuid.uuid4())
        return await next(request, ctx)


m.register_pipeline(CorrelationIdMiddleware())
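
A custom middleware like this can be exercised in isolation by passing a stub in place of the rest of the pipeline. The sketch below is self-contained and does not import the library: `FakeContext` is a hypothetical stand-in that models only the dict-like `items` attribute used above, and the middleware is redefined without its RequestMiddleware base class so the snippet runs on its own.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass
class FakeContext:
    """Stand-in for RequestContext; only the `items` mapping is modelled."""
    items: Dict[str, Any] = field(default_factory=dict)


class CorrelationIdMiddleware:
    # Same body as the Python example above, minus the library base class.
    async def execute(self, request: Any, ctx: Any, next: Any) -> Any:
        ctx.items["correlation_id"] = str(uuid.uuid4())
        return await next(request, ctx)


async def echo_next(request: Any, ctx: FakeContext) -> Dict[str, Any]:
    # Plays the role of the inner layers: reports what it received.
    return {"request": request, "correlation_id": ctx.items["correlation_id"]}


def run_once(request: Any) -> Tuple[Dict[str, Any], FakeContext]:
    ctx = FakeContext()
    result = asyncio.run(CorrelationIdMiddleware().execute(request, ctx, echo_next))
    return result, ctx
```

`run_once("ping")` returns the stub's result together with the context, so a test can check that the inner layer saw the same correlation ID the middleware stored.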