Middleware Pipeline

Every send(), stream(), and publish() call passes through the configured middleware stack in Russian Doll order: each middleware wraps the ones after it and can run code both before and after the inner call. A middleware invokes next() to continue down the stack, or short-circuits by throwing or by returning without calling next(). Middleware is registered once and applies uniformly to all handler types.

Table of contents

  1. Middleware Pipeline
    1. Pipeline Structure
    2. Behaviour Reference
    3. Circuit Breaker States
    4. Retry Back-off
    5. TypeScript Example
    6. Python Example
    7. Custom Middleware

Pipeline Structure

flowchart LR
    Req([Request]) --> L["1 Logging"]
    L --> V["2 Validation"]
    V --> A["3 Authorization"]
    A --> Me["4 Metrics"]
    Me --> RL["5 Rate Limit"]
    RL --> Bu["6 Bulkhead"]
    Bu --> Ca["7 Caching"]
    Ca --> Re["8 Retry"]
    Re --> Ti["9 Timeout"]
    Ti --> CB["10 Circuit Breaker"]
    CB --> Tx["11 Transaction"]
    Tx --> H([Handler])
    style Req fill:#4a90d9,color:#fff
    style H fill:#27ae60,color:#fff
    style Ca fill:#fff9c4
    style Re fill:#fce4ec
    style CB fill:#fce4ec
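
The nesting in the flowchart can be illustrated with a minimal sketch. This is not the library's implementation: compose(), tracing(), and the two-argument middleware shape used here are hypothetical stand-ins, chosen only to show why the outermost middleware is entered first and exited last.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# Hypothetical shapes for this sketch only; the library's own types may differ.
Next = Callable[[Any], Awaitable[Any]]
Middleware = Callable[[Any, Next], Awaitable[Any]]


def compose(middlewares: List[Middleware], handler: Next) -> Next:
    """Wrap handler so that middlewares[0] becomes the outermost layer."""
    pipeline = handler
    # Build from the innermost middleware outwards.
    for mw in reversed(middlewares):
        def layer(request: Any, mw: Middleware = mw, nxt: Next = pipeline) -> Awaitable[Any]:
            return mw(request, nxt)
        pipeline = layer
    return pipeline


def tracing(name: str, trace: List[str]) -> Middleware:
    """Illustrative middleware that records when it is entered and exited."""
    async def mw(request: Any, nxt: Next) -> Any:
        trace.append(f"{name}:in")
        try:
            return await nxt(request)
        finally:
            trace.append(f"{name}:out")
    return mw


async def demo() -> List[str]:
    trace: List[str] = []

    async def handler(request: Any) -> Any:
        trace.append("handler")
        return request

    pipeline = compose([tracing("logging", trace), tracing("validation", trace)], handler)
    await pipeline("ping")
    return trace
```

Running demo() records logging:in, validation:in, handler, validation:out, logging:out, which is the Russian Doll order described above. A middleware that returns without awaiting nxt() short-circuits everything inside it.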

Behaviour Reference

| #  | Middleware      | Responsibility |
|----|-----------------|----------------|
| 1  | Logging         | Emits a structured log entry before and after dispatch, including request type, duration, and outcome. |
| 2  | Validation      | Invokes validate() on the request object before the handler executes. Throws ValidationError on failure. |
| 3  | Authorization   | Calls authorize() on the request. Throws AuthorizationError if the caller lacks permission. |
| 4  | Metrics         | Records the request_success_total and request_failure_total counters and the request_duration_ms histogram. |
| 5  | Rate Limit      | Enforces a token-bucket rate limit per caller key. Throws RateLimitExceededError when the bucket is empty. |
| 6  | Bulkhead        | Caps the number of concurrently in-flight requests. Throws BulkheadRejectedError when the limit is reached. |
| 7  | Caching         | Short-circuits dispatch for repeated read requests by returning a cached response; on a cache hit the handler is skipped entirely. |
| 8  | Retry           | Re-invokes the inner handler on TransientError, with a configurable attempt count and exponential back-off delay. |
| 9  | Timeout         | Cancels the inner handler if it does not complete within the configured deadline. Throws TimeoutError. |
| 10 | Circuit Breaker | Tracks consecutive handler failures. Opens the circuit once the failure threshold is reached, rejecting all requests immediately until the reset timeout elapses; a single probe request then decides whether the circuit closes or reopens. |
| 11 | Transaction     | Wraps the handler in a unit of work. Commits on success; rolls back on any unhandled exception. |
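
As an illustration of the Rate Limit row, here is a minimal token-bucket sketch with one bucket per caller key. It is not the library's code: the TokenBucket and PerCallerLimiter classes are hypothetical, the injectable clock exists only to make the behaviour easy to test, and refill_rate is assumed to mean tokens per second.

```python
import time
from typing import Callable, Dict


class TokenBucket:
    """Bucket that starts full and refills continuously up to max_tokens."""

    def __init__(self, max_tokens: float, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # assumption: tokens added per second
        self._clock = clock
        self._tokens = float(max_tokens)
        self._last = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when the bucket is empty."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill for the elapsed time, never exceeding the bucket capacity.
        self._tokens = min(self.max_tokens, self._tokens + elapsed * self.refill_rate)
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False


class PerCallerLimiter:
    """Keeps an independent bucket for each caller key."""

    def __init__(self, max_tokens: float, refill_rate: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_tokens = max_tokens
        self._refill_rate = refill_rate
        self._clock = clock
        self._buckets: Dict[str, TokenBucket] = {}

    def allow(self, caller_key: str) -> bool:
        bucket = self._buckets.get(caller_key)
        if bucket is None:
            bucket = TokenBucket(self._max_tokens, self._refill_rate, self._clock)
            self._buckets[caller_key] = bucket
        return bucket.try_acquire()
```

A middleware built on this idea would raise RateLimitExceededError whenever allow() returns False and call next() otherwise.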

Circuit Breaker States

stateDiagram-v2
    [*] --> Closed
    Closed --> Open : failures >= threshold
    Open --> HalfOpen : reset timeout elapsed
    HalfOpen --> Closed : probe succeeds
    HalfOpen --> Open : probe fails

    Closed : Closed - all requests pass through
    Open : Open - all requests rejected immediately
    HalfOpen : Half-Open - one probe request allowed
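
The transitions in the diagram can be sketched as a small state machine. This is an illustrative model rather than the library's CircuitBreakerMiddleware: the class, its method names, and the injectable clock are assumptions, and the timeout is expressed in seconds for simplicity.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold: int, reset_timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self.state = State.CLOSED
        self._failures = 0          # consecutive failures while closed
        self._opened_at = 0.0
        self._probe_in_flight = False

    def allow_request(self) -> bool:
        """Return True if a request may proceed to the handler."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.reset_timeout_s:
                return False        # Open: reject immediately
            self.state = State.HALF_OPEN
            self._probe_in_flight = False
        if self.state is State.HALF_OPEN:
            if self._probe_in_flight:
                return False        # Half-Open: only one probe at a time
            self._probe_in_flight = True
        return True

    def record_success(self) -> None:
        """A success closes the circuit and resets the failure count."""
        self.state = State.CLOSED
        self._failures = 0
        self._probe_in_flight = False

    def record_failure(self) -> None:
        """A failed probe reopens at once; otherwise count towards the threshold."""
        if self.state is State.HALF_OPEN:
            self._trip()
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
        self._probe_in_flight = False
```

A middleware would call allow_request() before next(), then record_success() or record_failure() depending on the outcome.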

Retry Back-off

sequenceDiagram
    participant C as Client
    participant R as RetryMiddleware
    participant H as Handler

    C->>R: send(request)
    R->>H: attempt 1
    H-->>R: TransientError
    Note over R: wait 100 ms
    R->>H: attempt 2
    H-->>R: TransientError
    Note over R: wait 200 ms
    R->>H: attempt 3
    H-->>R: success
    R-->>C: response
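
The waits in the sequence (100 ms, then 200 ms) are consistent with a base delay that doubles after each failed attempt. The sketch below assumes exactly that schedule with no jitter; the retry() helper, its parameters, and the local TransientError class are hypothetical stand-ins, not the library's RetryMiddleware.

```python
import asyncio
from typing import Any, Awaitable, Callable


class TransientError(Exception):
    """Stand-in for the library's TransientError in this sketch."""


def backoff_delay_ms(attempt: int, base_delay_ms: int) -> int:
    """Delay after failed attempt `attempt` (1-based): base * 2 ** (attempt - 1)."""
    return base_delay_ms * 2 ** (attempt - 1)


async def retry(
    call: Callable[[], Awaitable[Any]],
    max_attempts: int,
    base_delay_ms: int,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Invoke call() up to max_attempts times, backing off between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except TransientError:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
            await sleep(backoff_delay_ms(attempt, base_delay_ms) / 1000)
```

With max_attempts=3 and base_delay_ms=100, a call that fails twice and then succeeds waits 100 ms and 200 ms, as in the diagram. Errors other than TransientError propagate immediately without a retry.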

TypeScript Example

import {
  LoggingMiddleware, ValidationMiddleware, AuthorizationMiddleware,
  MetricsMiddleware, RateLimitMiddleware, BulkheadMiddleware,
  CachingMiddleware, RetryMiddleware, TimeoutMiddleware,
  CircuitBreakerMiddleware, TransactionMiddleware, InMemoryTransactionManager,
} from '@slck/mediator';

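// `m` is an existing mediator instance; `logger` and `metrics` are supplied by the application.
// Middleware is registered in the same order as the pipeline diagram (1 Logging ... 11 Transaction).
m.registerPipeline(new LoggingMiddleware(logger));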
m.registerPipeline(new ValidationMiddleware());
m.registerPipeline(new AuthorizationMiddleware());
m.registerPipeline(new MetricsMiddleware(metrics));
m.registerPipeline(new RateLimitMiddleware({ maxTokens: 100, refillRate: 10 }));
m.registerPipeline(new BulkheadMiddleware({ maxConcurrent: 20 }));
m.registerPipeline(new CachingMiddleware());
m.registerPipeline(new RetryMiddleware({ maxAttempts: 3, delayMs: 100 }));
m.registerPipeline(new TimeoutMiddleware(5_000));
m.registerPipeline(new CircuitBreakerMiddleware({ failureThreshold: 5, resetTimeoutMs: 30_000 }));
m.registerPipeline(new TransactionMiddleware(new InMemoryTransactionManager()));

Python Example

from mediator import (
    LoggingMiddleware, ValidationMiddleware, AuthorizationMiddleware,
    MetricsMiddleware, RateLimitMiddleware, BulkheadMiddleware,
    CachingMiddleware, RetryMiddleware, TimeoutMiddleware,
    CircuitBreakerMiddleware, TransactionMiddleware, InMemoryTransactionManager,
)

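# `m` is an existing mediator instance; `logger` and `metrics` are supplied by the application.
# Middleware is registered in the same order as the pipeline diagram (1 Logging ... 11 Transaction).
m.register_pipeline(LoggingMiddleware(logger))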
m.register_pipeline(ValidationMiddleware())
m.register_pipeline(AuthorizationMiddleware())
m.register_pipeline(MetricsMiddleware(metrics))
m.register_pipeline(RateLimitMiddleware(max_tokens=100, refill_rate=10))
m.register_pipeline(BulkheadMiddleware(max_concurrent=20))
m.register_pipeline(CachingMiddleware())
m.register_pipeline(RetryMiddleware(max_attempts=3, delay_ms=100))
m.register_pipeline(TimeoutMiddleware(timeout_ms=5_000))
m.register_pipeline(CircuitBreakerMiddleware(failure_threshold=5, reset_timeout_ms=30_000))
m.register_pipeline(TransactionMiddleware(InMemoryTransactionManager()))

Custom Middleware

Implement the RequestMiddleware interface to add custom cross-cutting behaviour. The same interface applies to all three dispatch modes: send(), stream(), and publish().

TypeScript:

import type { RequestMiddleware, RequestContext, NextFn } from '@slck/mediator';

class CorrelationIdMiddleware implements RequestMiddleware {
  async execute<T>(request: unknown, ctx: RequestContext, next: NextFn<T>): Promise<T> {
    // Attach a fresh correlation ID so inner middleware and the handler can read it from ctx.items.
    ctx.items.set('correlationId', crypto.randomUUID());
    return next(request, ctx);
  }
}

m.registerPipeline(new CorrelationIdMiddleware());

Python:

import uuid
from typing import Any, Awaitable, Callable

from mediator.contracts import RequestContext
from mediator.pipeline import RequestMiddleware

class CorrelationIdMiddleware(RequestMiddleware):
    async def execute(
        self,
        request: Any,
        ctx: RequestContext,
        next: Callable[..., Awaitable[Any]],
    ) -> Any:
        # Attach a fresh correlation ID so inner middleware and the handler can read it from ctx.items.
        ctx.items["correlation_id"] = str(uuid.uuid4())
        return await next(request, ctx)

m.register_pipeline(CorrelationIdMiddleware())