Skip to content

Plan dossier — CVN-N014-ED-S02 · s43 : routage du transport cross-pod (numpy)

Story : CVN-N014-ED-S02 (wp#243, GH #1106) · Epic : CVN-N014-ED — Inter-task data transport · Dépend de : CVN-N014-ED-S01 (flip + audit, Closed) · Science s43 : CVN-N001-EI-S05 (wp#228, hors-scope ici). Statut : plan — pour plan_review. Standard documentaire : ADR-0101.


Partie I — Cadrage (lecteur)

0. Problématisation (sans jargon)

Un diagnostic du projet (« s43 ») tourne sur plusieurs machines en parallèle : 5 « pods » calculent chacun une part des prédictions, et un sixième pod (le « gate ») doit les rassembler toutes pour rendre un verdict.

Le transport de ces prédictions d'un pod à l'autre ne fonctionnait pas. Chaque pod de calcul écrivait son résultat dans un dossier temporaire local à sa propre machine (/tmp), en croyant le partager. Le pod gate lisait son /tmpvide. Résultat : cohorte toujours incomplète → le diagnostic rendait INCONCLUSIVE_TOOLING quoi qu'il arrive, sans que le vrai problème soit visible. Aggravant : les tests passaient (ils utilisaient un vrai dossier local, correct en test mono-machine mais pas entre machines), et la revue n'a rien vu car « /tmp n'est pas partagé » est un fait d'infrastructure invisible dans le code.

Ce que cette Story doit faire : faire en sorte que les prédictions de s43 voyagent réellement d'un pod à l'autre, en écrivant dans le stockage S3 partagé du projet (pas un /tmp local), et le prouver en conditions réelles (run multi-pods, pas un test mono-machine). Elle ne touche pas à la science de s43 (ça reste en CVN-N001-EI-S05) — uniquement la plomberie de transport.

0bis. Les constats sur lesquels on construit (à vérifier, pas à supposer)

Cette Story repose sur des prémisses établies par S01 — on les rend explicites pour que le comité les challenge :

  1. Le flip object-storage XCom est LIVE (S01 Closed, 2026-06-06) : les XCom JSON-sérialisables offloadent désormais vers S3 au-dessus du seuil. ✅ vérifié in-vivo.
  2. Le backend ne sérialise PAS numpy (vérifié in-pod, common-io 1.4.2) : serialize_value fait json.dumps(value, cls=XComEncoder) avant le test de seuil ; XComEncoder lève TypeError sur un numpy.ndarray (testé : ndarray nu + la forme dict-d'arrays de s43) → l'échec précède l'offload, à toute taille. Conséquence dure : un ndarray ne peut jamais transiter direct par XCom, flip ou pas.
  3. L'audit de sérialisation S01 est petit : 129 sites, 0 numpy_suspect, parc JSON-only. Le seul payload tableau du parc est s43, et il écrit déjà hors-XCom vers un store (principe pass-by-référence respecté).
  4. s43 respectait déjà le pass-by-référence (XCom = clés, pas le tableau) : le défaut n'était PAS « numpy dans XCom », c'était la cible du store (/tmp pod-local au lieu de S3) et le fait que le transport était implémenté dans le code de tâche plutôt que dans une couche de transport.
  5. cvntrade_s3_manager existe (src/commun/s3/cvntrade_s3_manager.py) — le helper S3 partagé du projet ; S3 est déjà le substrat (MLFLOW_ARTIFACT_ROOT, S3_BUCKET_NAME, bucket cvntrade-artifacts).

Si l'une de ces 5 prémisses est fausse, le routage ci-dessous doit être re-tranché. (1)-(2)-(3) sont vérifiées in-pod en S01 ; (4)-(5) sont des faits de code.

1. User stories

  • En tant qu'opérateur du diagnostic s43, je veux que les prédictions des 5 pods de calcul arrivent réellement au pod gate, pour que le verdict reflète la cohorte complète et non INCONCLUSIVE_TOOLING.
  • En tant que développeur d'un futur DAG multi-pods, je veux un modèle de transport cross-pod établi et prouvé (écrire au store partagé, passer la référence en XCom), pour ne pas re-tomber dans le piège /tmp-pod-local.
  • En tant que relecteur, je veux que le test d'acceptation soit un smoke cross-pod réel (pods distincts, pas un mono-pod), pour qu'un test vert ne masque plus un chemin porteur cassé entre pods.

2. Hypotheses (testable, EN)

  • H1 — Root cause is the store target, not XCom. s43's cross-pod failure is caused by writing arrays to a pod-local /tmp path; redirecting the same persist/load to the shared S3 store (via cvntrade_s3_manager) makes the cohort complete cross-pod. Test: a real multi-pod smoke where 5 acquisition pods persist + a separate gate pod loads → cohort count == 5 (today: gate sees 0).
  • H2 — numpy never needs to touch XCom. Carrying the S3 key/URI (a JSON string) in XCom + loading the array from S3 downstream is sufficient; the array itself never hits serialize_value. Test: the URI round-trips through the live backend (JSON), the downstream load() returns the exact array (np.array_equal).
  • H3 — Route X (direct return of the array on the backend) is NOT viable for s43. Because of premise §0bis-2 (numpy → TypeError before offload), a bare return ndarray crashes. Test: in-pod XCom.serialize_value(ndarray) raises TypeError (already observed S01).
  • H4 — tmp_path / single-pod tests cannot catch this class. A local-FS round-trip test (or a chain collapsed into one pod) is green while the cross-pod path diverges. Test: the existing s43 tmp_path test stays green even on the broken /tmp target → proves the test is blind; the acceptance gate MUST be the genuinely multi-pod smoke (§ acceptance invariant).

3. State of the art (EN)

  • Airflow XCom + object-storage backend (S01): JSON-serializable values inline (≤ threshold) or offload to S3 (> threshold) transparently; the metadata DB holds the value or the path string. numpy/pandas are out of scope of XComEncoder (premise §0bis-2).
  • Two strategies for non-JSON payloads (Airflow-idiomatic): (a) a registered serializer (airflow.serialization serde) so return ndarray works transparently — drop-in but "magic", couples every consumer to the serde, and still round-trips through the metadata path; (b) explicit pass-by-reference — the task writes the blob to a store and returns a key/URI; XCom carries only the reference. This is the documented Airflow guidance for large/binary data and is what s43 already half-did.
  • Project substrate: cvntrade_s3_manager (src/commun/s3/) wraps the Scaleway S3 bucket cvntrade-artifacts (path-style, aws_default conn); MLflow + L2-events already live there. Pods are stateless → object-store is the coherent transport (vs a shared-RWX POSIX FS, which is off-default on the cluster — SBS is RWO-only — and brings concurrency/SPOF/GC cost for no POSIX need here).
  • The anti-pattern to eliminate (Epic §): per-task bespoke "intermediate storage" — each task choosing where/how to serialize. That's what allowed s43's wrong target, invisible to test + review.

4. Definition of Done

  1. Routage tranché sur evidence (X vs Y), justifié par les prémisses §0bis — voir §6.
  2. s43 runnable cross-pod : le smoke d'acceptation cross-pod réel (cf. invariant ci-dessous) est vert — cohorte complète (count == attendu), le transport fonctionne réellement entre pods distincts.
  3. numpy hors-XCom : s43 écrit ses tableaux vers s3://cvntrade-artifacts/s43-predictions/... via cvntrade_s3_manager ; XCom ne porte que la clé/URI (JSON) — le manifest de cohorte (§7).
  4. Test d'acceptation = le smoke cross-pod (jamais un tmp_path ni un mono-pod — H4). Les tests tmp_path existants restent (round-trip de sérialisation) mais ne sont pas le gate.
  5. Contrat de schéma .npz défini + validé fail-loud au load (§7).
  6. Lifecycle S3 — gateable : S02 ne peut passer Closed que si l'un des deux est vrai :
  7. (a) lifecycle-rule active sur s3://cvntrade-artifacts/s43-predictions/ avec preuve d'application opérateur (sortie get-bucket-lifecycle-configuration) ; ou
  8. (b) waiver opérateur explicite — daté, avec owner et échéance de mise en conformité (sinon le préfixe accumule indéfiniment).
  9. Dette A(ii) inscrite à S03 : la généralisation « bespoke persist/load → couche-policy réutilisable » est tracée sur CVN-N014-ED-S03 (wp#244).
  10. Pas de régression du parc : aucun autre DAG touché (gated, s43-only).

Smoke d'acceptation — invariant non négociable

Le gate d'acceptation de S02 est un smoke cross-pod réel : - 5 tâches d'acquisition s'exécutent dans des pods Kubernetes distincts (prouvé dans les logs : pod_name/hostname distincts par tâche) ; - chaque pod écrit un artefact .npz disjoint sous s3://cvntrade-artifacts/s43-predictions/<dag_run_id>/... ; - le pod gate, exécuté dans un autre pod, récupère les clés via XCom (le manifest, §7) ; - le gate charge les artefacts depuis S3 ; - résultat attendu : cohort_count == attendu (== 5).

Un test tmp_path, un test mono-process, ou une chaîne capture+analyse dans le même pod ne peut PAS satisfaire la DoD. Ces tests restent utiles comme unitaires, mais ne sont pas le gate de la Story.

5. Consolidation

S02 referme l'incident fondateur de l'Epic : le transport s43 écrivait les prédictions dans un /tmp local au pod producteur, invisible depuis le pod gate. La Story remplace ce chemin porteur par un store S3 partagé, prouve le transport par un smoke cross-pod réel, et maintient le tableau numpy hors-XCom.

La décision retenue est Route Y au niveau du payload : les arrays sont écrits en S3, tandis que XCom ne transporte que les références JSON produites par les tâches amont. S02 valide donc en conditions réelles le modèle ADR-0100 « JSON-or-pass-by-reference » (XCom pour l'orchestration et les manifestes, S3 pour les bytes volumineux) — pas l'offload XCom > seuil (les clés sont petites, inline en DB), qui restait validé séparément en S01.

La Story reste volontairement bornée : elle corrige s43 sans modifier la science (CVN-N001-EI-S05) et sans généraliser encore la couche pass-by-référence — généralisation tracée sur S03.


Partie II — Décision & design

6. Décision de routage — Route Y (pass-by-référence vers S3), tranchée sur evidence

La dichotomie X/Y de l'Epic était conditionnée à l'audit (Story 1). L'evidence est maintenant connue :

Critère de routage (issue #1106) Evidence S01 Implication
Audit petit → flip vite → Route X audit petit (129 sites, 0 numpy) ✅ flip livrable — et livré (S01 Closed)
Route X = s43 return/pull direct sur le backend serde finding : numpy → TypeError (§0bis-2) non viable pour le tableau de s43

→ La prémisse de Route X (« s43 passe direct sur le backend ») est invalidée par le finding serde : un ndarray ne peut pas return direct. On retient donc le mécanisme Route Y — pass-by-référence explicite vers S3 (le fix A(ii)) — qui est : - indépendant du flip (donc non bloqué, et le flip est de toute façon déjà là) ; - le bon fix de la root cause (cible du store : /tmp pod-local → S3 partagé) ; - conforme au serde (le tableau ne touche jamais serialize_value ; seule la clé JSON transite en XCom, désormais portée par le backend object-storage de S01).

Nuance assumée : ce n'est pas « X ou Y » au sens strict — le tableau part en by-ref vers S3 (mécanisme Y), tandis que la clé voyage en XCom sur le flip de S01 (acquis X). Les deux se composent une fois qu'on sépare tableau (→ S3) et référence (→ XCom).

Ce que S02 ne fait PAS (→ S03, wp#244) : généraliser le persist/load de s43 en une couche-policy réutilisable (helper/décorateur « pass-by-ref »). S02 corrige s43 uniquement ; S03 mutualise. Dette A(ii) inscrite (DoD §7).

7. Design

Cible : src/commun/finetune/diagnostic/hamilton/s43_io.py (le persist/load actuel, np.savezPath).

7.1 Manifest de cohorte — XCom nominal, list-prefix diagnostic

La source de vérité de la cohorte attendue est le manifest des clés retournées par les tâches amont via XCom. Chaque pod d'acquisition return sa clé S3 (string JSON-sérialisable). Le pod gate agrège ces clés, vérifie que leur cardinalité == cohorte attendue, puis charge chaque .npz depuis S3.

La liste du préfixe S3 est autorisée comme outil de diagnostic / réconciliation, mais elle ne remplace pas le manifest XCom dans le chemin nominal — sinon le gate peut ramasser des orphelins, des retries, ou des restes d'un run précédent si le keying est imparfait. XCom = manifeste attendu ; S3 = store de bytes ; list-prefix = audit/debug, pas vérité métier par défaut.

7.2 Redirection du store

persist(predictions) écrit vers S3 via cvntrade_s3_manager (np.savezBytesIOput_object), plus de Path//tmp. load(keys) lit les .npz depuis S3 aux clés du manifest.

7.3 Keying — robuste aux retries / map-index

s43-predictions/dag_id=<dag_id>/run_id=<safe_run_id>/task_id=<task_id>/map_index=<map_index>/crypto=<crypto>__fold=<fold>.npz
- Retry : la tâche réécrit (idempotent) son objet et return la clé finale de son try réussi → le manifest XCom ne contient que les clés des tries réussis. Le gate ne lit donc jamais deux tries (il lit le manifest, pas le préfixe). try_number n'est pas dans la clé nominale (overwrite contrôlé) ; il peut être ajouté en métadonnée pour debug. - Disjonction : un pod ↔ une (crypto, fold, map_index) → écritures disjointes, pas de race S3.

7.4 Contrat de schéma .npz

s43_npz_schema_version = 1
arrays:
  y_va : ndarray            # labels validation
  P    : ndarray            # prédictions (ou P_0..P_k si liste)
metadata:
  crypto, fold, producer_task_id, dag_run_id, schema_version
load valide fail-loud (ADR-25) : clé manquante, schema_version inconnu, ou shape/dtype incompatible → raise explicite, jamais un blob opaque silencieux.

7.5 Pourquoi pas le serde airflow.serialization

(a) couple chaque consumer à un serializer enregistré global ; (b) re-route le tableau par le chemin metadata/offload (le tableau redevient un « gros objet » à sérialiser) au lieu de l'isoler explicitement ; (c) « object-storage backend ≠ gros objet arbitraire dans S3 ». Le pass-by-ref est plus explicite, local à s43, conforme à la pratique Airflow. (Le serde reste une option future si un besoin transparent multi-DAG émerge — pas le cas.)

8. Fichiers

Fichier Action
src/commun/finetune/diagnostic/hamilton/s43_io.py persist/loadcvntrade_s3_manager (S3), keying §7.3, manifest §7.1, contrat §7.4, plus de /tmp
infra/scaleway/s3-lifecycle-s43-predictions.json (+ README) rétention bornée sur s43-predictions/ (gateable, DoD §6)
tests/unit/test_s43_io.py round-trip np.savezBytesIO · garde anti-Path//tmp · validation de schéma fail-loud · keying déterministe ; marqueurs ADR-83
smoke cross-pod (DAG diagnostic s43, multi-pod réel) test d'acceptation : 5 pods distincts persist → gate (autre pod) load via manifest XCom → cohort_count == 5 (cf. invariant §4) — PAS un single-pod chain

9. Risques

  • R1 — Le smoke re-masque le bug (H4) : un test tmp_path ou mono-pod passe vert sur un chemin cross-pod cassé. Mitigation : gate = smoke cross-pod réel (pods distincts prouvés dans les logs) ; garde anti-/tmp ; le manifest XCom impose le passage par S3.
  • R2 — Concurrence / retries S3 : Mitigation keying disjoint §7.3 + manifest = clés des tries réussis (le gate ne lit jamais deux tries).
  • R3 — Cluster dry-run requis (leçon S03 : un changement cluster-path n'a pas de gate unit-only) : gate smoke cluster obligatoire avant Closed.
  • R4 — Périmètre : ne pas absorber la science s43 (EI-S05) ni la généralisation policy (S03).
  • R5 — Lifecycle oubliée : Mitigation DoD §6 rend la lifecycle gateable (active+preuve, ou waiver daté avec owner+échéance).

10. Stratégie de tests (ADR-83)

Niveau Quoi Gate
unit round-trip sérialisation · garde anti-/tmp · validation schéma fail-loud · keying PR-blocking
smoke cross-pod (acceptation) 5 pods distincts persist → gate (autre pod) load via manifest XComcohort_count == 5 DoD §2/§4 — le gate de la Story
(existant) les tmp_path restent mais ne sont pas le gate (H4)

Discipline : @pytest.mark.story("CVN-N014-ED-S02").

11. Plan d'implémentation

  1. Rediriger s43_io.persist/loadcvntrade_s3_manager (S3) + manifest XCom (§7.1) + keying (§7.3) + contrat de schéma (§7.4) + garde anti-/tmp.
  2. Unit tests (round-trip · garde · validation schéma · keying) verts.
  3. Lifecycle-rule s43-predictions/ (versionnée ; gateable DoD §6 : active+preuve ou waiver daté).
  4. Smoke cross-pod (DAG s43 multi-pod, pods distincts) → cohort_count == 5 vert = test d'acceptation.
  5. Doc : hub Story + inscrire la dette A(ii) (généralisation policy) sur S03/wp#244.
  6. PR (gated, s43-only) → CodeRabbit + comité pr_review (touche src/commun/finetune/).

Done-criteria (rappel) : routage tranché (Y, justifié) · s43 cross-pod vert (pods distincts) · numpy hors-XCom + manifest XCom · contrat .npz fail-loud · lifecycle gateable · dette A(ii)→S03 · pas de régression.