Introduction¶
Ce notebook implémente l’algorithme CV+ (Cross-Validation Plus) afin de prédire des intervalles de prédiction pour le prix de voitures à partir de leurs caractéristiques.
Dans ce cadre, deux régresseurs quantiles (pour les bornes inférieure et supérieure) sont utilisés pour effectuer les prédictions initiales. Ensuite, les intervalles sont calibrés en utilisant les scores de conformité calculés sur les plis de validation croisée laissés de côté (out-of-fold).
Sous des hypothèses d’échangeabilité des données, l’algorithme CV+ produit des intervalles de prédiction avec des garanties théoriques sur la couverture (probabilité que la vraie valeur soit dans l’intervalle). Les intervalles obtenus sont adaptatifs : leur largeur varie selon la difficulté de prédiction de chaque point, capturant ainsi l’hétéroscédasticité des données.
Mise en place¶
Séparation des données: Les données sont séparés en deux ensembles : entraînement et test.
Entraînement du modèle: Deux régresseurs quantiles sont entraînés sur l’ensemble d’entraînement (hyperparamètres seulement).
Algorithme CV+:
Les données d’entraînement sont divisées en K plis disjoints.
Pour chaque pli :
Les modèles sont entraînés sur les autres plis.
On calcule les prédictions sur le pli laissé de côté (calibration fold).
On calcule les scores de conformité sur ce pli (l’écart entre la prédiction et la vraie valeur).
Les modèles entraînés sur chaque pli effectuent ensuite des prédictions sur l’ensemble de test.
Construction des intervalles:
Pour chaque point de l’ensemble de test, on agrège les prédictions des modèles.
On ajuste les bornes en ajoutant le quantile approprié des scores de conformité calculés précédemment.
Évaluation du modèle: Les intervalles de prédiction finaux sont évalués sur l’ensemble de test.
Chargement des données¶
import numpy as np
import polars as pl
import polars.selectors as cs
# Polars display options
pl.Config.set_tbl_hide_column_data_types(True)
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_float_precision(2)
# Load preprocessed data
df = pl.read_parquet("../../data/car_prices_clean.parquet")
print(f"Dataset shape: {df.shape}")
df.head()Dataset shape: (19161, 18)
Définition des données du modèle¶
target = "Price"
numerical_features = [
"Production year",
"Leather interior",
"Engine volume (L)",
"Mileage (km)",
"Cylinders",
"Doors",
"Airbags",
"Turbo",
]
categorical_features = [
"Brand",
"Category",
"Fuel type",
"Gear box type",
"Drive wheels",
"Wheel",
"Color",
]
features = numerical_features + categorical_features
f"Columns not used: {[col for col in df.columns if col not in [*features, target]]}""Columns not used: ['Levy tax', 'Model']"# Set categorical dtypes to benefit from native categorical handling
df = df.cast(dict.fromkeys(categorical_features, pl.Categorical))Séparation des données¶
La taille de l’ensemble de calibration est choisie de manière à garantir un taux de couverture satisfaisant, tandis que la taille de l’ensemble de test permet de généraliser les performances du modèle.
from sklearn.model_selection import train_test_split
X = df.select(features)
y = df.get_column("Price")
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
print(f"Train set size (60%): {X_train.shape[0]}")
print(f"Test set size (20%): {X_test.shape[0]}")Train set size (60%): 15328
Test set size (20%): 3833
Définition du modèle¶
Le régresseur utilisé est sklearn.ensemble.HistGradientBoostingRegressor qui est un modèle de gradient boosting efficace pour les tâches de régression. Ce modèle est choisi car il ne nécessite pas de normalisation des données numériques et gère nativement les données catégorielles et manquantes.
Le paramètre loss est défini sur "quantile" pour entraîner un régresseur quantile. Deux modèles sont entraînés : un pour le quantile inférieur () et un pour le quantile supérieur (1 - ).
from sklearn.base import clone
from sklearn.ensemble import HistGradientBoostingRegressor
base_model = HistGradientBoostingRegressor(
loss="quantile",
quantile=None,
categorical_features="from_dtype",
early_stopping=True,
validation_fraction=0.1,
random_state=42,
)
alpha = 0.1 # 90% prediction intervals
q_low, q_high = alpha / 2, 1 - alpha / 2
print(f"Quantiles: {q_low}, {q_high}")
model_low = clone(base_model).set_params(quantile=q_low)
model_high = clone(base_model).set_params(quantile=q_high)Quantiles: 0.05, 0.95
Entraînement du modèle¶
Les hyperparamètres du modèle sont optimisés sur l’ensemble d’entraînement avec sklearn.model_selection.HalvingRandomSearchCV avec une cross-validation à 5 plis sur l’ensemble d’entraînement. Les meilleurs hyperparamètres sont ensuite utilisés pour entraîner le modèle final sur l’ensemble d’entraînement complet (refit=True).
from scipy.stats import randint, uniform
from sklearn.experimental import enable_halving_search_cv # noqa:F401
from sklearn.model_selection import HalvingRandomSearchCV
param_distributions = {
"max_iter": randint(100, 300),
"max_leaf_nodes": randint(20, 80),
"learning_rate": uniform(0.05, 0.15),
"min_samples_leaf": randint(20, 100),
"l2_regularization": uniform(0, 0.1),
}
base_search = HalvingRandomSearchCV(
estimator=None,
param_distributions=param_distributions,
n_candidates=50,
min_resources=200,
cv=5,
n_jobs=-1,
random_state=42,
refit=True,
)
search_low = clone(base_search).set_params(estimator=model_low)
search_high = clone(base_search).set_params(estimator=model_high)
search_low.fit(X_train, y_train)
search_high.fit(X_train, y_train);Algorithme CV+¶
This implementation reuse best found hyperparameters over training set which is called non-nested cross-validation. This lead to slightly optimistic results, but is much more computationally efficient:
non-nested CV+: grid search (50 fits) + 5-folds CV+ (5 fits) = 55 fits
nested CV+: 5-folds outer (5 fits) x (grid search (50 fits) = 250 fits
Comparison between nested and non-nested cross-validation for CV+: https://
from sklearn.model_selection import KFold
kf = KFold(n_splits=5, shuffle=True, random_state=42)
# Stocker les résultats intermédiaires
df_test_cvp = pl.DataFrame({"True Price": y_test})
nonconformity_scores: list[float] = []
# --- Algorithme CV+ : Boucle sur les plis ---
for k, (train_idx, calib_idx) in enumerate(kf.split(X_train)):
# A. Séparation des données pour ce pli
X_tr_fold, y_tr_fold = X_train[train_idx], y_train[train_idx]
X_cal_fold, y_cal_fold = X_train[calib_idx], y_train[calib_idx]
# B. Entraînement des modèles sur les (K-1) plis
# Utilise les meilleurs hyperparamètres trouvés sur l'ensemble d'entraînement
est_low_k = clone(search_low.best_estimator_).fit(X_tr_fold, y_tr_fold)
est_high_k = clone(search_high.best_estimator_).fit(X_tr_fold, y_tr_fold)
# C. Calcul des scores de non conformité sur le pli de calibration
cal_low_pred = est_low_k.predict(X_cal_fold)
cal_high_pred = est_high_k.predict(X_cal_fold)
residuals = np.maximum(cal_low_pred - y_cal_fold, y_cal_fold - cal_high_pred)
nonconformity_scores.extend(residuals)
# D. Prédictions sur l'ensemble de test par les modèles de ce pli
df_test_cvp = df_test_cvp.with_columns(
pl.lit(est_low_k.predict(X_test)).alias(f"Low Pred Fold {k}"),
pl.lit(est_high_k.predict(X_test)).alias(f"High Pred Fold {k}"),
)Calcul du quantile de calibration¶
# Compute quantile of nonconformity scores from all folds
n_calib = len(nonconformity_scores)
q_level = np.ceil((n_calib + 1) * (1 - alpha)) / n_calib
qhat = np.quantile(nonconformity_scores, q_level, method="linear")
f"CV+ calibration qhat (α={alpha}): {qhat:.3f} at level {q_level:.4f} (n_calib={n_calib})"'CV+ calibration qhat (α=0.1): 354.124 at level 0.9001 (n_calib=15328)'Construction des intervalles¶
df_test_cvp = (
df_test_cvp.with_columns(
# Agrégation des prédictions: on prend la médiane des K modèles
pl.concat_list(cs.starts_with("Low Pred Fold"))
.list.median()
.alias("Lower Bound Median"),
pl.concat_list(cs.starts_with("High Pred Fold"))
.list.median()
.alias("Upper Bound Median"),
)
.with_columns(
# On applique la correction qhat
(pl.col("Lower Bound Median") - qhat).alias("Lower Bound CV+"),
(pl.col("Upper Bound Median") + qhat).alias("Upper Bound CV+"),
)
.with_columns(
(pl.col("Upper Bound CV+") - pl.col("Lower Bound CV+")).alias("Interval Width"),
)
.drop(cs.starts_with("Low Pred Fold") | cs.starts_with("High Pred Fold"))
)
# Add features back to the results for analysis
df_test_cvp = pl.concat([df_test_cvp, X_test], how="horizontal")
df_test_cvp.head()# Certaines bornes peuvent être négatives, on les remplace par 0
df_test_cvp = df_test_cvp.with_columns(pl.max_horizontal(pl.col("Lower Bound CV+"), 0))
# On vérifie qu'il n'y a pas de quantile crossing dans les intervalles CV+
df_test_cvp.filter(pl.col("Lower Bound CV+") > pl.col("Upper Bound CV+")).height0Évaluation des performances (non-nested CV+)¶
from utils import coverage, pinball_loss
df_test_cvp.select(
coverage("Lower Bound CV+", "Upper Bound CV+").alias("Coverage CV+"),
coverage(0, "Lower Bound CV+").name.suffix(f" CV+ q{q_low}"),
coverage(0, "Upper Bound CV+").name.suffix(f" CV+ q{q_high}"),
pinball_loss("True Price", "Lower Bound CV+", q_low),
pinball_loss("True Price", "Upper Bound CV+", q_high),
pl.col("Interval Width").mean().cast(pl.Int32),
)Le taux de couverture globale est proche de la couverture cible (90%) mais est relativement élevé au vu de la taille de l’échantillon de test (~4000 points).
Les autres métriques restent similaires à celles de la CQR.
Couverture par décile de prix¶
df_test_cvp.with_columns(
pl.col("True Price").qcut(10, include_breaks=True).alias("True Price Decile")
).group_by("True Price Decile").agg(
coverage("Lower Bound CV+", "Upper Bound CV+").alias("Coverage"),
pl.col("Interval Width").mean().cast(pl.Int32),
pl.col("Lower Bound CV+").mean().round(0).alias("Avg Lower Bound CV+"),
pl.col("Upper Bound CV+").mean().round(0).alias("Avg Upper Bound CV+"),
).sort("True Price Decile")Commentaires identiques à CQR : l’algorithme CV+ améliore la robustesse et l’efficacité des données, mais il ne résout PAS mieux l’hétéroscédasticité que la CQR car il se contente d’ajuster les intervalles globaux via les scores de conformité.
Largeurs d’intervalles moyenne vs kilométrage¶
import altair as alt
alt.data_transformers.enable("vegafusion")
_df_temp = df_test_cvp.filter(
pl.col("Mileage (km)").is_between(0, 500_000)
& pl.col("Interval Width").is_between(0, 150_000)
)
alt.Chart(_df_temp).mark_rect().encode(
alt.X("Interval Width:Q").bin(maxbins=50),
alt.Y("Mileage (km):Q").bin(maxbins=50),
alt.Color("count()").scale(type="sqrt"),
).properties(title="CV+ Interval Width vs Mileage (Density Plot)")On observe clairement une relation inverse entre le prix réel et la largeur relative des intervalles de prédiction: les véhicules moins chers ont des intervalles proportionnellement plus larges, tandis que les véhicules plus chers bénéficient d’intervalles plus étroits en pourcentage de leur prix.
_df_temp = df_test_cvp.filter(
pl.col("Production year").is_between(1990, 2020)
& pl.col("Interval Width").is_between(0, 150_000)
)
alt.Chart(_df_temp).mark_rect(clip=True).encode(
alt.X("Production year:Q").bin(maxbins=50),
alt.Y("Interval Width:Q").bin(maxbins=50),
alt.Color("count()").scale(type="sqrt"),
).properties(title="CV+ Interval Width vs Production year (Density Plot)")Analyse de la couverture par sous-groupes¶
df_test_cvp.group_by("Brand").agg(
coverage("Lower Bound CV+", "Upper Bound CV+"), pl.len().alias("Count")
).sort("Count", descending=True).head(10)df_test_cvp.group_by("Fuel type").agg(
coverage("Lower Bound CV+", "Upper Bound CV+"), pl.len().alias("Count")
).sort("Count", descending=True)df_test_cvp.group_by("Production year").agg(
coverage("Lower Bound CV+", "Upper Bound CV+"), pl.len().alias("Count")
).sort("Production year", descending=True).head(15)Analyse par sous-groupes:
La couverture peut varier légèrement selon les caractéristiques du véhicule (kilométrage, année, marque) toutefois ces variations restent plus limitées qu’avec l’algorithme SCP.