CVNTrade Sprint 7 - Design de l'Orchestrateur de Pipeline¶
Vue d'ensemble¶
L'orchestrateur CVNTrade Sprint 7 gère intelligemment les dépendances et la reconstitution des caches pour optimiser l'entraînement des modèles de trading crypto. Il suit une logique métier rationnelle basée sur les dépendances réelles entre les composants.
Architecture des Entités et Dépendances¶
Hiérarchie des Entités¶
Feature Store (données brutes crypto)
↓
Labels (étiquetage stratégique)
↓
Feature Engineering (transformation des features)
↓
Feature Selection (sélection des features par modèle)
↓
HPO Params (hyperparamètres optimisés)
↓
Trained Model (modèle entraîné final)
Types d'Entités Métier¶
- Feature Store : Données crypto brutes (OHLCV, order book, trades)
- Critères :
crypto_symbol,timeframe,date_range -
Indépendant des autres entités
-
Labels : Étiquetage selon la stratégie de trading
- Critères :
crypto_symbol,timeframe,strategy,date_range -
Dépend de : Feature Store
-
Feature Engineering : Features transformées spécifiques à la stratégie
- Critères :
crypto_symbol,timeframe,strategy -
Dépend de : Feature Store + Labels
-
Feature Selection : Features sélectionnées par type de modèle
- Critères :
crypto_symbol,timeframe,strategy,model_type -
Dépend de : Feature Engineering
-
HPO Params : Hyperparamètres optimisés pour le modèle
- Critères :
crypto_symbol,timeframe,strategy,model_type -
Dépend de : Feature Selection
-
Trained Model : Modèle final entraîné
- Critères :
crypto_symbol,timeframe,strategy,model_type - Dépend de : HPO Params + Feature Selection
Logique d'Orchestration Rationnelle¶
Principe Directeur¶
"Je regarde si j'ai un feature engineering disponible et valide pour faire mon entraînement, si oui, je fais une feature selection pour le modèle à entraîner, je regarde si j'ai des paramètres hpo (sinon je les reconstitue) et je lance mon entraînement. Si je n'ai pas de feature engineering, je regarde si j'ai un feature store valide et un étiquetage valide, si je n'ai pas de feature store, je le recharge et refais l'étiquetage, si j'ai le feature store mais pas l'étiquetage, je refais juste l'étiquetage"
Algorithme d'Orchestration¶
def orchestrate_training_pipeline(crypto_symbol, timeframe, strategy, model_type):
"""
Orchestration intelligente basée sur la logique métier
"""
# === PHASE 1: Vérification Feature Engineering ===
feature_engineering = cache_index.find_feature_engineering(
crypto_symbol, timeframe, strategy
)
if feature_engineering and is_valid(feature_engineering):
# Chemin rapide : FE disponible → Selection → HPO → Training
return fast_track_from_feature_engineering(
feature_engineering, model_type
)
# === PHASE 2: Vérification des dépendances de base ===
feature_store = cache_index.find_feature_store(crypto_symbol, timeframe)
labels = cache_index.find_labels(crypto_symbol, timeframe, strategy)
if not feature_store or not is_valid(feature_store):
# Reconstitution complète depuis les données sources
return full_reconstruction_pipeline(crypto_symbol, timeframe, strategy, model_type)
if not labels or not is_valid(labels):
# Reconstitution depuis l'étiquetage
return labeling_reconstruction_pipeline(
feature_store, crypto_symbol, timeframe, strategy, model_type
)
# === PHASE 3: Feature Engineering manquant ===
# FS + Labels disponibles → Reconstruction FE uniquement
return feature_engineering_reconstruction_pipeline(
feature_store, labels, crypto_symbol, timeframe, strategy, model_type
)
Stratégies de Reconstitution¶
1. Fast Track (Feature Engineering disponible)¶
def fast_track_from_feature_engineering(feature_engineering, model_type):
"""
Chemin optimisé quand le Feature Engineering est disponible
"""
steps = []
# 1. Feature Selection pour le modèle cible
feature_selection = ensure_feature_selection(feature_engineering, model_type)
steps.append("feature_selection")
# 2. HPO Params (reconstitution si nécessaire)
hpo_params = ensure_hpo_params(feature_selection, model_type)
if not hpo_params:
steps.append("hpo_optimization")
# 3. Training final
steps.append("model_training")
return execute_pipeline_steps(steps)
2. Reconstruction Complète¶
def full_reconstruction_pipeline(crypto_symbol, timeframe, strategy, model_type):
"""
Reconstitution complète depuis les données sources
"""
steps = [
"feature_store_loading", # Chargement données crypto
"strategy_labeling", # Étiquetage stratégique
"feature_engineering", # Transformation features
"feature_selection", # Sélection par modèle
"hpo_optimization", # Optimisation hyperparamètres
"model_training" # Entraînement final
]
return execute_pipeline_steps(steps)
3. Reconstruction depuis Étiquetage¶
def labeling_reconstruction_pipeline(feature_store, crypto_symbol, timeframe, strategy, model_type):
"""
Reconstitution depuis l'étiquetage (Feature Store disponible)
"""
steps = [
"strategy_labeling", # Nouvel étiquetage
"feature_engineering", # FE avec nouvelles labels
"feature_selection", # Sélection par modèle
"hpo_optimization", # HPO si nécessaire
"model_training" # Entraînement final
]
return execute_pipeline_steps(steps, feature_store=feature_store)
4. Reconstruction Feature Engineering¶
def feature_engineering_reconstruction_pipeline(feature_store, labels, crypto_symbol, timeframe, strategy, model_type):
"""
Reconstitution depuis Feature Engineering (FS + Labels disponibles)
"""
steps = [
"feature_engineering", # FE avec données existantes
"feature_selection", # Sélection par modèle
"hpo_optimization", # HPO si nécessaire
"model_training" # Entraînement final
]
return execute_pipeline_steps(steps, feature_store=feature_store, labels=labels)
Index Intelligent de Cache¶
Structure SQLite Multiindex¶
CREATE TABLE cache_entries (
-- Identifiants métier (index principaux)
crypto_symbol TEXT NOT NULL,
timeframe TEXT NOT NULL,
strategy TEXT NOT NULL,
entity_type TEXT NOT NULL,
-- Identifiants techniques
run_id TEXT UNIQUE NOT NULL,
experiment_id TEXT NOT NULL,
-- Métadonnées de qualité
model_type TEXT,
version TEXT DEFAULT 'v1',
created_date TIMESTAMP,
data_start_date TIMESTAMP,
data_end_date TIMESTAMP,
-- Métriques de validité
status TEXT DEFAULT 'FINISHED',
size_mb REAL DEFAULT 0.0,
quality_score REAL,
validation_metrics TEXT
);
-- Index pour recherche instantanée
CREATE INDEX idx_business_key
ON cache_entries(crypto_symbol, timeframe, strategy, entity_type);
CREATE INDEX idx_entity_type ON cache_entries(entity_type);
CREATE INDEX idx_date_range ON cache_entries(data_start_date, data_end_date);
CREATE INDEX idx_model_type ON cache_entries(model_type);
Méthodes de Recherche Métier¶
class CVNTradeCacheIndex:
def find_feature_store(self, crypto_symbol: str, timeframe: str) -> CacheEntry
def find_labels(self, crypto_symbol: str, timeframe: str, strategy: str) -> CacheEntry
def find_feature_engineering(self, crypto_symbol: str, timeframe: str, strategy: str) -> CacheEntry
def find_feature_selection(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry
def find_hpo_params(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry
def find_trained_model(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> CacheEntry
def check_pipeline_readiness(self, crypto_symbol: str, timeframe: str, strategy: str, model_type: str) -> Dict[str, bool]
Critères de Validité¶
Validation Temporelle¶
def is_valid(cache_entry: CacheEntry) -> bool:
"""
Valide un cache selon les critères métier
"""
# 1. Statut de base
if cache_entry.status != "FINISHED":
return False
# 2. Qualité minimale
if cache_entry.quality_score and cache_entry.quality_score < 0.7:
return False
# 3. Fraîcheur des données (selon l'entité)
max_age_days = get_max_age_for_entity(cache_entry.entity_type)
if is_too_old(cache_entry.created_date, max_age_days):
return False
# 4. Validation spécifique par type
return validate_entity_specific(cache_entry)
def get_max_age_for_entity(entity_type: EntityType) -> int:
"""
Durée de vie maximum par type d'entité
"""
return {
EntityType.FEATURE_STORE: 7, # 7 jours max
EntityType.LABELS: 30, # 30 jours max
EntityType.FEATURE_ENGINEERING: 15, # 15 jours max
EntityType.FEATURE_SELECTION: 30, # 30 jours max
EntityType.HPO_PARAMS: 60, # 60 jours max
EntityType.TRAINED_MODEL: 90 # 90 jours max
}[entity_type]
Integration MLflow Manager Central¶
Principe d'Isolation¶
class CVNTradeCacheManager:
def __init__(self, mlflow_manager):
self.mlflow_manager = mlflow_manager # Manager central OBLIGATOIRE
self.cache_index = CVNTradeCacheIndex()
def get_cached_entity(self, entity_type: EntityType, criteria: Dict[str, Any]) -> CacheResult:
"""
RECHERCHE EXCLUSIVE via l'index intelligent
"""
# 1. Recherche via index intelligent
result = self._search_via_intelligent_index(entity_type, criteria)
if result:
return result
# 2. Si rien trouvé → reconstitution nécessaire
return CacheResult(
found=False,
level=CacheLevel.EXACT,
entity=None,
run_id=None,
metadata=None,
fallback_reason="Reconstitution cache requise"
)
Règles d'Or¶
- JAMAIS de court-circuit MLflow : Tous les appels MLflow passent par le manager central
- Index intelligent EXCLUSIF : Fini les fallbacks en cascade, recherche directe uniquement
- Reconstitution explicite : Si le cache n'existe pas, on le dit clairement et on reconstitue
- Isolation des runs : Chaque composant gère son run propre via le manager central
Métriques de Performance¶
KPIs d'Efficacité¶
def track_orchestration_metrics():
"""
Métriques de performance de l'orchestrateur
"""
return {
"cache_hit_rate": cache_hits / total_requests,
"average_pipeline_time": sum(pipeline_times) / len(pipeline_times),
"full_reconstruction_rate": full_reconstructions / total_pipelines,
"fast_track_rate": fast_tracks / total_pipelines,
"entity_freshness_score": calculate_freshness_score()
}
Optimisations Futures¶
- Cache prédictif : Pré-génération des entités populaires
- Parallélisation : Exécution parallèle des étapes indépendantes
- Cache distribué : Extension multi-serveur
- ML de cache : Prédiction des besoins de reconstitution
Auteur: CVNTrade Sprint 7 Architecture Team Version: 1.0 Date: 2025-10-03 Status: Design Approved - Ready for Implementation