Feast Feature Store - Guide d'Intégration¶
Version: Feast 0.58.0 Date: Janvier 2026 Status: Production Ready
Table des Matières¶
- Vue d'Ensemble
- Installation
- Configuration
- Feature Views
- Export des Données
- Matérialisation
- Récupération des Features
- Intégration Pipeline
- Commandes CLI
- Troubleshooting
1. Vue d'Ensemble¶
Architecture Feast dans Champollion¶
ETL Pipeline Feature Store Inference
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Enriched │──export──▶ │ Parquet │ │ Model │
│ DataFrame │ │ Files │ │ │
└─────────────┘ └──────┬───────┘ └──────┬──────┘
│ │
▼ │
┌──────────────┐ │
│ PostgreSQL │◀── registry │
│ (Registry) │ │
└──────────────┘ │
│ │
materialize│ │
▼ │
┌──────────────┐ get_online │
│ Redis │◀───features────────┘
│(Online Store)│
└──────────────┘
Composants¶
| Composant | Type | Description |
|---|---|---|
| Registry | PostgreSQL | Métadonnées des features |
| Offline Store | Dask/Parquet | Training data |
| Online Store | Redis | Inference temps réel |
2. Installation¶
Packages installés¶
Dépendances installées :
- feast 0.58.0
- redis 4.6.0
- psycopg 3.2.5
- boto3 1.38.27
- hiredis 2.4.0
Vérification¶
3. Configuration¶
feature_store.yaml¶
# feature_repo/feature_store.yaml
project: champollion
provider: local
# Registry PostgreSQL (partagé avec MLflow)
registry:
registry_type: sql
path: postgresql+psycopg2://mlflow:mlflow@localhost:5432/champollion
cache_ttl_seconds: 60
# Offline store - Dask pour gros volumes
offline_store:
type: dask
# Online store - Redis pour inference temps réel
online_store:
type: redis
connection_string: localhost:6379
# Serialization version
entity_key_serialization_version: 3
Variables d'environnement¶
4. Feature Views¶
4.1 Entité¶
# feature_repo/entities.py
from feast import Entity, ValueType
crypto_symbol = Entity(
name="crypto_symbol",
description="Cryptocurrency trading pair (e.g., BTCUSDT)",
join_keys=["symbol"],
value_type=ValueType.STRING,
)
4.2 Feature Views définies¶
| Feature View | Features | TTL | Catégorie |
|---|---|---|---|
raw_ohlcv |
5 | 7j | OHLCV brut |
momentum_indicators |
11 | 7j | RSI, MACD, Stochastic |
volatility_indicators |
11 | 7j | Bollinger, ATR |
trend_regime |
9 | 7j | ADX, SMA, régime |
volume_indicators |
7 | 7j | Volume ratios |
gating_features |
14 | 7j | Gate model |
direction_features |
10 | 7j | Direction model |
temporal_features |
4 | 7j | Cyclical |
external_features |
1 | 1j | Fear & Greed |
candlestick_features |
4 | 7j | Patterns |
Total : 76 features
4.3 Exemple de Feature View¶
# feature_repo/features.py
from feast import FeatureView, Field, FileSource
from feast.types import Float64, Int64
momentum_indicators_fv = FeatureView(
name="momentum_indicators",
entities=[crypto_symbol],
ttl=timedelta(days=7),
schema=[
Field(name="RSI_7", dtype=Float64),
Field(name="RSI_14", dtype=Float64),
Field(name="RSI_28", dtype=Float64),
Field(name="MACD_12_26_9", dtype=Float64),
# ...
],
source=enriched_source,
tags={"team": "ml", "category": "momentum"},
)
5. Export des Données¶
5.1 Classe FeastExporter¶
from ETL.cvntrade_feast_exporter import CVNTrade_FeastExporter
exporter = CVNTrade_FeastExporter()
# Export DataFrame enrichi
files = exporter.export_enriched_data(
df=enriched_df,
symbol="BTCUSDT",
mode="full" # ou "append"
)
# {'momentum_indicators': '/path/to/BTCUSDT_momentum_indicators.parquet', ...}
5.2 Helper function¶
from ETL.cvntrade_feast_exporter import export_from_etl_pipeline
# Export complet depuis le pipeline ETL
files = export_from_etl_pipeline(
coin="BTCUSDT",
mode="train",
materialize=True # Matérialise vers Redis
)
5.3 Structure des fichiers générés¶
data/feature_store/
├── ohlcv/
│ └── BTCUSDT_ohlcv.parquet
├── enriched/
│ ├── BTCUSDT_momentum_indicators.parquet
│ ├── BTCUSDT_volatility_indicators.parquet
│ ├── BTCUSDT_trend_regime.parquet
│ ├── BTCUSDT_volume_indicators.parquet
│ ├── BTCUSDT_gating_features.parquet
│ ├── BTCUSDT_direction_features.parquet
│ ├── BTCUSDT_temporal_features.parquet
│ ├── BTCUSDT_external_features.parquet
│ └── BTCUSDT_candlestick_features.parquet
└── selected/
└── (post feature selection)
6. Matérialisation¶
6.1 Via Python¶
exporter = CVNTrade_FeastExporter()
# Matérialise les 7 derniers jours vers Redis
exporter.materialize_to_online_store(
start_date=datetime.utcnow() - timedelta(days=7),
end_date=datetime.utcnow()
)
6.2 Via CLI¶
cd feature_repo
# Matérialisation complète
feast materialize 2025-01-01T00:00:00 2026-01-09T00:00:00
# Matérialisation incrémentale (depuis dernière)
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)
7. Récupération des Features¶
7.1 Online Features (Inference)¶
from inference.cvntrade_feast_inference import CVNTrade_FeastInference
inference = CVNTrade_FeastInference()
# Récupérer features pour un symbole
features_df = inference.get_features_for_inference(
symbol="BTCUSDT",
model_type="gate" # ou "direction"
)
# Prédiction complète
result = inference.predict(
symbol="BTCUSDT",
model_type="gate"
)
# {
# "symbol": "BTCUSDT",
# "prediction": 1,
# "probability": 0.82,
# "latency_ms": 12.5
# }
7.2 Historical Features (Training)¶
from feast import FeatureStore
fs = FeatureStore(repo_path="feature_repo")
# DataFrame avec entités et timestamps
entity_df = pd.DataFrame({
"symbol": ["BTCUSDT"] * 1000,
"timestamp": pd.date_range("2025-01-01", periods=1000, freq="h")
})
# Récupérer features historiques
training_df = fs.get_historical_features(
entity_df=entity_df,
features=[
"momentum_indicators:RSI_14",
"volatility_indicators:ATRr_14",
"gating_features:gating_opportunity_score",
]
).to_df()
7.3 Batch Prediction¶
results = inference.predict_batch(
symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
model_type="gate"
)
# DataFrame avec colonnes: symbol, prediction, probability, latency_ms
8. Intégration Pipeline¶
8.1 DAG Airflow¶
# dags/dag_feast_sync.py
from airflow import DAG
from airflow.operators.python import PythonOperator
def sync_feast():
from ETL.cvntrade_feast_exporter import export_from_etl_pipeline
export_from_etl_pipeline("BTCUSDT", materialize=True)
dag = DAG('feast_sync', schedule='@hourly')
sync_task = PythonOperator(
task_id='sync_feast',
python_callable=sync_feast,
dag=dag
)
8.2 Intégration Training¶
# Dans le trainer
from inference.cvntrade_feast_inference import CVNTrade_FeastInference
class MyTrainer:
def __init__(self):
self.feast = CVNTrade_FeastInference()
def get_training_data(self, symbol, start_date, end_date):
entity_df = self._build_entity_df(symbol, start_date, end_date)
return self.feast.get_historical_features(entity_df)
9. Commandes CLI¶
cd feature_repo
# Appliquer les définitions au registry
feast apply
# Lister les feature views
feast feature-views list
# Lister les entités
feast entities list
# Voir le plan des changements
feast plan
# Matérialiser vers online store
feast materialize 2025-01-01 2026-01-09
# Matérialisation incrémentale
feast materialize-incremental $(date +%Y-%m-%dT%H:%M:%S)
# UI web (optionnel)
feast ui
10. Troubleshooting¶
Erreur: "Can't load plugin: postgresql.psycopg"¶
Cause: SQLAlchemy 1.4 ne reconnaît pas psycopg3
Solution: Utiliser postgresql+psycopg2 dans la config
Erreur: "Entity key serialization version deprecated"¶
Solution: Mettre à jour la config
Redis connection refused¶
Vérifier:
Features vides lors de get_online_features¶
Causes possibles:
1. Données non matérialisées → feast materialize
2. TTL expiré → Rematérialiser
3. Entité non trouvée → Vérifier le symbol
Performance lente¶
Optimisations:
1. Utiliser hiredis (installé par défaut)
2. Batch les requêtes : get_online_features(entity_rows=[...])
3. Augmenter cache_ttl_seconds dans la config
Références¶
Dernière mise à jour : Janvier 2026