ADR-60 — Hot-Path Sequential Pipeline (CandlePipeline Pattern)

Status: Decided (2026-04-17)

Context: The backtest engine's candle-by-candle loop (2700-line monolith _run_candle_loop_ldp) mixed CUSUM gating, enrichment, feature engineering, inference, filter chain, and position management in a single function. Adding a step required editing the monolith. A/B testing a step meant duplicating the entire loop. Paper trading (cvntrade_paper_trading_engine.py) reimplemented the same flow separately, violating ADR-40 (same kernel for BT/paper/live). Issue #568 diagnosed the need for a composable pattern, but without constraining the pattern to its proper domain, it would creep into batch pipelines where it doesn't belong.

Decision: The hot path — any per-candle, per-tick, or per-event loop with sub-second latency requirements — MUST use the CandlePipeline pattern (src/commun/pipeline/runner/). Steps are composed sequentially with short-circuit on SkipReason. State carries across candles via PipelineState. Funnel entries, step timings, and diagnostics are produced uniformly. The same pipeline instance is reused across backtest, paper trading, and live (ADR-40).
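The composition described above can be sketched in a few dozen lines. This is a minimal illustration, not the code in src/commun/pipeline/runner/: the names CandlePipeline, CandleContext, PipelineState, SkipReason, and process come from this ADR, while the field layouts, the run method, and the two example steps (MinMoveGate, CountPassed) are assumptions made up for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Protocol, Union


@dataclass
class PipelineState:
    """Mutable state that survives from one candle to the next."""
    values: Dict[str, Any] = field(default_factory=dict)


@dataclass
class CandleContext:
    """Per-candle data plus a reference to the shared state (assumed layout)."""
    candle: Dict[str, float]
    state: PipelineState


@dataclass(frozen=True)
class SkipReason:
    """Returned by a step to stop processing the current candle."""
    step: str
    reason: str


class Step(Protocol):
    name: str

    def process(self, ctx: CandleContext) -> Union[CandleContext, SkipReason]: ...


class MinMoveGate:
    """Hypothetical gate: skip candles whose close barely moved."""
    name = "min_move_gate"

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def process(self, ctx: CandleContext) -> Union[CandleContext, SkipReason]:
        last = ctx.state.values.get("last_close")
        ctx.state.values["last_close"] = ctx.candle["close"]
        if last is not None and abs(ctx.candle["close"] - last) < self.threshold:
            return SkipReason(self.name, "move below threshold")
        return ctx


class CountPassed:
    """Hypothetical downstream step: counts candles that reach it."""
    name = "count_passed"

    def process(self, ctx: CandleContext) -> Union[CandleContext, SkipReason]:
        ctx.state.values["passed"] = ctx.state.values.get("passed", 0) + 1
        return ctx


class CandlePipeline:
    def __init__(self, steps: Iterable[Step]) -> None:
        self.steps = list(steps)
        self.state = PipelineState()  # shared across all candles

    def run(self, candle: Dict[str, float]) -> Union[CandleContext, SkipReason]:
        ctx = CandleContext(candle=candle, state=self.state)
        for step in self.steps:
            result = step.process(ctx)
            if isinstance(result, SkipReason):
                return result  # short-circuit: later steps never run
            ctx = result
        return ctx


pipeline = CandlePipeline([MinMoveGate(threshold=1.0), CountPassed()])
results = [pipeline.run({"close": c}) for c in (100.0, 100.2, 103.0)]
```

In this run the second candle is rejected by the gate, so CountPassed only sees the first and third candles. Because the gate's last_close lives in PipelineState rather than in the step, the same pipeline object could in principle be fed from a DataFrame or a WebSocket without changing the steps.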

Invariants:

  • Scope — hot path only: Use CandlePipeline for per-event sequential flows. Do NOT use it for batch DAGs (those use DataflowDAG, ADR-61) or workflow orchestration (Airflow).
  • Step contract: Each step implements process(ctx: CandleContext) -> Union[CandleContext, SkipReason]. No side effects outside ctx.state and the step's own dependencies (injected APIs).
  • Short-circuit: A step returning SkipReason halts the pipeline for the current candle. Funnel records REJECTED. Subsequent steps are not executed.
  • No duplicate loops: Any new candle-by-candle loop MUST use CandlePipeline. A PR that introduces a new monolithic loop will be rejected.
  • Paper/live parity: Paper trading and live trading MUST use the same CandlePipeline instance as backtest. Only the candle source (WebSocket vs DataFrame) and execution adapter (real vs simulated) differ (ADR-40).
  • FTF-toggleable: Steps declare ftf_env_var when applicable. The builder (build_candle_pipeline) includes/excludes steps based on env vars (ADR-56).
  • Observability: Every step execution records a FunnelEntry (PASSED/REJECTED/SKIPPED) and a timing in ctx.step_timings. These are emitted as structured logs via log_event (ADR-32).
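The toggling and observability invariants above might look roughly like the following. It is a sketch under stated assumptions: build_candle_pipeline, ftf_env_var, FunnelEntry, and ctx.step_timings are named in this ADR, but their real signatures are not given here, so the Step dataclass, the run_candle helper, the FTF_EXPERIMENTAL_FILTER variable, and the convention that a step is dropped only when its variable equals "0" are all hypothetical. Emission through log_event (ADR-32) is left out because its signature is not specified in this document.

```python
import os
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Mapping, Optional, Sequence, Union


class FunnelStatus(Enum):
    PASSED = "PASSED"
    REJECTED = "REJECTED"
    SKIPPED = "SKIPPED"  # listed in the ADR; not exercised in this sketch


@dataclass
class FunnelEntry:
    step: str
    status: FunnelStatus
    detail: str = ""


@dataclass(frozen=True)
class SkipReason:
    reason: str


@dataclass
class CandleContext:
    candle: Dict[str, float]
    funnel: List[FunnelEntry] = field(default_factory=list)
    step_timings: Dict[str, float] = field(default_factory=dict)


StepFn = Callable[[CandleContext], Union[CandleContext, SkipReason]]


@dataclass
class Step:
    name: str
    fn: StepFn
    ftf_env_var: Optional[str] = None  # None means the step is always included


def build_candle_pipeline(
    steps: Sequence[Step], env: Mapping[str, str] = os.environ
) -> List[Step]:
    """Keep a step unless its ftf_env_var is set to "0" (assumed convention)."""
    return [
        s for s in steps
        if s.ftf_env_var is None or env.get(s.ftf_env_var, "1") != "0"
    ]


def run_candle(
    steps: Sequence[Step], ctx: CandleContext
) -> Union[CandleContext, SkipReason]:
    """Run steps in order, recording a timing and a funnel entry for each one.

    Steps are assumed to return the context they were given.
    """
    for step in steps:
        start = time.perf_counter()
        result = step.fn(ctx)
        ctx.step_timings[step.name] = time.perf_counter() - start
        if isinstance(result, SkipReason):
            ctx.funnel.append(
                FunnelEntry(step.name, FunnelStatus.REJECTED, result.reason)
            )
            return result  # short-circuit for this candle
        ctx.funnel.append(FunnelEntry(step.name, FunnelStatus.PASSED))
        ctx = result
    return ctx


steps = [
    Step("volume_gate",
         lambda c: c if c.candle["volume"] > 0 else SkipReason("zero volume")),
    Step("experimental_filter",
         lambda c: SkipReason("always rejects"),
         ftf_env_var="FTF_EXPERIMENTAL_FILTER"),
    Step("inference", lambda c: c),
]

# Passing env explicitly keeps the example independent of the real environment.
active = build_candle_pipeline(steps, env={"FTF_EXPERIMENTAL_FILTER": "0"})
ctx = CandleContext(candle={"close": 100.0, "volume": 0.0})
outcome = run_candle(active, ctx)
```

Here the experimental filter is excluded at build time, and the zero-volume candle is rejected by the first step, so the funnel holds a single REJECTED entry and step_timings holds a single key. Filtering at build time rather than inside each step keeps the per-candle loop free of environment lookups.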

Alternatives rejected:

  • Inline monolith (current state): violates ADR-40, prevents A/B testing without code duplication.
  • External orchestrator (Airflow/Temporal/Prefect) for per-candle: adds milliseconds of latency, incompatible with 10K candles/sec backtest throughput.
  • Chain-of-Responsibility library (py-chain, pipeline.py PyPI): adds a dependency for ~50 lines of core logic we already own.

Files: src/commun/pipeline/runner/pipeline.py, src/commun/pipeline/runner/context.py, src/commun/pipeline/runner/step.py, src/commun/pipeline/runner/steps.py, src/commun/pipeline/runner/builder.py