Téléchargement des données¶
import zipfile
from io import BytesIO
from pathlib import Path
import requests
data_dir = Path("../../data/")
data_dir.mkdir(parents=True, exist_ok=True)
filepath = data_dir / "df_study_L18_w6.csv"
if not filepath.exists():
# Download and extract zip.
url = "https://github.com/MINCHELLA-Paul/Master-MIASHS/raw/6abd32cc11d73850a0d8c54a3ab9a31200b6d97b/Atelier_SigBERT/df_study_selected.zip"
response = requests.get(url)
with zipfile.ZipFile(BytesIO(response.content)) as zip_ref:
zip_ref.extractall(data_dir)Chargement des données¶
Le jeu de données contient 3555 lignes et 759 colonnes.
Chaque ligne représente un individu représenté par un ID, un temps de survie, un indicateur de censure, et 756 caractéristiques correspondant aux signatures calculées à partir des données médicales.
Le taux de censure est d’environ 33%. Le taux de survie médian est de 1329 jours.
Le jeu de données ne contient pas de valeurs manquantes.
import polars as pl
import polars.selectors as cs
# Polars display options
pl.Config.set_tbl_hide_dataframe_shape(True)
pl.Config.set_float_precision(3)
# Chargement des données
df = pl.read_csv(filepath, infer_schema_length=None)
df = df.select(["time", "event", cs.starts_with("sig_")])
dfdf.describe()Nettoyage des données¶
Initialement, les colonnes de signatures suivantes était supprimés :
Les colonnes avec une variance inférieur à . La signature 54 présente par exmeple une variance nulle et est supprimée.
Les colonnes fortement corrélées (seuil de 0.9) pour éviter la multicolinéarité.
En pratique cela évite de nombreux problèmes de convergence notamment pour les modèles de Cox et sans affecte les performances des modèles.
Toutefois, au vu de la haute dimensionnalité des données, nous avons décidé de recourir plutôt à une réduction de dimensionnalité via l’ACP avant d’entraîner nos modèles.
Séparation des données¶
Pour la sépération des données on stratifie par rapport à l’indicateur de censure afin de conserver la même proportion de censurés dans les ensembles d’entrainement et de test.
from sklearn.model_selection import train_test_split
X_train, X_test = train_test_split(df, test_size=0.33, stratify=df.get_column("event"))
print(f"Taille de l'ensemble d'entraînement : {X_train.height}")
print(f"Taille de l'ensemble de test : {X_test.height}")Taille de l'ensemble d'entraînement : 2381
Taille de l'ensemble de test : 1174
Standardisation des données¶
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().set_output(transform="polars")
# Standardisation des signatures
scaler.fit(X_train.select(cs.starts_with("sig_")))
X_train = X_train.with_columns(scaler.transform(X_train.select(cs.starts_with("sig_"))))
X_test = X_test.with_columns(scaler.transform(X_test.select(cs.starts_with("sig_"))))Réduction de dimensionalité avec PCA¶
Les données sont de grandes dimensions (756 caractéristiques) par rapport à la taille du jeu de données (3555 individus).
On applique une réduction de dimensionnalité avec la PCA.
from sklearn.decomposition import PCA
pca = PCA(n_components=0.90, random_state=42).set_output(transform="polars")
# Réduction de dimensionnalité
pca.fit(X_train.select(cs.starts_with("sig_")))
pca_train = pca.transform(X_train.select(cs.starts_with("sig_")))
pca_test = pca.transform(X_test.select(cs.starts_with("sig_")))
X_train = X_train.drop(cs.starts_with("sig_")).with_columns(pca_train)
X_test = X_test.drop(cs.starts_with("sig_")).with_columns(pca_test)
f"Nombre de composantes : {pca.n_components_} / {len(pca.feature_names_in_)}"'Nombre de composantes : 341 / 756'Sauvegarde des données préparées¶
X_train.write_parquet("../../data/df_study_L18_w6_train.parquet")
X_test.write_parquet("../../data/df_study_L18_w6_test.parquet")Fonction utilitaire¶
Ci-dessous la fonction principale utilisé permettant de calculer les métriques de performances pour les modèles de survie.
from collections.abc import Iterable
from typing import Any
import numpy as np
import pandas as pd
import polars as pl
import polars.selectors as cs
from sksurv.metrics import (
brier_score,
concordance_index_censored,
cumulative_dynamic_auc,
integrated_brier_score,
)
from sksurv.util import Surv
def get_survival_at_t(surv_fns: pd.DataFrame, t: float) -> np.ndarray:
"""Standardizes survival probability extraction across libraries."""
if isinstance(surv_fns, pd.DataFrame): # lifelines
return np.array(
[np.interp(t, surv_fns.index, surv_fns[col]) for col in surv_fns.columns]
)
# sksurv (StepFunction objects)
return np.array([fn(t) for fn in surv_fns])
def evaluate_survival_model(
y_train: pl.DataFrame,
y_test: pl.DataFrame,
risk_scores: np.ndarray,
surv_fns: pd.DataFrame | Iterable[Any] | None = None,
) -> pl.DataFrame:
"""Calculer les métriques d'évaluation pour un modèle de survie."""
# Convertir les DataFrames polars en structures sksurv
y_train = Surv.from_dataframe("event", "time", y_train.to_pandas())
y_test = Surv.from_dataframe("event", "time", y_test.to_pandas())
metrics = dict()
# 1. C-Index
metrics["C-index"] = concordance_index_censored(
y_test["event"], y_test["time"], risk_scores
)[0]
# 2. td-AUC
safe_limit = y_train["time"].max() * 0.95
times_auc = np.quantile(y_test["time"][y_test["event"] == 1], [0.25, 0.5, 0.75])
times_auc = times_auc[times_auc < safe_limit]
mask = y_test["time"] < safe_limit
_, mean_auc = cumulative_dynamic_auc(
y_train, y_test[mask], risk_scores[mask], times_auc
)
metrics["Mean td-AUC"] = mean_auc
# Integrated Brier Score and Brier Score at t_median
if surv_fns:
# 1. Define evaluation times
test_times = np.percentile(y_test["time"], np.linspace(10, 90, 15))
if isinstance(surv_fns, pd.DataFrame):
preds_at_times = np.array(
[
np.interp(test_times, surv_fns.index, surv_fns[col])
for col in surv_fns.columns
]
)
else:
# Standard sksurv StepFunction handling
preds_at_times = np.array([f(test_times) for f in surv_fns])
# 3. Integrated Brier Score
metrics["IBS"] = integrated_brier_score(
y_train, y_test, preds_at_times, test_times
)
# 4. Specific Brier at Median
t_med = np.median(y_train["time"][y_train["event"]])
if isinstance(surv_fns, pd.DataFrame):
s_at_t_med = np.array(
[
np.interp(t_med, surv_fns.index, surv_fns[col])
for col in surv_fns.columns
]
)
else:
s_at_t_med = np.array([fn(t_med) for fn in surv_fns])
_, brier_med = brier_score(y_train, y_test, s_at_t_med, t_med)
metrics[f"Brier (t={t_med})"] = brier_med[0]
return pl.DataFrame(metrics).with_columns(cs.numeric().round(2))