Skip to content

Champollion - Architecture PostgreSQL

Version: 1.0.0 (Sprint 9) Date: Janvier 2026 Status: Production Ready


Table des Matières

  1. Vue d'Ensemble
  2. Configuration
  3. Services Utilisant PostgreSQL
  4. Modèle de Données
  5. API et SDK
  6. Patterns d'Accès
  7. Sécurité
  8. Maintenance

1. Vue d'Ensemble

Champollion utilise PostgreSQL comme base de données centralisée pour : - MLflow Tracking : Métadonnées des expériences, runs, métriques et tags - Feast Registry : Définitions des Feature Views et entités - Cache Index : Index intelligent pour recherche rapide des caches S3

Principe Fondamental

Une base unique partagée : Tous les services utilisent la même instance PostgreSQL avec des tables dédiées, simplifiant l'administration et garantissant la cohérence.

┌─────────────────────────────────────────────────────────────┐
│                      PostgreSQL                              │
│                   (champollion database)                     │
├─────────────────┬─────────────────┬─────────────────────────┤
│  MLflow Tables  │  Feast Tables   │  Custom Tables          │
│  - experiments  │  - feast_*      │  - cvntrade_cache_index │
│  - runs         │                 │                         │
│  - metrics      │                 │                         │
│  - params       │                 │                         │
│  - tags         │                 │                         │
└─────────────────┴─────────────────┴─────────────────────────┘

2. Configuration

2.1 Variables d'Environnement

# .env
MLFLOW_TRACKING_URI=postgresql://mlflow:mlflow@localhost:5432/champollion

2.2 URI de Connexion

Service Format URI Exemple
MLflow postgresql://user:pass@host:port/db postgresql://mlflow:mlflow@localhost:5432/champollion
Feast postgresql+psycopg2://user:pass@host:port/db postgresql+psycopg2://mlflow:mlflow@localhost:5432/champollion
Cache Index Même URI que MLflow postgresql://mlflow:mlflow@localhost:5432/champollion

2.3 Fichiers de Configuration

Fichier Service Configuration
.env Global MLFLOW_TRACKING_URI
feature_repo/feature_store.yaml Feast registry.path
src/commun/cache/cvntrade_cache_index.py Cache Index Utilise MLFLOW_TRACKING_URI

3. Services Utilisant PostgreSQL

3.1 MLflow Tracking

Fichier : src/commun/mlflow/cvntrade_mlflow_manager.py

MLflow stocke dans PostgreSQL : - Expériences et leurs métadonnées - Runs avec paramètres, métriques et tags - Références vers les artefacts S3 - Model Registry (versions, stages, aliases)

from commun.mlflow.cvntrade_mlflow_manager import CVNTrade_MLFlowManager

mlflow_manager = CVNTrade_MLFlowManager("CVNTrade_Gate")

# Les métadonnées sont stockées dans PostgreSQL
# Les artefacts sont stockés sur S3
run = mlflow_manager.start_run("training_BTCUSDT")
mlflow_manager.log_params({"learning_rate": 0.01}, run_id=run.info.run_id)
mlflow_manager.log_metrics({"f1_score": 0.758}, run_id=run.info.run_id)

3.2 Feast Registry

Fichier : feature_repo/feature_store.yaml

Feast stocke dans PostgreSQL : - Définitions des entités (crypto_entity, etc.) - Feature Views et leurs schémas - Data Sources et configurations - Metadata de matérialisation

# feature_repo/feature_store.yaml
registry:
  registry_type: sql
  path: postgresql+psycopg2://mlflow:mlflow@localhost:5432/champollion
  cache_ttl_seconds: 60

3.3 Cache Index Intelligent

Fichier : src/commun/cache/cvntrade_cache_index.py

Index métier pour recherche rapide des caches : - Lookup par critères métier (symbol, timeframe, strategy) - Index composés pour performance <10ms - Statistiques et monitoring du cache

from commun.cache.cvntrade_cache_index import CVNTradeCacheIndex

cache_index = CVNTradeCacheIndex()

# Recherche rapide d'un cache
entry = cache_index.find_trained_model(
    crypto_symbol="BTCUSDT",
    timeframe="1h",
    strategy="SL1.2_TP1.3",
    model_type="XGBoost"
)

if entry:
    run_id = entry.run_id  # Utilisable pour récupérer les artefacts

4. Modèle de Données

4.1 Tables MLflow (créées automatiquement)

-- Tables principales MLflow
experiments (
    experiment_id SERIAL PRIMARY KEY,
    name VARCHAR(256) UNIQUE,
    artifact_location VARCHAR(256),
    lifecycle_stage VARCHAR(32)
)

runs (
    run_uuid VARCHAR(32) PRIMARY KEY,
    experiment_id INTEGER REFERENCES experiments,
    status VARCHAR(20),
    start_time BIGINT,
    end_time BIGINT,
    artifact_uri VARCHAR(256)
)

params (
    key VARCHAR(250),
    value VARCHAR(8000),
    run_uuid VARCHAR(32) REFERENCES runs
)

metrics (
    key VARCHAR(250),
    value DOUBLE PRECISION,
    timestamp BIGINT,
    step BIGINT,
    run_uuid VARCHAR(32) REFERENCES runs
)

tags (
    key VARCHAR(250),
    value VARCHAR(8000),
    run_uuid VARCHAR(32) REFERENCES runs
)

-- Model Registry
registered_models (
    name VARCHAR(256) PRIMARY KEY,
    creation_time BIGINT,
    last_updated_time BIGINT
)

model_versions (
    name VARCHAR(256) REFERENCES registered_models,
    version INTEGER,
    source VARCHAR(512),
    run_id VARCHAR(32),
    current_stage VARCHAR(20)
)

4.2 Tables Feast (créées automatiquement)

-- Registry Feast (tables préfixées feast_)
feast_metadata (
    metadata_key VARCHAR(256) PRIMARY KEY,
    metadata_value BYTEA,
    last_updated_timestamp TIMESTAMP
)

feast_entities (
    entity_name VARCHAR(256) PRIMARY KEY,
    entity_proto BYTEA
)

feast_feature_views (
    feature_view_name VARCHAR(256) PRIMARY KEY,
    feature_view_proto BYTEA
)

feast_data_sources (
    data_source_name VARCHAR(256) PRIMARY KEY,
    data_source_proto BYTEA
)

4.3 Table Custom : cvntrade_cache_index

CREATE TABLE cvntrade_cache_index (
    id SERIAL PRIMARY KEY,

    -- Identifiants métier (index principaux)
    crypto_symbol VARCHAR(20) NOT NULL,      -- BTCUSDT, ETHUSDT
    timeframe VARCHAR(10) NOT NULL,          -- 15m, 1h, 4h, 1d
    strategy VARCHAR(50) NOT NULL,           -- SL1.2_TP1.3
    entity_type VARCHAR(50) NOT NULL,        -- feature_store, labels, hpo_params, etc.

    -- Identifiants techniques
    run_id VARCHAR(100) UNIQUE NOT NULL,     -- Lien vers MLflow run
    experiment_id VARCHAR(100) NOT NULL,

    -- Métadonnées
    model_type VARCHAR(50),                  -- XGBoost, LightGBM, CatBoost
    version VARCHAR(20) DEFAULT 'v1',
    created_date TIMESTAMP,
    data_start_date TIMESTAMP,
    data_end_date TIMESTAMP,

    -- Statut
    status VARCHAR(20) DEFAULT 'FINISHED',
    size_mb REAL DEFAULT 0.0,

    -- Dépendances (JSON)
    dependencies JSONB,

    -- Métriques qualité (JSON)
    quality_score REAL,
    validation_metrics JSONB,

    -- Timestamp de mise à jour
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Index pour recherche rapide
CREATE INDEX idx_cvntrade_cache_index_business_key
    ON cvntrade_cache_index(crypto_symbol, timeframe, strategy, entity_type);

CREATE INDEX idx_cvntrade_cache_index_entity_type
    ON cvntrade_cache_index(entity_type);

CREATE INDEX idx_cvntrade_cache_index_date_range
    ON cvntrade_cache_index(data_start_date, data_end_date);

CREATE INDEX idx_cvntrade_cache_index_model_type
    ON cvntrade_cache_index(model_type);

4.4 Types d'Entités Cache

entity_type Description Dépendances
feature_store Données OHLCV + indicateurs Aucune
labels Étiquettes triple-barrier feature_store
feature_engineering Features transformées labels
feature_selection Features sélectionnées feature_engineering
hpo_params Hyperparamètres optimisés feature_selection
trained_model Modèle entraîné hpo_params

5. API et SDK

5.1 psycopg2 (Driver PostgreSQL Principal)

Utilisé par CVNTradeCacheIndex pour accès direct :

import psycopg2
from psycopg2.extras import RealDictCursor

# Context manager pour connexions
@contextmanager
def _get_connection(self):
    conn = psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password
    )
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

# Requête avec paramètres sécurisés
with self._get_connection() as conn:
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    cursor.execute("""
        SELECT * FROM cvntrade_cache_index
        WHERE crypto_symbol = %s AND timeframe = %s
    """, (symbol, timeframe))
    rows = cursor.fetchall()

5.2 SQLAlchemy (via Feast)

Feast utilise SQLAlchemy pour le registry :

# Connexion via URI SQLAlchemy
# postgresql+psycopg2://user:password@host:port/database

# Utilisé en interne par Feast - pas d'accès direct nécessaire

5.3 MLflow Client

MLflow abstrait complètement l'accès PostgreSQL :

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Toutes les opérations passent par l'API MLflow
experiment = client.get_experiment_by_name("CVNTrade_Gate")
runs = client.search_runs(experiment_ids=[experiment.experiment_id])

6. Patterns d'Accès

6.1 Connection Pooling via Context Manager

@contextmanager
def _get_connection(self):
    """Context manager pour les connexions PostgreSQL."""
    conn = psycopg2.connect(...)
    try:
        yield conn
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

6.2 Transactions ACID

with self._get_connection() as conn:
    cursor = conn.cursor()

    # Transaction atomique
    cursor.execute("INSERT INTO ... VALUES ...")
    cursor.execute("UPDATE ... SET ...")

    # Commit automatique via context manager
    # Rollback automatique en cas d'exception

6.3 Upsert avec ON CONFLICT

cursor.execute("""
    INSERT INTO cvntrade_cache_index
    (crypto_symbol, timeframe, strategy, entity_type, run_id, ...)
    VALUES (%s, %s, %s, %s, %s, ...)
    ON CONFLICT (run_id) DO UPDATE SET
        crypto_symbol = EXCLUDED.crypto_symbol,
        timeframe = EXCLUDED.timeframe,
        updated_at = CURRENT_TIMESTAMP
""", params)

6.4 Recherche Multi-Critères

def _find_entity(self, entity_type, crypto_symbol, timeframe,
                 strategy="ANY", model_type=None):
    query = """
        SELECT * FROM cvntrade_cache_index
        WHERE entity_type = %s AND crypto_symbol = %s AND timeframe = %s
    """
    params = [entity_type, crypto_symbol, timeframe]

    if strategy != "ANY":
        query += " AND strategy = %s"
        params.append(strategy)

    if model_type:
        query += " AND model_type = %s"
        params.append(model_type)

    query += " ORDER BY quality_score DESC NULLS LAST LIMIT 1"

    cursor.execute(query, params)

7. Sécurité

7.1 Protection SQL Injection

Toutes les requêtes utilisent des paramètres positionnels :

# BON - Paramètres sécurisés
cursor.execute("SELECT * FROM table WHERE id = %s", (user_id,))

# MAUVAIS - Injection possible
cursor.execute(f"SELECT * FROM table WHERE id = {user_id}")  # JAMAIS

7.2 Credentials

  • Stockés dans .env (non commité)
  • Accédés via dotenv_values() ou os.getenv()
  • Jamais hardcodés dans le code
from dotenv import dotenv_values
_env = dotenv_values()

connection_string = _env.get(
    "MLFLOW_TRACKING_URI",
    "postgresql://mlflow:mlflow@localhost:5432/champollion"
)

7.3 Permissions PostgreSQL

-- Utilisateur MLflow avec permissions limitées
CREATE USER mlflow WITH PASSWORD 'mlflow';
GRANT ALL PRIVILEGES ON DATABASE champollion TO mlflow;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO mlflow;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO mlflow;

8. Maintenance

8.1 Nettoyage des Caches Anciens

cache_index = CVNTradeCacheIndex()

# Supprimer les entrées > 30 jours
deleted = cache_index.cleanup_old_entries(days_old=30)
print(f"{deleted} entrées supprimées")

8.2 Statistiques du Cache

summary = cache_index.get_cache_summary()
# {
#     'total_entries': 150,
#     'total_size_mb': 2450.5,
#     'latest_date': datetime(2026, 1, 9, ...),
#     'by_entity_type': {
#         'feature_store': 12,
#         'labels': 12,
#         'trained_model': 36,
#         ...
#     },
#     'by_crypto': {
#         'BTCUSDT': 50,
#         'ETHUSDT': 50,
#         ...
#     },
#     'storage': 'PostgreSQL'
# }

8.3 Vérification Pipeline Readiness

readiness = cache_index.check_pipeline_readiness(
    crypto_symbol="BTCUSDT",
    timeframe="1h",
    strategy="SL1.2_TP1.3",
    model_type="XGBoost"
)
# {
#     'feature_store': True,
#     'labels': True,
#     'feature_engineering': True,
#     'feature_selection': True,
#     'hpo_params': True,
#     'trained_model': True,
#     'ready_for_training': True,
#     'ready_for_inference': True
# }

8.4 Backup et Restauration

# Backup
pg_dump -U mlflow champollion > backup_champollion.sql

# Restauration
psql -U mlflow champollion < backup_champollion.sql

# Backup MLflow uniquement
pg_dump -U mlflow -t 'experiments' -t 'runs' -t 'params' -t 'metrics' -t 'tags' champollion > mlflow_backup.sql

Références


Dernière mise à jour : Janvier 2026