# Source code for tempor.datasources.mivdp.utils.icu_preprocess_util

"""ICU preprocessing utilities.

Based on:
https://github.com/healthylaife/MIMIC-IV-Data-Pipeline
``utils/icu_preprocess_util.py``
"""

import ast
import os
from typing import Optional

import numpy as np
import pandas as pd
from sklearn.preprocessing import MultiLabelBinarizer
from tqdm import tqdm

from .common import pd_v2_compat_append

# --- GENERAL ---


def dataframe_from_csv(
    path,
    compression="gzip",
    header=0,
    index_col=0,
    chunksize=None,  # pylint: disable=unused-argument
):
    """Read a CSV file (gzip-compressed by default) into a single DataFrame.

    Args:
        path: Path of the CSV file.
        compression: Compression passed to ``pd.read_csv``.
        header: Row number holding the column names.
        index_col: Column used as the index (the first column by default).
        chunksize: Accepted for signature compatibility only and ignored; the whole
            file is always read at once, so a DataFrame (never an iterator) is returned.

    Returns:
        The parsed DataFrame.
    """
    read_options = dict(
        compression=compression,
        header=header,
        index_col=index_col,
        chunksize=None,  # deliberately not forwarding ``chunksize``
    )
    return pd.read_csv(path, **read_options)  # type: ignore
def read_admissions_table(mimic4_path):
    """Load the admissions table from ``core/admissions.csv.gz`` under ``mimic4_path``.

    Args:
        mimic4_path: Root directory of the MIMIC-IV CSV files.

    Returns:
        DataFrame with columns subject_id, hadm_id, admittime, dischtime, deathtime and
        ethnicity, where the three time columns are parsed with ``pd.to_datetime``.
    """
    # ``dataframe_from_csv`` uses the first CSV column as index; reset_index turns it back into a column.
    admits = dataframe_from_csv(os.path.join(mimic4_path, "core/admissions.csv.gz")).reset_index()
    # .copy() makes the column subset an independent frame, so the assignments below do not
    # trigger pandas' SettingWithCopyWarning.
    admits = admits[["subject_id", "hadm_id", "admittime", "dischtime", "deathtime", "ethnicity"]].copy()
    for time_col in ("admittime", "dischtime", "deathtime"):
        admits[time_col] = pd.to_datetime(admits[time_col])
    return admits
def read_patients_table(mimic4_path):
    """Load the patients table from ``core/patients.csv.gz`` under ``mimic4_path``.

    Args:
        mimic4_path: Root directory of the MIMIC-IV CSV files.

    Returns:
        DataFrame with columns subject_id, gender, dod, anchor_age, anchor_year and
        anchor_year_group, plus ``yob`` computed as ``anchor_year - anchor_age``.
        ``dod`` is parsed with ``pd.to_datetime``.
    """
    # ``dataframe_from_csv`` uses the first CSV column as index; reset_index turns it back into a column.
    pats = dataframe_from_csv(os.path.join(mimic4_path, "core/patients.csv.gz")).reset_index()
    # .copy() makes the column subset an independent frame, so adding/overwriting columns below
    # does not trigger pandas' SettingWithCopyWarning.
    pats = pats[
        [
            "subject_id",
            "gender",
            "dod",
            "anchor_age",
            "anchor_year",
            "anchor_year_group",
        ]
    ].copy()
    pats["yob"] = pats["anchor_year"] - pats["anchor_age"]
    pats["dod"] = pd.to_datetime(pats["dod"])
    return pats
# --- DIAGNOSES ---
def read_diagnoses_icd_table(mimic4_path):
    """Load ``hosp/diagnoses_icd.csv.gz`` under ``mimic4_path``.

    The first CSV column, which ``dataframe_from_csv`` uses as the index, is restored as a
    regular column.
    """
    csv_path = os.path.join(mimic4_path, "hosp/diagnoses_icd.csv.gz")
    return dataframe_from_csv(csv_path).reset_index()
def read_d_icd_diagnoses_table(mimic4_path):
    """Load ``hosp/d_icd_diagnoses.csv.gz`` and keep only ``icd_code`` and ``long_title``."""
    csv_path = os.path.join(mimic4_path, "hosp/d_icd_diagnoses.csv.gz")
    titles = dataframe_from_csv(csv_path).reset_index()
    return titles[["icd_code", "long_title"]]
def read_diagnoses(mimic4_path):
    """Return diagnosis records inner-joined with their ICD descriptions on ``icd_code``."""
    diagnoses = read_diagnoses_icd_table(mimic4_path)
    descriptions = read_d_icd_diagnoses_table(mimic4_path)
    return diagnoses.merge(
        descriptions,
        how="inner",
        left_on=["icd_code"],
        right_on=["icd_code"],
    )
def standardize_icd(mapping, df, root=False):
    """Add a column with ICD-9 codes converted to ICD-10 to a diagnosis dataframe, in place.

    Args:
        mapping: ICD-9 -> ICD-10 mapping table with ``diagnosis_code`` (the ICD-9 code) and
            ``icd10cm`` columns.
        df: Diagnosis dataframe with ``icd_code`` and ``icd_version`` columns; modified in place.
        root: If True, ICD-9 codes are looked up by their first three characters only, and
            the new column is named ``root_icd10_convert`` instead of ``icd10_convert``.

    The new column starts as a copy of ``icd_code``. Rows with ``icd_version == 9`` are then
    overwritten with the first ``icd10cm`` listed for that code in ``mapping``. Codes without
    a mapping get NaN, and an ``Error on code`` message is printed for each of them.

    Returns:
        None.
    """
    # Many ICD-9 codes map to several ICD-10 codes; keep the first listed one. The lookup is
    # built once instead of rescanning the whole mapping table for every distinct ICD-9 code.
    first_matches = mapping.drop_duplicates(subset="diagnosis_code", keep="first")
    lookup = dict(zip(first_matches["diagnosis_code"], first_matches["icd10cm"]))

    def icd_9to10(icd):
        # If root is true, only map an ICD-9 code according to its root (first 3 characters)
        if root:
            icd = icd[:3]
        if icd in lookup:
            return lookup[icd]
        print("Error on code", icd)
        return np.nan

    # Create new column with original codes as default
    col_name = "root_icd10_convert" if root else "icd10_convert"
    df[col_name] = df["icd_code"].values

    # Convert each distinct ICD-9 code once, then write the result to every row that has it
    for code, group in df.loc[df.icd_version == 9].groupby(by="icd_code"):
        new_code = icd_9to10(code)
        for idx in group.index.values:
            df.at[idx, col_name] = new_code
# --- PROCEDURES ---
def read_procedures_icd_table(mimic4_path):
    """Load ``hosp/procedures_icd.csv.gz`` under ``mimic4_path``.

    The first CSV column, which ``dataframe_from_csv`` uses as the index, is restored as a
    regular column.
    """
    csv_path = os.path.join(mimic4_path, "hosp/procedures_icd.csv.gz")
    return dataframe_from_csv(csv_path).reset_index()
def read_d_icd_procedures_table(mimic4_path):
    """Load ``hosp/d_icd_procedures.csv.gz`` and keep only ``icd_code`` and ``long_title``."""
    csv_path = os.path.join(mimic4_path, "hosp/d_icd_procedures.csv.gz")
    titles = dataframe_from_csv(csv_path).reset_index()
    return titles[["icd_code", "long_title"]]
def read_procedures(mimic4_path):
    """Return procedure records inner-joined with their ICD descriptions on ``icd_code``."""
    procedures = read_procedures_icd_table(mimic4_path)
    descriptions = read_d_icd_procedures_table(mimic4_path)
    return procedures.merge(
        descriptions,
        how="inner",
        left_on=["icd_code"],
        right_on=["icd_code"],
    )
# --- MAPPING ---
def read_icd_mapping(map_path):
    """Read a tab-separated ICD mapping table.

    Args:
        map_path: Path of the tab-separated file; must contain a ``diagnosis_description`` column.

    Returns:
        The mapping DataFrame with ``diagnosis_description`` lower-cased. Missing descriptions
        stay NaN.
    """
    mapping = pd.read_csv(map_path, header=0, delimiter="\t")
    # ``.str.lower()`` leaves missing (NaN) descriptions as NaN instead of raising TypeError.
    mapping["diagnosis_description"] = mapping["diagnosis_description"].str.lower()
    return mapping
# --- PREPROCESSING ---
def preproc_meds(module_path: str, adm_cohort_path: str) -> pd.DataFrame:
    """Attach cohort ICU-stay information to medication events.

    Args:
        module_path: gzip CSV of medication events (subject_id, stay_id, itemid, starttime,
            endtime, rate, amount, orderid are read).
        adm_cohort_path: Cohort CSV providing hadm_id, stay_id and intime.

    Returns:
        Events inner-joined with the cohort on ``stay_id``, with ``start_hours_from_admit`` and
        ``stop_hours_from_admit`` set to ``starttime - intime`` and ``endtime - intime``
        (timedeltas). Rows with a missing value in any column are dropped.
    """
    event_columns = [
        "subject_id",
        "stay_id",
        "itemid",
        "starttime",
        "endtime",
        "rate",
        "amount",
        "orderid",
    ]
    cohort = pd.read_csv(
        adm_cohort_path,
        usecols=["hadm_id", "stay_id", "intime"],
        parse_dates=["intime"],
    )
    events = pd.read_csv(
        module_path,
        compression="gzip",
        usecols=event_columns,
        parse_dates=["starttime", "endtime"],
    )

    events = events.merge(cohort, left_on="stay_id", right_on="stay_id", how="inner")
    events["start_hours_from_admit"] = events["starttime"] - events["intime"]
    events["stop_hours_from_admit"] = events["endtime"] - events["intime"]
    # Any missing value (e.g. an empty ``rate``) removes the whole row.
    events = events.dropna()

    print("# of unique type of drug: ", events.itemid.nunique())
    print("# Admissions: ", events.stay_id.nunique())
    print("# Total rows", events.shape[0])
    return events
def preproc_proc(
    dataset_path: str, cohort_path: str, time_col: str, dtypes: Optional[dict], usecols: list
) -> pd.DataFrame:
    """Join a gzip event table with the ICU-stay cohort and time each event from ``intime``.

    Args:
        dataset_path: gzip CSV of events; must contain ``stay_id`` and ``itemid``.
        cohort_path: gzip CSV cohort with subject_id, hadm_id, stay_id, intime, outtime.
        time_col: Event timestamp column (parsed as datetime).
        dtypes: Optional dtype mapping forwarded to ``pd.read_csv``.
        usecols: Columns of ``dataset_path`` to read.

    Returns:
        De-duplicated events inner-joined with the cohort on ``stay_id``, with
        ``event_time_from_admit = time_col - intime``. Rows with any missing value are dropped.
    """
    events = pd.read_csv(
        dataset_path,
        compression="gzip",
        usecols=usecols,
        dtype=dtypes,
        parse_dates=[time_col],
    ).drop_duplicates()
    cohort = pd.read_csv(cohort_path, compression="gzip", parse_dates=["intime"])

    stay_columns = ["subject_id", "hadm_id", "stay_id", "intime", "outtime"]
    joined = events.merge(
        cohort[stay_columns],
        how="inner",
        left_on="stay_id",
        right_on="stay_id",
    )
    joined["event_time_from_admit"] = joined[time_col] - joined["intime"]
    joined = joined.dropna()

    print("# Unique Events: ", joined.itemid.nunique())
    print("# Admissions: ", joined.stay_id.nunique())
    print("Total rows", joined.shape[0])
    return joined
def preproc_out(
    dataset_path: str, cohort_path: str, time_col: str, dtypes: Optional[dict], usecols: Optional[list]
) -> pd.DataFrame:
    """Join a gzip event table with the ICU-stay cohort and time each event from ``intime``.

    Args:
        dataset_path: gzip CSV of events; must contain ``stay_id`` and ``itemid``.
        cohort_path: gzip CSV cohort with stay_id, intime, outtime.
        time_col: Event timestamp column (parsed as datetime).
        dtypes: Optional dtype mapping forwarded to ``pd.read_csv``.
        usecols: Optional subset of columns of ``dataset_path`` to read (None reads all).

    Returns:
        De-duplicated events inner-joined with the cohort on ``stay_id``, with
        ``event_time_from_admit = time_col - intime``. Rows with any missing value are dropped.
    """
    events = pd.read_csv(
        dataset_path,
        compression="gzip",
        usecols=usecols,
        dtype=dtypes,
        parse_dates=[time_col],
    ).drop_duplicates()
    cohort = pd.read_csv(cohort_path, compression="gzip", parse_dates=["intime"])

    joined = events.merge(
        cohort[["stay_id", "intime", "outtime"]],
        how="inner",
        left_on="stay_id",
        right_on="stay_id",
    )
    joined["event_time_from_admit"] = joined[time_col] - joined["intime"]
    joined = joined.dropna()

    print("# Unique Events: ", joined.itemid.nunique())
    print("# Admissions: ", joined.stay_id.nunique())
    print("Total rows", joined.shape[0])
    return joined
def preproc_chart(
    dataset_path: str,
    cohort_path: str,
    time_col: str,
    dtypes: Optional[dict],
    usecols: list,
    chunksize: int = 10000000,
) -> pd.DataFrame:
    """Collect events that belong to the ICU stays of a cohort, streaming a large gzip CSV.

    The events file is read in chunks of ``chunksize`` rows to bound memory. Each chunk is
    processed as follows:

    * rows without ``valuenum`` are dropped;
    * the chunk is inner-joined with the cohort on ``stay_id``;
    * ``event_time_from_admit = time_col - intime`` is added and both timestamp columns are
      removed;
    * rows with any missing value, and duplicates within the chunk, are dropped.

    Args:
        dataset_path: gzip CSV of events; must contain ``stay_id``, ``itemid`` and ``valuenum``.
        cohort_path: gzip CSV cohort with ``stay_id`` and ``intime``.
        time_col: Event timestamp column (parsed as datetime).
        dtypes: Optional dtype mapping forwarded to ``pd.read_csv``.
        usecols: Columns of ``dataset_path`` to read.
        chunksize: Number of event rows read per chunk.

    Returns:
        The non-empty processed chunks concatenated with a fresh RangeIndex. If every chunk is
        empty, the last (empty) processed chunk is returned. If the file yields no chunks, an
        empty DataFrame is returned.
    """
    cohort = pd.read_csv(cohort_path, compression="gzip", parse_dates=["intime"])
    stays = cohort[["stay_id", "intime"]]

    parts = []
    empty_result = pd.DataFrame()
    reader = pd.read_csv(
        dataset_path,
        compression="gzip",
        usecols=usecols,
        dtype=dtypes,
        parse_dates=[time_col],
        chunksize=chunksize,
    )
    for chunk in tqdm(reader):
        chunk = chunk.dropna(subset=["valuenum"])
        merged = chunk.merge(stays, how="inner", left_on="stay_id", right_on="stay_id")
        merged["event_time_from_admit"] = merged[time_col] - merged["intime"]
        del merged[time_col]
        del merged["intime"]
        # Duplicates are only removed within a chunk, not across chunks.
        merged = merged.dropna().drop_duplicates()
        if merged.empty:
            empty_result = merged
        else:
            parts.append(merged)

    # Concatenate once at the end. Appending to a growing frame inside the loop would re-copy
    # every previously collected row on each chunk, which is quadratic.
    df_cohort = pd.concat(parts, ignore_index=True) if parts else empty_result

    print("# Unique Events: ", df_cohort.itemid.nunique())
    print("# Admissions: ", df_cohort.stay_id.nunique())
    print("Total rows", df_cohort.shape[0])
    return df_cohort
def preproc_icd_module(
    module_path: str,
    adm_cohort_path: str,
    icd_map_path=None,
    map_code_colname=None,
    only_icd10=True,
) -> pd.DataFrame:
    """Load an ICD-coded module for an admission cohort, optionally converting ICD-9 to ICD-10.

    Args:
        module_path: gzip CSV of the module; must contain ``hadm_id`` and, when mapping,
            ``icd_code`` and ``icd_version``.
        adm_cohort_path: gzip CSV cohort with ``hadm_id``, ``stay_id`` and ``label``.
        icd_map_path: Optional path of a tab-separated ICD-9 -> ICD-10 mapping table (read with
            ``read_icd_mapping``). If falsy, no conversion is done.
        map_code_colname: Column of the mapping table holding the ICD-9 code to match; the
            table must also have an ``icd10cm`` column.
        only_icd10: When mapping, also add a ``root`` column with the first three characters
            of the converted code.

    Returns:
        The module inner-joined with the cohort on ``hadm_id``. When mapping, it also has
        ``root_icd10_convert`` (and ``root`` if ``only_icd10``).

    Raises:
        KeyError: If a mapping is given but lacks ``map_code_colname`` or ``icd10cm``.
    """

    def get_module_cohort(module_path: str, cohort_path: str) -> pd.DataFrame:
        """Read the module and the cohort and inner-join them on ``hadm_id``."""
        module = pd.read_csv(module_path, compression="gzip", header=0)
        adm_cohort = pd.read_csv(cohort_path, compression="gzip", header=0)
        return module.merge(
            adm_cohort[["hadm_id", "stay_id", "label"]],
            how="inner",
            left_on="hadm_id",
            right_on="hadm_id",
        )

    def _standardize_icd(mapping, df, root=False):
        """Add an ICD-9 -> ICD-10 converted column to ``df`` in place (NaN when unmapped)."""
        # Many ICD-9 codes map to several ICD-10 codes; keep the first listed one. The lookup
        # is built once instead of rescanning the mapping for every distinct ICD-9 code.
        first_matches = mapping.drop_duplicates(subset=map_code_colname, keep="first")
        lookup = dict(zip(first_matches[map_code_colname], first_matches["icd10cm"]))

        def icd_9to10(icd):
            # If root is true, only map an ICD-9 code according to its root (first 3 characters)
            if root:
                icd = icd[:3]
            return lookup.get(icd, np.nan)

        # Create new column with original codes as default
        col_name = "root_icd10_convert" if root else "icd10_convert"
        df[col_name] = df["icd_code"].values

        # Convert each distinct ICD-9 code once, then write the result to every row that has it
        for code, group in df.loc[df.icd_version == 9].groupby(by="icd_code"):
            new_code = icd_9to10(code)
            for idx in group.index.values:
                df.at[idx, col_name] = new_code

        if only_icd10:
            # Column for just the roots of the converted ICD-10 column
            df["root"] = df[col_name].apply(lambda x: x[:3] if isinstance(x, str) else np.nan)

    module = get_module_cohort(module_path, adm_cohort_path)

    # Optional ICD mapping if argument passed
    if icd_map_path:
        icd_map = read_icd_mapping(icd_map_path)
        _standardize_icd(icd_map, module, root=True)
        print(
            "# Unique ICD-9 codes",
            module[module["icd_version"] == 9]["icd_code"].nunique(),
        )
        print(
            "# Unique ICD-10 codes",
            module[module["icd_version"] == 10]["icd_code"].nunique(),
        )
        print(
            "# Unique ICD-10 codes (After converting ICD-9 to ICD-10)",
            module["root_icd10_convert"].nunique(),
        )
        # ``root`` only exists when only_icd10 is set.
        if only_icd10:
            print(
                "# Unique ICD-10 codes (After clinical grouping ICD-10 codes)",
                module["root"].nunique(),
            )
    print("# Admissions: ", module.stay_id.nunique())
    print("Total rows", module.shape[0])
    return module
def pivot_cohort(
    df: pd.DataFrame,
    prefix: str,
    target_col: str,
    values="values",
    use_mlb=False,
    ohe=True,
    max_features=None,
):
    """Pivot long-format data into a wide table indexed by ``(subject_id, label, timedelta)``.

    Layout of the result::

        ||                                  || feature 1 || ... || feature n ||
        || subject_id || label || timedelta ||

    Args:
        df: Long-format frame with ``subject_id``, ``label``, ``timedelta`` and ``target_col``.
        prefix: String prepended to every output column name.
        target_col: Column whose values become output columns; rows where it is NaN are dropped.
        values: Column aggregated into the cells (non-MLB path only).
        use_mlb: Parse each ``target_col`` entry with ``ast.literal_eval`` and multi-hot encode it
            with ``MultiLabelBinarizer``. Cells hold the max of the 0/1 indicators per index key.
        ohe: Non-MLB path only. Add a ``values`` column of ones and aggregate ``values`` with max,
            so with the default ``values`` each cell is 1 when the feature occurs (NaN otherwise).
            When False, ``values`` is averaged.
        max_features: If set, keep only this many features. In the MLB path, features are ranked
            by total indicator count. Otherwise they are ranked by the number of distinct
            subjects having the feature.

    Returns:
        The pivoted DataFrame.
    """
    # String aggregators: passing numpy callables (np.mean / np.max) is deprecated in pandas >= 2.1.
    aggfunc = "mean"
    pivot_df = df.dropna(subset=[target_col])
    if use_mlb:
        mlb = MultiLabelBinarizer()
        # literal_eval only accepts Python literals, so string-encoded lists are parsed safely.
        output = mlb.fit_transform(pivot_df[target_col].apply(ast.literal_eval))
        output = pd.DataFrame(output, columns=mlb.classes_)
        if max_features:
            top_features = output.sum().sort_values(ascending=False).index[:max_features]
            output = output[top_features]
        pivot_df = pd.concat(
            [
                pivot_df[["subject_id", "label", "timedelta"]].reset_index(drop=True),
                output,
            ],
            axis=1,
        )
        pivot_df = pd.pivot_table(  # type: ignore
            pivot_df,
            index=["subject_id", "label", "timedelta"],
            values=pivot_df.columns[3:],  # type: ignore
            aggfunc="max",
        )
    else:
        if max_features:
            top_features = pd.Series(
                pivot_df[["subject_id", target_col]].drop_duplicates()[target_col].value_counts().index[:max_features],
                name=target_col,
            )
            pivot_df = pivot_df.merge(top_features, how="inner", left_on=target_col, right_on=target_col)
        if ohe:
            pivot_df = pd.concat(
                [
                    pivot_df.reset_index(drop=True),
                    pd.Series(np.ones(pivot_df.shape[0], dtype=int), name="values"),
                ],
                axis=1,
            )
            aggfunc = "max"
        pivot_df = pivot_df.pivot_table(
            index=["subject_id", "label", "timedelta"],
            columns=target_col,
            values=values,
            aggfunc=aggfunc,
        )
    pivot_df.columns = [prefix + str(i) for i in pivot_df.columns]
    return pivot_df