# preprocessing_tools.py
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
# from sklearn.preprocessing import scale
from sklearn.preprocessing import QuantileTransformer, PowerTransformer
### NEW ###
def _extract_cols(data, cols):
    """Return (values, cols) for a DataFrame, Series or ndarray input."""
    if isinstance(data, pd.DataFrame):
        if not cols:
            cols = list(data.columns)
        data = data.to_numpy()
    elif isinstance(data, pd.Series):
        if not cols:
            if data.name:
                cols = data.name
            else:
                cols = 0
        data = data.to_numpy().reshape(-1, 1)  # vertical vector, to match the shape of a matrix input
    else:
        if not cols:
            cols = list(range(data.shape[1]))
    return data, cols
def scale(a, with_std=True):
    # NaN-aware column-wise standardization; with_std=False only centers the data
    denom = (lambda x: np.nanstd(x)) if with_std else (lambda x: 1.0)
    return np.apply_along_axis(lambda x: (x - np.nanmean(x)) / denom(x), axis=0, arr=a)
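# Illustrative usage (assumed toy data, not from the original module):
# >>> a = np.array([[1.0, 10.0], [2.0, np.nan], [3.0, 30.0]])
# >>> scale(a)                   # each column has mean ~0 and std ~1; the NaN stays NaN
# >>> scale(a, with_std=False)   # centering only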
def selecting_cols(df, threshold, verbose=1):
    # Keep columns that are non-zero in more than `threshold` * n_samples rows;
    # the first column (sample identifier) is always kept
    cols_filter = np.append(np.array([True]), ((df.iloc[:, 1:] != 0).sum(axis=0) > threshold * df.shape[0]).values)
    if verbose > 0:
        print("Removing %d columns, %d columns left" % ((cols_filter == False).sum(), (cols_filter == True).sum()))
    df = df.loc[:, cols_filter]
    cols_otu = get_cols(df)
    return df, cols_otu
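# Illustrative usage (assumed toy data): keep OTUs observed in more than 40% of samples.
# >>> df = pd.DataFrame({'SAMPLE_ID': ['s1', 's2'], 'otu1': [0, 5], 'otu2': [0, 0]})
# >>> df_kept, cols_otu = selecting_cols(df, threshold=0.4)   # drops 'otu2', cols_otu == ['otu1']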
def filtering_ecrf(df, nan_tresh=0.5, unbalance_tresh=0.05, verbose=10):
    # Keep columns with less than nan_tresh (fraction) of NaNs
    df = df.loc[:, ~(df.isna().sum() / df.shape[0] > nan_tresh)]
    # Check occurrences for binary variables: drop a variable when its most frequent
    # value accounts for more than 1 - unbalance_tresh of the samples
    for el in df.columns:
        if df[el].value_counts().shape[0] <= 2:  # binary variables, or variables with a single value
            nb = df[el].value_counts().iloc[0] / df.shape[0]  # share of the most frequent value
            if nb < unbalance_tresh or nb > 1 - unbalance_tresh:
                df.drop(columns=el, inplace=True)
                if verbose > 0:
                    print("Deleted : ", el)
    return df, get_cols(df)
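# Illustrative usage (assumed eCRF-style table): drop near-constant binary variables
# and variables with too many missing values.
# >>> ecrf = pd.DataFrame({'SUBJID': range(100), 'smoker': [1]*99 + [0], 'age': np.random.rand(100)})
# >>> ecrf_filtered, cols = filtering_ecrf(ecrf)   # 'smoker' is dropped, cols == ['age']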
def get_cols(df):
    # Return the data columns, excluding the identifier columns SUBJID and SAMPLE_ID
    cols = list(df.columns)
    for id_col in ('SUBJID', 'SAMPLE_ID'):
        if id_col in cols:
            cols.remove(id_col)
    return cols
def adjust_covariates(data_, covariates_):
    """
    Regress out covariates from a variable and return the residuals.

    Input :
        data_ : pd.Series
            Series of data to be adjusted. Meant to be used in an .apply call over several columns.
        covariates_ : pd.DataFrame
            Covariates to adjust for (one column per covariate, same index as data_).
    Output :
        pd.Series : the adjusted series (residuals on the non-NaN entries, NaNs left unchanged).
    """
    data = data_.copy()
    covariates = covariates_.copy()
    nan_mask = ~np.isnan(data)
    cov = scale(covariates[nan_mask])
    dat = scale(data[nan_mask])
    lr = LinearRegression()
    lr.fit(cov, dat)
    coefs = lr.coef_
    data[nan_mask] = dat - np.dot(coefs, cov.T)
    return data
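# Illustrative usage (assumed toy data): adjust every feature column for age and BMI.
# >>> features = pd.DataFrame(np.random.rand(50, 3), columns=['f1', 'f2', 'f3'])
# >>> covars = pd.DataFrame({'age': np.random.rand(50), 'bmi': np.random.rand(50)})
# >>> adjusted = features.apply(adjust_covariates, covariates_=covars)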
def quantile_transformation(df, cols, output_distribution='normal'):
    qt = QuantileTransformer(n_quantiles=df.shape[0], output_distribution=output_distribution)
    df[cols] = qt.fit_transform(df[cols])
    return df
def pipeline(data, L_pipe, cols=None):
    # Apply each preprocessing step of L_pipe in order (see wrap_preproc for the available names)
    for preproc in L_pipe:
        data = wrap_preproc(data, preproc)
    return data
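# Illustrative usage (assumed toy OTU counts): normalize to proportions, log10-transform,
# then standardize each column.
# >>> counts = pd.DataFrame(np.random.randint(1, 100, size=(20, 5)).astype(float))
# >>> processed = pipeline(counts, ['proportion', 'log10', 'scale'])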
def wrap_preproc(data, preproc, cols=None):
    if preproc == 'QuantileTransformer':
        print("***** Computing QuantileTransformer *****")
        return get_qt(data)
    if preproc == 'PowerTransformer':
        print("***** Computing PowerTransformer *****")
        return get_pt(data)
    if preproc == 'proportion':
        print("***** Computing proportion *****")
        return get_proportion(data)
    if preproc == 'fisher':
        print("***** Computing Fisher *****")
        return apply_fisher(data)
    if preproc == 'log':
        print("***** Computing log *****")
        return apply_log(data)
    if preproc == 'log10':
        print("***** Computing log10 *****")
        return apply_log10(data)
    if preproc == 'scale':
        print("***** Computing scale *****")
        return apply_scale(data)
    if preproc == 'center':
        print("***** Computing center *****")
        return apply_center(data)
    if preproc == 'positify':
        print("***** Computing positify *****")
        return positify(data)
    if 'filter' in preproc:
        print("***** Computing filtering of low frequencies *****")
        return filter_freq(data, preproc)
    raise ValueError("Unknown preprocessing step: %s" % preproc)
def apply_log(data_, cols=None):
    # Natural log with a small offset (1e-9) to handle zeros
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data.apply(lambda x: np.log(x + 1e-9))
    elif isinstance(data, np.ndarray):
        data = np.log(data + 1e-9)
    return data
def apply_log10(data_, cols=None):
    # Base-10 log with a small offset (1e-9) to handle zeros
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data.apply(lambda x: np.log10(x + 1e-9))
    elif isinstance(data, np.ndarray):
        data = np.log10(data + 1e-9)
    return data
def apply_scale(data_, cols=None):
    # Standardize each column (NaN-aware, see scale)
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data[data.columns] = scale(data[data.columns])
    elif isinstance(data, np.ndarray):
        data = scale(data)
    return data
def apply_center(data_, cols=None):
    # Center each column without rescaling the variance
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data[data.columns] = scale(data[data.columns], with_std=False)
    elif isinstance(data, np.ndarray):
        data = scale(data, with_std=False)
    return data
# def apply_clr(data, cols):
# data[cols] = data[cols].apply(lambda x : np.log(x/(np.power(np.prod(x),1/x.shape[0])+0.00001)))
# return data
def apply_fisher(data_, cols=None):
    # Fisher z-transformation (expects values in (-1, 1))
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = 0.5 * np.log((1 + data) / (1 - data))
    elif isinstance(data, np.ndarray):
        data = np.apply_along_axis(lambda x: 0.5 * np.log((1 + x) / (1 - x)), axis=1, arr=data)
    return data
def get_qt(data_, cols=None):
    data = data_.copy()
    qt = QuantileTransformer(n_quantiles=data.shape[0], output_distribution='normal')
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data[data.columns] = qt.fit_transform(data)
    elif isinstance(data, np.ndarray):
        data = qt.fit_transform(data)
    return data
def get_pt(data_, cols=None):
    data = data_.copy()
    pt = PowerTransformer()
    data = pt.fit_transform(data)
    return data
def get_proportion(data_, cols=None):
    # Normalize each row so that it sums to 1 (relative abundances)
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data.div(data.sum(axis=1), axis=0)
    elif isinstance(data, np.ndarray):
        data = np.divide(data, data.sum(axis=1).reshape(-1, 1))
    return data
def positify(data_, cols=None):
    # Shift each row by the absolute value of its minimum
    data = data_.copy()
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data.apply(lambda x: x + np.abs(x.min()), axis=1)
    elif isinstance(data, np.ndarray):
        data = np.apply_along_axis(lambda x: x + np.abs(x.min()), axis=1, arr=data)
    return data
def filter_freq(data_, filter_name, cols=None):
    # Zero out values below a frequency threshold encoded in the step name,
    # e.g. 'filter_01' -> 0.01, 'filter_05' -> 0.05
    data = data_.copy()
    freq_min = filter_name.split('_')[1]
    freq_min = float('0.' + freq_min)
    if isinstance(data, (pd.DataFrame, pd.Series)):
        data = data[data > freq_min]
        data = data.fillna(0.)
    elif isinstance(data, np.ndarray):
        data = np.where(data <= freq_min, 0, data)  # same threshold as the pandas branch
    return data
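# Illustrative usage (assumed toy data): zero out relative abundances below 1%.
# >>> props = get_proportion(pd.DataFrame(np.random.rand(10, 4)))
# >>> filtered = wrap_preproc(props, 'filter_01')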
"""
def pipeline(data, L_pipe, cols = None):
print(L_pipe)
for preproc in L_pipe :
data = wrap_preproc(data, cols, preproc)
return data
def wrap_preproc(data, preproc, cols = None):
# if preproc == 'None':
# pass
# print(preproc)
if preproc == 'QuantileTransformer':
print("***** Computing QuantileTransformer *****")
return get_qt(data,cols)
if preproc == 'PowerTransformer':
print("***** Computing PowerTransformer *****")
return get_pt(data, cols)
if preproc == 'proportion':
print("***** Computing proportion *****")
return get_proportion(data, cols)
if preproc == 'fisher':
print("***** Computing Fisher *****")
return apply_fisher(data, cols)
if preproc == 'log':
print("***** Computing log *****")
return apply_log(data, cols)
if preproc == 'log10':
print("***** Computing log *****")
return apply_log10(data, cols)
if preproc == 'scale':
print("***** Computing scale *****")
return apply_scale(data, cols)
if preproc == 'center':
print("***** Computing center *****")
return apply_center(data, cols)
if 'filter' in preproc:
print("***** Computing filtering of low frequencies *****")
return filter_freq(data, cols, preproc)
def apply_log(data_, cols):
data = data_.copy()
# self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
data[cols] = data[cols].apply(lambda x: np.log(x+0.000000001))
return data
def apply_log10(data_, cols):
data = data_.copy()
# self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
data[cols] = data[cols].apply(lambda x: np.log10(x+0.000000001))
return data
def apply_scale(data_, cols):
data = data_.copy()
# self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
data[cols] = scale(data[cols])
return data
def apply_center(data_, cols):
data = data_.copy()
# self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
data[cols] = scale(data[cols], with_std = False)
return data
# def apply_clr(data, cols):
# data[cols] = data[cols].apply(lambda x : np.log(x/(np.power(np.prod(x),1/x.shape[0])+0.00001)))
# return data
def apply_fisher(data_, cols):
data = data_.copy()
# self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
data[cols] = data[cols].apply(lambda x: 0.5*np.log((1+x)/(1-x)), axis = 1)
return data
def get_qt(data_, cols):
data = data_.copy()
qt = QuantileTransformer(output_distribution = 'normal')
data[cols] = qt.fit_transform(data[cols])
return data
def get_pt(data_, cols):
data = data_.copy()
pt = PowerTransformer()
data[cols] = pt.fit_transform(data[cols])
return data
def get_proportion(data_, cols):
data = data_.copy()
data[cols] = data[cols].div(data[cols].sum(axis = 1),axis = 0)
return data
def filter_freq(data_, cols, filter_name):
data = data_.copy()
freq_min = filter_name.split('_')[1]
freq_min = float('0.'+freq_min)
data[cols] = data[cols][data[cols]>freq_min]
data[cols] = data[cols].fillna(0.)
return data
"""