airflow-xcom-reader: read-only Airflow XCom CLI (CVN-N014-EC-S16)
scripts/airflow_xcom_pull.py runs a read-only SELECT on the Airflow
xcom (and dag_run) tables inside a running scheduler pod via kubectl exec,
and prints the decoded return values. It is the XCom-side companion to
loki-query: Loki answers "what status did the run emit",
this answers "what trajectory did the task return".
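As a rough illustration of the kind of read-only query described above, the sketch below builds a parameterized `SELECT` against an `xcom` table and runs it on an in-memory SQLite stand-in. The function name `build_xcom_query` and the column list (`dag_id`, `run_id`, `task_id`, `map_index`, `key`, `value`) are assumptions based on the Airflow 2.3+ schema, not the script's actual probe.

```python
import sqlite3


def build_xcom_query(dag_id, run_id, task_id=None, map_index=None,
                     key="return_value", limit=20):
    """Return (sql, params) for a read-only SELECT on the xcom table.

    Hypothetical helper: column names assume the Airflow 2.3+ xcom schema.
    """
    clauses = ["dag_id = :dag_id", "run_id = :run_id", "key = :key"]
    params = {"dag_id": dag_id, "run_id": run_id, "key": key, "limit": limit}
    # Optional filters are only added when the caller supplies them.
    if task_id is not None:
        clauses.append("task_id = :task_id")
        params["task_id"] = task_id
    if map_index is not None:
        clauses.append("map_index = :map_index")
        params["map_index"] = map_index
    sql = (
        "SELECT task_id, map_index, key, value FROM xcom WHERE "
        + " AND ".join(clauses)
        + " ORDER BY task_id, map_index LIMIT :limit"
    )
    return sql, params


def demo_connection():
    """In-memory SQLite stand-in for the metadata DB (illustration only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE xcom (dag_id TEXT, run_id TEXT, task_id TEXT, "
        "map_index INTEGER, key TEXT, value TEXT)"
    )
    conn.executemany(
        "INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("d", "r", "t", 0, "return_value", '{"a": 0}'),
            ("d", "r", "t", 1, "return_value", '{"a": 1}'),
            ("d", "other", "t", 1, "return_value", '{"a": 9}'),
        ],
    )
    return conn
```

Values are bound as parameters rather than interpolated into the SQL string, so a `run_id` containing quotes or `+` characters cannot alter the statement.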
Why it exists (the GOTCHA)
Diagnostic DAGs (s40 / s41 / s42 …) return their per-cell trajectory —
auc_by_seed, deltas_per_point, verdict dicts — via XCom, not via Loki. The
s42_* log events carry status (event=s42_cell_outcome severity=…), not the
AUC-vs-point trajectory the HP-swap step-3 analysis needs. Loki therefore cannot
answer "is the optimum still at point X across folds"; XCom can.
But the Airflow metadata DB sits on a private Scaleway VPC IP (e.g.
172.16.16.4) that is unreachable from a laptop — there was no way to read that
XCom locally. This tool closes the gap without exposing the DB: the scheduler
pod already carries the connection string in its env and can reach the DB, so the
script ships a tiny embedded probe and runs it in-pod. No credentials ever leave
the cluster, and the only SQL issued is SELECT (read-only by construction).
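The in-pod execution described above could be sketched roughly as follows. This is a minimal sketch, not the script's implementation: the helper names `build_exec_argv` and `run_probe` are hypothetical, and shipping the probe through `python -c` is an assumption about how the embedded source reaches the pod.

```python
import subprocess


def build_exec_argv(pod, probe_source, namespace="cvntrade",
                    container="scheduler"):
    """Build the kubectl argv that runs probe_source inside the pod.

    Hypothetical helper; passing the probe via `python -c` is an assumption.
    """
    return [
        "kubectl", "exec",
        "-n", namespace,
        pod,
        "-c", container,
        "--",  # everything after this is the command run in the container
        "python", "-c", probe_source,
    ]


def run_probe(pod, probe_source, **kwargs):
    """Run the probe in-pod and return its stdout; raise on non-zero exit."""
    proc = subprocess.run(
        build_exec_argv(pod, probe_source, **kwargs),
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        raise RuntimeError(
            f"kubectl exec failed (exit {proc.returncode}): {proc.stderr.strip()}"
        )
    return proc.stdout
```

Because the probe reads the connection string from the pod's own environment, only the probe's printed output travels back over the `kubectl exec` channel.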
Usage
# most recent runs of a DAG (newest-first) — find the run_id you want
python scripts/airflow_xcom_pull.py --dag-id diagnostic__s42 --list-runs
# every mapped cell's return_value for a run (one row per crypto / map_index)
python scripts/airflow_xcom_pull.py --dag-id diagnostic__s42 \
--run-id 'manual__2026-06-03T18:21:00+00:00' --task-id discriminate_cell
# one specific mapped cell, raw JSON (for piping / jq)
python scripts/airflow_xcom_pull.py --dag-id diagnostic__s42 \
--run-id '<run_id>' --task-id discriminate_cell --map-index 0 --json
| flag | default | meaning |
|---|---|---|
| `--dag-id` | (required) | DAG id, e.g. `diagnostic__s42` |
| `--run-id` | none | DAG run_id (required unless `--list-runs`) |
| `--task-id` | none | restrict to one task (e.g. `discriminate_cell`) |
| `--map-index` | none | restrict to one mapped index (a single cell) |
| `--key` | `return_value` | XCom key to read |
| `--list-runs` | off | list recent runs of `--dag-id` and exit |
| `--limit` | `20` | max rows |
| `--namespace` | `cvntrade` | k8s namespace of the Airflow pods |
| `--selector` | `component=scheduler` | pod label selector |
| `--container` | `scheduler` | container to exec into |
| `--pod` | none | explicit pod name (skips selector lookup) |
| `--json` | off | raw JSON dump (no pretty summary) |
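For orientation, the flag table could map onto an `argparse` definition along these lines. This is a sketch that mirrors the documented flags and defaults; the function names, the `int` types for `--map-index` and `--limit`, and the error wording are assumptions, not the script's actual code.

```python
import argparse


def build_parser():
    """Hypothetical parser mirroring the documented flags and defaults."""
    p = argparse.ArgumentParser(prog="airflow_xcom_pull.py")
    p.add_argument("--dag-id", required=True, help="DAG id")
    p.add_argument("--run-id", help="DAG run_id (required unless --list-runs)")
    p.add_argument("--task-id", help="restrict to one task")
    p.add_argument("--map-index", type=int, help="restrict to one mapped index")
    p.add_argument("--key", default="return_value", help="XCom key to read")
    p.add_argument("--list-runs", action="store_true",
                   help="list recent runs of --dag-id and exit")
    p.add_argument("--limit", type=int, default=20, help="max rows")
    p.add_argument("--namespace", default="cvntrade")
    p.add_argument("--selector", default="component=scheduler")
    p.add_argument("--container", default="scheduler")
    p.add_argument("--pod", help="explicit pod name (skips selector lookup)")
    p.add_argument("--json", action="store_true", help="raw JSON dump")
    return p


def parse_args(argv=None):
    """Parse argv and enforce the 'run-id unless list-runs' rule."""
    parser = build_parser()
    args = parser.parse_args(argv)
    if not args.list_runs and not args.run_id:
        # parser.error prints usage and exits with status 2
        parser.error("--run-id is required unless --list-runs is given")
    return args
```

Calling `parse_args(["--dag-id", "diagnostic__s42", "--list-runs"])` in this sketch yields the documented defaults for every other flag.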
Values are JSON-decoded when possible; a pickled / non-JSON payload is
surfaced verbatim (the script never guesses or silently drops it). Default output
is a human summary (one block per task / map_index / key); --json gives a
machine-readable dump.
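The decode-or-surface-verbatim behaviour can be sketched as below. The function name `decode_value` and the `(decoded, value)` return shape are hypothetical; the sketch also assumes the stored value arrives as bytes or text, which is how Airflow's `xcom.value` column is typically read.

```python
import json


def decode_value(raw):
    """Return (decoded, value): JSON-decoded when possible, else verbatim.

    Hypothetical helper; nothing is dropped or guessed on failure.
    """
    if isinstance(raw, memoryview):
        raw = raw.tobytes()
    if isinstance(raw, (bytes, bytearray)):
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            # Likely a pickled or otherwise binary payload: show it as-is.
            return False, repr(bytes(raw))
    else:
        text = raw
    try:
        return True, json.loads(text)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return False, text
```

Returning an explicit flag alongside the value lets the summary printer mark non-JSON payloads instead of presenting them as if they had been decoded.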
Infrastructure assumptions & guarantees
- Read-only by construction (ADR-25): the embedded probe issues a single `SELECT`; the only side effect is the transient `kubectl exec` the script owns.
- Fail-fast, no silent fallback (ADR-25): no Running pod, a non-zero `kubectl exec`, a missing `AIRFLOW__*__SQL_ALCHEMY_CONN`, or non-JSON probe output each raise with an explicit message and a non-zero exit; the tool never invents an empty result. An empty match (vs. an error) prints a hint to widen with `--list-runs`.
- Credentials stay in-cluster: the connection string is read from the pod env and never printed; nothing DB-sensitive crosses to the laptop.
- Dependency-light: the in-pod probe uses only `sqlalchemy` + `json`, both already present in the Airflow image.
- Cluster access required: assumes a working `kubectl` context against the `cvntrade` namespace (the same prerequisite as `loki-query`'s port-forward).
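Two of the fail-fast checks listed above might look roughly like this. It is a sketch under stated assumptions: `ProbeError`, `pick_running_pod`, and `find_conn_var` are hypothetical names, the input is assumed to be the output of `kubectl get pods -o json`, and the two concrete variable names are the standard Airflow ones matching the `AIRFLOW__*__SQL_ALCHEMY_CONN` pattern.

```python
import json


class ProbeError(RuntimeError):
    """Raised instead of returning an empty or invented result."""


def pick_running_pod(pods_json, selector):
    """Pick the first Running pod from `kubectl get pods -o json` output."""
    items = json.loads(pods_json).get("items", [])
    running = [
        pod["metadata"]["name"]
        for pod in items
        if pod.get("status", {}).get("phase") == "Running"
    ]
    if not running:
        raise ProbeError(f"no Running pod matches selector {selector!r}")
    return running[0]


# Assumed candidates: the 2.3+ name first, then the older [core] name.
CONN_VARS = (
    "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN",
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
)


def find_conn_var(env):
    """Return the NAME of the conn-string variable; never its value."""
    for name in CONN_VARS:
        if env.get(name):
            return name
    raise ProbeError("no AIRFLOW__*__SQL_ALCHEMY_CONN variable set in the pod env")
```

`find_conn_var` deliberately returns only the variable name, so an error message or log line built from its result cannot leak the connection string.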
Why an in-pod SELECT and not the Airflow REST API / CLI
- REST API (`GET /api/v1/dags/{id}/dagRuns/{run}/taskInstances/{task}/xcomEntries`): the webserver is not exposed locally (same private-VPC constraint as the DB), XCom-value return is disabled by default (`enable_xcom_pickling` / value redaction), and mapped-task (`map_index`) entries are awkward to enumerate. It would need an ingress + auth we deliberately don't expose for a read-only diagnostic.
- `airflow tasks states-for-dag-run` / `airflow xcom`: the Airflow CLI has no generic XCom read subcommand (only DAG/task state); it cannot return a mapped cell's `return_value` payload.
- In-pod `SELECT` sidesteps all of the above with zero new surface: the pod already holds the conn string and DB reachability. If a future Airflow exposes a first-class XCom read API in-cluster, migrating to it is a drop-in replacement for the embedded probe.
Reference: memory project_session_state_20260605 · companion CLI
loki-query · plan dossier
../reviews/2026-06-05-cvn-n014-ec-s16-dev-productivity-tooling-plan.md.