Middleware System¶
Overview¶
Composable middleware pipeline using the onion execution model with before/after/on_error phases. Each middleware can inspect and modify inputs before module execution, transform outputs after execution, and participate in error recovery when failures occur. The pipeline supports both full subclass-based middleware and lightweight function adapters for simple use cases.
Requirements¶
- Provide a base
Middlewareclass with no-op defaults for all three lifecycle phases (before, after, on_error), allowing subclasses to override only the methods they need. - Implement onion-model execution: before hooks run in registration order, after hooks run in reverse registration order, and on_error hooks run in reverse order over only the middlewares that executed before the failure.
- Support input modification in
before()(return a new dict to replace inputs, or None to pass through unchanged) and output modification inafter()(same contract). - Support error recovery:
on_error()handlers are called in reverse order; the first handler to return a non-None dict provides recovery output, short-circuiting the remaining handlers. - Provide
BeforeMiddlewareandAfterMiddlewareadapters that wrap plain callback functions as middleware instances, reducing boilerplate for single-phase hooks. - Include a
LoggingMiddlewarewith structured logging, security-aware redaction of inputs viacontext.redacted_inputs, and per-call duration tracking stored incontext.data. - Wrap before-phase failures in
MiddlewareChainErrorcarrying both the original exception and the list of executed middlewares, enabling targeted error recovery. - Ensure all mutations to the middleware list are thread-safe.
Technical Design¶
Architecture¶
The middleware system follows a classic onion (layered) execution model. The MiddlewareManager holds an ordered list of Middleware instances and provides three execution methods corresponding to the module call lifecycle:
-
execute_before()-- Iterates middlewares in registration order. Each middleware'sbefore()receives the current inputs and may return a replacement dict. If a middleware raises, aMiddlewareChainErroris raised with the list of already-executed middlewares attached. -
execute_after()-- Iterates middlewares in reverse registration order. Each middleware'safter()receives both original inputs and the current output, and may return a replacement output dict. -
execute_on_error()-- Iterates theexecuted_middlewareslist (from the before phase) in reverse order. The first handler to return a non-None dict becomes the recovery output. If a handler itself raises, the exception is logged and iteration continues.
Snapshot Pattern¶
The MiddlewareManager uses a lock-protected snapshot pattern for thread safety. Before each execution pass, snapshot() acquires the lock, copies the middleware list, and releases the lock. The execution then iterates over the snapshot without holding the lock, so concurrent add()/remove() calls do not interfere with in-flight pipelines.
Components¶
Middleware(base class) -- Plain class (not ABC) with three methods returning None by default. Subclasses override only what they need.MiddlewareManager-- Manages the ordered list and orchestrates the three execution phases. Usesthreading.Lockwith the snapshot pattern.BeforeMiddleware/AfterMiddleware-- Lightweight adapters wrapping a single callback function as a fullMiddlewaresubclass. Non-overridden phases remain no-ops.LoggingMiddleware-- Structured logging middleware that records start time incontext.data["_logging_mw_start"]duringbefore(), computes duration inafter(), and usescontext.redacted_inputsto avoid leaking sensitive data. Configurable vialog_inputs,log_outputs, andlog_errorsflags.MiddlewareChainError-- Exception subclass carryingoriginal(the root cause) andexecuted_middlewares(the list of middlewares whosebefore()was called, for targeted error recovery).
Data Flow¶
Inputs --> [MW1.before] --> [MW2.before] --> [MW3.before] --> Module.execute()
|
Output <-- [MW1.after] <-- [MW2.after] <-- [MW3.after] <------+
On Error (if MW3.before fails):
[MW2.on_error] <-- [MW3.on_error]
(MW1.on_error is not called because MW3 is where before failed,
and recovery walks backwards through executed middlewares)
Key Files¶
| File | Lines | Purpose |
|---|---|---|
src/apcore/middleware/__init__.py |
16 | Package re-exports for convenient imports |
src/apcore/middleware/base.py |
36 | Middleware base class with no-op defaults |
src/apcore/middleware/manager.py |
129 | MiddlewareManager and MiddlewareChainError |
src/apcore/middleware/logging.py |
94 | LoggingMiddleware with structured logging and redaction |
src/apcore/middleware/adapters.py |
43 | BeforeMiddleware and AfterMiddleware function adapters |
Dependencies¶
Internal¶
apcore.context.Context-- Execution context passed to all middleware methods, providestrace_id,caller_id,redacted_inputs, anddatadict for per-call state storage.
External¶
threading(stdlib) -- Lock for thread-safe middleware list management.logging(stdlib) -- Standard library logging used byLoggingMiddlewareand manager error reporting.time(stdlib) -- Wall-clock timing for duration measurements inLoggingMiddleware.
Testing Strategy¶
Tests are split across two files targeting different abstraction levels:
Unit Tests (tests/test_middleware.py)¶
- Middleware base class: Verifies it is not an ABC, can be instantiated directly, all methods return None by default, and subclasses can selectively override methods.
- BeforeMiddleware adapter: Confirms it is a
Middlewaresubclass, delegatesbefore()to the callback, and leavesafter()/on_error()as no-ops. Validates correct argument forwarding. - AfterMiddleware adapter: Same structure as
BeforeMiddlewaretests but for theafter()phase.
Manager Tests (tests/test_middleware_manager.py)¶
- add/remove: Verifies append ordering, identity-based removal, and return values.
- execute_before: Tests registration-order execution, input replacement via returned dicts, None passthrough,
MiddlewareChainErroron failure with correctexecuted_middlewarestracking, and empty-list passthrough. - execute_after: Tests reverse-order execution, output replacement, None passthrough, exception propagation, and empty-list passthrough.
- execute_on_error: Tests reverse iteration over executed middlewares, first-dict-wins recovery, None continuation, exception-in-handler logging and continuation, and empty-list returns None.
- Thread safety: Concurrent
add()with no lost middlewares (10 threads x 50 adds), snapshot consistency after mutations, and concurrentadd()+snapshot()with no exceptions (5 writer + 5 reader threads).
Integration Tests (tests/integration/test_middleware_chain.py)¶
- Full pipeline tests exercising middleware through the
Executor.call()path.