Introduction¶
Ce notebook implémente l’algorithme SCP (Split Conformal Prediction) afin de prédire des intervalles de prédiction pour le prix de voitures à partir de leurs caractéristiques.
Dans le cadre de la SCP standard, un régresseur classique (estimant la moyenne) est utilisé pour les prédictions ponctuelles. Les intervalles sont ensuite construits en ajoutant une marge d’erreur globale, calculée à partir des résidus absolus sur un ensemble de calibration.
Sous des hypothèses d’échangeabilité des données, l’algorithme SCP offre des garanties théoriques sur la couverture marginale (en moyenne sur l’ensemble des données).
Toutefois, la SCP standard produit des intervalles non adaptatifs : la largeur de l’intervalle est constante quelle que soit la voiture testée. Comme nous avons observé une forte hétéroscédasticité lors de l’analyse exploratoire (la variabilité du prix change selon les caractéristiques), cette approche risque d’être sous-optimale : les intervalles seront inutilement larges pour les voitures “faciles” à estimer et dangereusement étroits pour les voitures aux prix plus volatils.
Mise en place¶
Séparation des données : Les données sont séparées en trois ensembles : entraînement, calibration et test.
Entraînement du modèle : Un régresseur (ex: Random Forest ou Linéaire) est entraîné sur l’ensemble d’entraînement pour prédire le prix moyen.
Calibration :
Le modèle effectue des prédictions sur l’ensemble de calibration.
On calcule les résidus absolus (l’erreur absolue) pour chaque point de cet ensemble.
On détermine la constante correspondant au quantile de ces résidus. Cette valeur définit la demi-largeur fixe de l’intervalle.
Construction des intervalles : Pour chaque point de l’ensemble de test, l’intervalle de prédiction est construit en prenant la prédiction du régresseur et en ajoutant et soustrayant .
Évaluation du modèle: Le modèle est évalué sur l’ensemble de test en utilisant diverses métriques.
Chargement des données¶
import numpy as np
import polars as pl
# Polars display options
pl.Config.set_tbl_hide_column_data_types(True)
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_float_precision(2)
# Load preprocessed data
df = pl.read_parquet("../../data/car_prices_clean.parquet")
print(f"Dataset shape: {df.shape}")
df.head()Dataset shape: (19161, 18)
Définition des données du modèle¶
target = "Price"
numerical_features = [
"Production year",
"Leather interior",
"Engine volume (L)",
"Mileage (km)",
"Cylinders",
"Doors",
"Airbags",
"Turbo",
]
categorical_features = [
"Brand",
"Category",
"Fuel type",
"Gear box type",
"Drive wheels",
"Wheel",
"Color",
]
features = numerical_features + categorical_features
f"Unused features: {set(df.columns) - set(features)}""Unused features: {'Price', 'Levy tax', 'Model'}"Séparation des données¶
La taille de l’ensemble de calibration est choisie de manière à garantir un taux de couverture satisfaisant, tandis que la taille de l’ensemble de test permet de généraliser les performances du modèle.
from sklearn.model_selection import train_test_split
y = df.get_column("Price")
X = df.select(features)
X_train, X_temp, y_train, y_temp = train_test_split(
X, y, test_size=0.4, random_state=42
)
X_calib, X_test, y_calib, y_test = train_test_split(
X_temp, y_temp, test_size=0.5, random_state=42
)
print(f"Train set size (60%): {X_train.shape[0]}")
print(f"Calibration set size (20%): {X_calib.shape[0]}")
print(f"Test set size (20%): {X_test.shape[0]}")Train set size (60%): 11496
Calibration set size (20%): 3832
Test set size (20%): 3833
Définition du modèle¶
Le régresseur utilisé est sklearn.ensemble.HistGradientBoostingRegressor qui est un modèle de gradient boosting efficace pour les tâches de régression. Ce modèle est choisi car il ne nécessite pas de normalisation des données numériques et gère nativement les données catégorielles et manquantes.
from sklearn.ensemble import HistGradientBoostingRegressor
model = HistGradientBoostingRegressor(
loss="squared_error",
learning_rate=0.1,
max_iter=2000,
max_depth=None,
categorical_features="from_dtype",
early_stopping=True,
validation_fraction=0.1,
random_state=42,
)Entraînement du modèle¶
Les hyperparamètres du modèle sont optimisés sur l’ensemble d’entraînement avec sklearn.model_selection.HalvingRandomSearchCV avec une cross-validation à 5 plis sur l’ensemble d’entraînement. Les meilleurs hyperparamètres sont ensuite utilisés pour entraîner le modèle final sur l’ensemble d’entraînement complet (refit=True).
from scipy.stats import randint, uniform
from sklearn.experimental import enable_halving_search_cv # noqa:F401
from sklearn.model_selection import HalvingRandomSearchCV
param_distributions = {
"max_leaf_nodes": randint(20, 80),
"learning_rate": uniform(0.05, 0.15),
"min_samples_leaf": randint(20, 100),
"l2_regularization": uniform(0, 0.1),
}
search = HalvingRandomSearchCV(
model,
param_distributions=param_distributions,
n_candidates=50,
min_resources=200,
cv=5,
n_jobs=-1,
random_state=42,
refit=True,
)
search.fit(X_train, y_train);Split Conformal Prediction (SCP)¶
# 1. Predict on calibration set
df_calib = pl.DataFrame(
{
"True Price": y_calib,
"Predicted Price": search.best_estimator_.predict(X_calib),
}
)
# 2. Compute nonconformity scores
df_calib = df_calib.with_columns(
nonconformity_score=(pl.col("Predicted Price") - pl.col("True Price")).abs()
)
# 3. Compute quantile of nonconformity scores
alpha = 0.1
q_level = np.ceil((df_calib.height + 1) * (1 - alpha)) / df_calib.height
qhat = df_calib["nonconformity_score"].quantile(q_level, interpolation="linear")
# Le cadre théorique stricte impose de prendre la masse englobante supérieure (interpolation="higher").
# Mais en pratique l'interpolation linéaire semble plus pertinente.
f"SCP calibration qhat (α={alpha}): {qhat:.3f} at level {q_level:.4f}"'SCP calibration qhat (α=0.1): 12258.113 at level 0.9003'Distribution des scores de non-conformité¶
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(9, 2))
plt.boxplot(df_calib["nonconformity_score"], vert=False)
plt.axvline(
qhat,
color="red",
linestyle="--",
label=f"{q_level * 100:.2f}th Quantile = {qhat:.0f}",
)
plt.gca().set_title("Nonconformity Scores Distribution")
plt.gca().set_xlabel("Nonconformity Score (Price Absolute Error)")
plt.tight_layout()
plt.legend();
Les bornes inférieures et supérieure de la SCP sont calculées en ajoutant ou enlevant le quantile des scores de non-conformité aux prédictions du modèle : et où sont les scores de non-conformité calculés sur l’ensemble de calibration.
# Predict on test set
test_predictions = search.best_estimator_.predict(X_test)
df_test = pl.DataFrame(
{
"True Price": y_test,
"Lower Bound": test_predictions - qhat,
"Predicted Price": test_predictions,
"Upper Bound": test_predictions + qhat,
}
)
df_test = pl.concat([df_test, X_test], how="horizontal")
df_test.head()Évaluation des performances¶
from utils import IntoExpr, parse_into_expression
def coverage(
lower_bound: IntoExpr,
upper_bound: IntoExpr,
value: IntoExpr = "True Price",
) -> pl.Expr:
"""Return an expression to compute the coverage of an interval over values."""
# Note: parse_into_expression allows to pass:
# - strings which are interpreted as column names
# - polars expressions which are evaluated within the DataFrame context
# - literal values such as 0 for the lower_bound
value = parse_into_expression(value)
lower_bound = parse_into_expression(lower_bound)
upper_bound = parse_into_expression(upper_bound)
is_covered = value.is_between(lower_bound, upper_bound)
return (is_covered.mean().mul(100).round(1).cast(pl.Utf8) + pl.lit("%")).alias(
"Coverage"
)
def pinball_loss(y_true: IntoExpr, y_pred: IntoExpr, alpha: float) -> pl.Expr:
"""Return an expression to compute the Pinball loss."""
y_true = parse_into_expression(y_true)
y_pred = parse_into_expression(y_pred)
residual = y_true - y_pred
loss = pl.max_horizontal(alpha * residual, (alpha - 1) * residual)
return loss.mean().cast(pl.Int32).alias(f"Pinball q_{alpha}")
df_test.select(
coverage("Lower Bound", "Upper Bound"),
coverage(0, "Lower Bound").name.suffix(f" q{alpha / 2:.2f}"),
coverage(0, "Upper Bound").name.suffix(f" q{1 - (alpha / 2):.2f}"),
coverage(0, "Predicted Price").name.suffix(" Regressor"),
pinball_loss("True Price", "Lower Bound", alpha / 2),
pinball_loss("True Price", "Upper Bound", 1 - (alpha / 2)),
)Le taux de couverture global (90.2%) est conforme à ce qui est attendu (90%). Les garanties théoriques de la SCP sont respectées.
En s’intéressant aux bornes basses et hautes du modèle, on aperçoit cependant une sous-couverture (de respectivement -1.1% et -0.9%).
On observe un biais du régresseur qui prédit plus souvent une valeur supérieure à la vraie valeur (taux de couverture de 53.4%). Toutefois, ce biais positif n’explique pas le biais négatif des bornes de la SCP. La bonne explication est que la SCP produit des bornes symétriques autour de la prédiction du régresseur, hors la distribution des prix n’est pas symétrique (skewed à droite). La pinball loss confirme que les erreurs sont plus fréquentes et plus grandes dans les niveaux de prix élevés.
Des variantes asymétriques de la SCP existent où le quantile des erreurs est calculé séparément pour les bornes inférieures et supérieures, permettant de gérer une asymétrie dans la distribution de la variable d’intérêt.
Visualisation des prédictions pour un échantillon de l’ensemble de test¶
import altair as alt
alt.data_transformers.enable("vegafusion")
df_sample = (
df_test.sample(50, seed=42)
.sort("True Price")
.with_row_index("index")
.with_columns(
is_covered=pl.col("True Price").is_between(
pl.col("Lower Bound"), pl.col("Upper Bound")
)
)
)chart = alt.Chart(df_sample, title=f"SCP Intervals (50 samples, α={alpha})").encode(
alt.X("index:Q", title="Random sample (sorted by price)"),
tooltip=["index", "True Price", "Lower Bound", "Upper Bound", "is_covered"],
)
points = chart.mark_point(size=60, filled=True).encode(
alt.Y("True Price:Q"),
alt.Color("is_covered:N", title="Covered", scale={"range": ["red", "green"]}),
)
intervals = chart.mark_errorbar(ticks=True).encode(
alt.Y("Lower Bound:Q"), alt.Y2("Upper Bound:Q")
)
(points + intervals).properties(width=800)Analyse de la couverture par sous-groupes¶
df_test.group_by("Brand").agg(
coverage("Lower Bound", "Upper Bound"), pl.len().alias("Count")
).sort("Count", descending=True).head(10)df_test.group_by("Fuel type").agg(
coverage("Lower Bound", "Upper Bound"), pl.len().alias("Count")
).sort("Count", descending=True)df_test.group_by("Production year").agg(
coverage("Lower Bound", "Upper Bound"), pl.len().alias("Count")
).sort("Production year", descending=True).head(15)Analyse par sous-groupes:
La couverture varie selon les caractéristiques du véhicule (kilométrage, année, marque). Ces variations sont attendues car la SCP fournit une garantie marginale (sur l’ensemble du jeu de test), pas conditionnelle (par sous-groupe).