Skip to content

CVNTrade Sprint 7 - Design de l'Orchestrateur de Pipeline

Vue d'ensemble

L'orchestrateur CVNTrade Sprint 7 gère intelligemment les dépendances et la reconstitution des caches pour optimiser l'entraînement des modèles de trading crypto. Il suit une logique métier rationnelle basée sur les dépendances réelles entre les composants.

Architecture des Entités et Dépendances

Hiérarchie des Entités

Feature Store (données brutes crypto)
Labels (étiquetage stratégique)
Feature Engineering (transformation des features)
Feature Selection (sélection des features par modèle)
HPO Params (hyperparamètres optimisés)
Trained Model (modèle entraîné final)

Types d'Entités Métier

  1. Feature Store : Données crypto brutes (OHLCV, order book, trades)
  2. Critères : crypto_symbol, timeframe, date_range
  3. Indépendant des autres entités

  4. Labels : Étiquetage selon la stratégie de trading

  5. Critères : crypto_symbol, timeframe, strategy, date_range
  6. Dépend de : Feature Store

  7. Feature Engineering : Features transformées spécifiques à la stratégie

  8. Critères : crypto_symbol, timeframe, strategy
  9. Dépend de : Feature Store + Labels

  10. Feature Selection : Features sélectionnées par type de modèle

  11. Critères : crypto_symbol, timeframe, strategy, model_type
  12. Dépend de : Feature Engineering

  13. HPO Params : Hyperparamètres optimisés pour le modèle

  14. Critères : crypto_symbol, timeframe, strategy, model_type
  15. Dépend de : Feature Selection

  16. Trained Model : Modèle final entraîné

  17. Critères : crypto_symbol, timeframe, strategy, model_type
  18. Dépend de : HPO Params + Feature Selection

Logique d'Orchestration Rationnelle

Principe Directeur

"Je regarde si j'ai un feature engineering disponible et valide pour faire mon entraînement, si oui, je fais une feature selection pour le modèle à entraîner, je regarde si j'ai des paramètres hpo (sinon je les reconstitue) et je lance mon entraînement. Si je n'ai pas de feature engineering, je regarde si j'ai un feature store valide et un étiquetage valide, si je n'ai pas de feature store, je le recharge et refais l'étiquetage, si j'ai le feature store mais pas l'étiquetage, je refais juste l'étiquetage"

Algorithme d'Orchestration

def orchestrate_training_pipeline(crypto_symbol, timeframe, strategy, model_type):
    """
    Orchestration intelligente basée sur la logique métier
    """

    # === PHASE 1: Vérification Feature Engineering ===
    feature_engineering = cache_index.find_feature_engineering(
        crypto_symbol, timeframe, strategy
    )

    if feature_engineering and is_valid(feature_engineering):
        # Chemin rapide : FE disponible → Selection → HPO → Training
        return fast_track_from_feature_engineering(
            feature_engineering, model_type
        )

    # === PHASE 2: Vérification des dépendances de base ===
    feature_store = cache_index.find_feature_store(crypto_symbol, timeframe)
    labels = cache_index.find_labels(crypto_symbol, timeframe, strategy)

    if not feature_store or not is_valid(feature_store):
        # Reconstitution complète depuis les données sources
        return full_reconstruction_pipeline(crypto_symbol, timeframe, strategy, model_type)

    if not labels or not is_valid(labels):
        # Reconstitution depuis l'étiquetage
        return labeling_reconstruction_pipeline(
            feature_store, crypto_symbol, timeframe, strategy, model_type
        )

    # === PHASE 3: Feature Engineering manquant ===
    # FS + Labels disponibles → Reconstruction FE uniquement
    return feature_engineering_reconstruction_pipeline(
        feature_store, labels, crypto_symbol, timeframe, strategy, model_type
    )

Stratégies de Reconstitution

1. Fast Track (Feature Engineering disponible)

def fast_track_from_feature_engineering(feature_engineering, model_type):
    """
    Chemin optimisé quand le Feature Engineering est disponible
    """
    steps = []

    # 1. Feature Selection pour le modèle cible
    feature_selection = ensure_feature_selection(feature_engineering, model_type)
    steps.append("feature_selection")

    # 2. HPO Params (reconstitution si nécessaire)
    hpo_params = ensure_hpo_params(feature_selection, model_type)
    if not hpo_params:
        steps.append("hpo_optimization")

    # 3. Training final
    steps.append("model_training")

    return execute_pipeline_steps(steps)

2. Reconstruction Complète

def full_reconstruction_pipeline(crypto_symbol, timeframe, strategy, model_type):
    """
    Reconstitution complète depuis les données sources
    """
    steps = [
        "feature_store_loading",     # Chargement données crypto
        "strategy_labeling",         # Étiquetage stratégique
        "feature_engineering",       # Transformation features
        "feature_selection",         # Sélection par modèle
        "hpo_optimization",          # Optimisation hyperparamètres
        "model_training"             # Entraînement final
    ]

    return execute_pipeline_steps(steps)

3. Reconstruction depuis Étiquetage

def labeling_reconstruction_pipeline(feature_store, crypto_symbol, timeframe, strategy, model_type):
    """
    Reconstitution depuis l'étiquetage (Feature Store disponible)
    """
    steps = [
        "strategy_labeling",         # Nouvel étiquetage
        "feature_engineering",       # FE avec nouvelles labels
        "feature_selection",         # Sélection par modèle
        "hpo_optimization",          # HPO si nécessaire
        "model_training"             # Entraînement final
    ]

    return execute_pipeline_steps(steps, feature_store=feature_store)

4. Reconstruction Feature Engineering

def feature_engineering_reconstruction_pipeline(feature_store, labels, crypto_symbol, timeframe, strategy, model_type):
    """
    Reconstitution depuis Feature Engineering (FS + Labels disponibles)
    """
    steps = [
        "feature_engineering",       # FE avec données existantes
        "feature_selection",         # Sélection par modèle
        "hpo_optimization",          # HPO si nécessaire
        "model_training"             # Entraînement final
    ]

    return execute_pipeline_steps(steps, feature_store=feature_store, labels=labels)

Index Intelligent de Cache

Structure SQLite Multiindex

CREATE TABLE cache_entries (
    -- Identifiants métier (index principaux)
    crypto_symbol TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    strategy TEXT NOT NULL,
    entity_type TEXT NOT NULL,

    -- Identifiants techniques
    run_id TEXT UNIQUE NOT NULL,
    experiment_id TEXT NOT NULL,

    -- Métadonnées de qualité
    model_type TEXT,
    version TEXT DEFAULT 'v1',
    created_date TIMESTAMP,
    data_start_date TIMESTAMP,
    data_end_date TIMESTAMP,

    -- Métriques de validité
    status TEXT DEFAULT 'FINISHED',
    size_mb REAL DEFAULT 0.0,
    quality_score REAL,
    validation_metrics TEXT
);

-- Index pour recherche instantanée
CREATE INDEX idx_business_key
ON cache_entries(crypto_symbol, timeframe, strategy, entity_type);

CREATE INDEX idx_entity_type ON cache_entries(entity_type);
CREATE INDEX idx_date_range ON cache_entries(data_start_date, data_end_date);
CREATE INDEX idx_model_type ON cache_entries(model_type);

Méthodes de Recherche Métier

class CVNTradeCacheIndex:
    def find_feature_store(self, crypto_symbol: str, timeframe: str) -> CacheEntry
    def find_labels(self, crypto_symbol: str, timeframe: str, strategy: str) -> CacheEntry
    def find_feature_engineering(self, crypto_symbol: str, timeframe: str, strategy: str) -> CacheEntry
    def find_feature_selection(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry
    def find_hpo_params(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry
    def find_trained_model(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry

    def check_pipeline_readiness(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> Dict[str, bool]

Critères de Validité

Validation Temporelle

def is_valid(cache_entry: CacheEntry) -> bool:
    """
    Valide un cache selon les critères métier
    """
    # 1. Statut de base
    if cache_entry.status != "FINISHED":
        return False

    # 2. Qualité minimale
    if cache_entry.quality_score and cache_entry.quality_score < 0.7:
        return False

    # 3. Fraîcheur des données (selon l'entité)
    max_age_days = get_max_age_for_entity(cache_entry.entity_type)
    if is_too_old(cache_entry.created_date, max_age_days):
        return False

    # 4. Validation spécifique par type
    return validate_entity_specific(cache_entry)

def get_max_age_for_entity(entity_type: EntityType) -> int:
    """
    Durée de vie maximum par type d'entité
    """
    return {
        EntityType.FEATURE_STORE: 7,      # 7 jours max
        EntityType.LABELS: 30,            # 30 jours max
        EntityType.FEATURE_ENGINEERING: 15,  # 15 jours max
        EntityType.FEATURE_SELECTION: 30,    # 30 jours max
        EntityType.HPO_PARAMS: 60,           # 60 jours max
        EntityType.TRAINED_MODEL: 90         # 90 jours max
    }[entity_type]

Integration MLflow Manager Central

Principe d'Isolation

class CVNTradeCacheManager:
    def __init__(self, mlflow_manager):
        self.mlflow_manager = mlflow_manager  # Manager central OBLIGATOIRE
        self.cache_index = CVNTradeCacheIndex()

    def get_cached_entity(self, entity_type: EntityType, criteria: Dict[str, Any]) -> CacheResult:
        """
        RECHERCHE EXCLUSIVE via l'index intelligent
        """
        # 1. Recherche via index intelligent
        result = self._search_via_intelligent_index(entity_type, criteria)

        if result:
            return result

        # 2. Si rien trouvé → reconstitution nécessaire
        return CacheResult(
            found=False,
            level=CacheLevel.EXACT,
            entity=None,
            run_id=None,
            metadata=None,
            fallback_reason="Reconstitution cache requise"
        )

Règles d'Or

  1. JAMAIS de court-circuit MLflow : Tous les appels MLflow passent par le manager central
  2. Index intelligent EXCLUSIF : Fini les fallbacks en cascade, recherche directe uniquement
  3. Reconstitution explicite : Si le cache n'existe pas, on le dit clairement et on reconstitue
  4. Isolation des runs : Chaque composant gère son run propre via le manager central

Métriques de Performance

KPIs d'Efficacité

def track_orchestration_metrics():
    """
    Métriques de performance de l'orchestrateur
    """
    return {
        "cache_hit_rate": cache_hits / total_requests,
        "average_pipeline_time": sum(pipeline_times) / len(pipeline_times),
        "full_reconstruction_rate": full_reconstructions / total_pipelines,
        "fast_track_rate": fast_tracks / total_pipelines,
        "entity_freshness_score": calculate_freshness_score()
    }

Optimisations Futures

  1. Cache prédictif : Pré-génération des entités populaires
  2. Parallélisation : Exécution parallèle des étapes indépendantes
  3. Cache distribué : Extension multi-serveur
  4. ML de cache : Prédiction des besoins de reconstitution

Auteur: CVNTrade Sprint 7 Architecture Team Version: 1.0 Date: 2025-10-03 Status: Design Approved - Ready for Implementation