# preprocessing_tools.py
import pandas as pd
import numpy as np

from sklearn.linear_model import LinearRegression
# from sklearn.preprocessing import scale
from sklearn.preprocessing import QuantileTransformer, PowerTransformer


### NEW ###
def _extract_cols(data, cols):
    if isinstance(data,pd.DataFrame):
        if not cols :
            cols = list(data.columns)
        data = data.to_numpy()

    elif isinstance(data,pd.Series) :
        if not cols :
            if data.name :
                cols = data.name
            else :
                cols = 0
        data = data.to_numpy().reshape(-1,1) # we want a vertical verctor to match the shape when a matrix is given

    else :
        if not cols :
            cols = list(range(data.shape[1]))
    return data, cols



def scale(a, with_mean = True, with_std = True):
    """Standardise ``a`` along axis 0 while ignoring NaNs.

    Parameters
    ----------
    a : array-like
        Input data; each 1-D slice along axis 0 (each column of a matrix) is
        processed independently.
    with_mean : bool, default True
        Subtract the NaN-aware mean of each slice.
    with_std : bool, default True
        Divide by the NaN-aware standard deviation of each slice.

    Returns
    -------
    numpy.ndarray
        The transformed values. NaNs in the input stay NaN. No guard is
        applied against a zero standard deviation, so a constant slice
        produces NaN/inf when ``with_std`` is true.
    """
    def _standardize(column):
        out = column
        if with_mean:
            out = out - np.nanmean(column)
        if with_std:
            # std is taken on the raw slice, as in (x - mean) / std
            out = out / np.nanstd(column)
        return out

    return np.apply_along_axis(_standardize, axis = 0, arr = a)




def selecting_cols(df, threshold, verbose = 1):
    """Keep the columns that are non-zero in more than ``threshold`` of the rows.

    The first column is always kept. Every other column is kept only when its
    number of non-zero entries is strictly greater than
    ``threshold * df.shape[0]``.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to filter.
    threshold : float
        Minimum fraction of rows in which a column must be non-zero.
    verbose : int, default 1
        When positive, print how many columns are removed and kept.

    Returns
    -------
    tuple
        ``(filtered_df, cols_otu)`` where ``cols_otu`` is ``get_cols`` applied
        to the filtered table.
    """
    n_rows = df.shape[0]
    nonzero_counts = (df.iloc[:, 1:] != 0).sum(axis = 0)
    keep_tail = (nonzero_counts > threshold * n_rows).values
    # the leading True keeps the first column unconditionally
    cols_filter = np.append(np.array([True]), keep_tail)

    if verbose > 0:
        n_kept = (cols_filter == True).sum()
        n_removed = (cols_filter == False).sum()
        print("Removing %d columns, %d columns left" % (n_removed, n_kept))

    filtered = df.loc[:, cols_filter]
    return filtered, get_cols(filtered)


def filtering_ecrf(df, nan_tresh = 0.5, unbalance_tresh = 0.05, verbose = 10):
    """Drop columns with too many NaNs and near-constant binary columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to filter; it is not modified.
    nan_tresh : float, default 0.5
        Columns whose fraction of NaN values exceeds this value are removed.
    unbalance_tresh : float, default 0.05
        For columns holding at most two distinct non-NaN values, the column is
        removed when its least frequent value covers less than this fraction
        of the rows or its most frequent value covers more than
        ``1 - unbalance_tresh`` of the rows.
    verbose : int, default 10
        When positive, print the name of every column removed by the balance
        check.

    Returns
    -------
    tuple
        ``(filtered_df, cols)`` where ``cols`` is ``get_cols(filtered_df)``.
    """
    n_rows = df.shape[0]

    # Keep if less than nan_tresh% NaNs
    nan_fraction = df.isna().sum() / n_rows
    df = df.loc[:, ~(nan_fraction > nan_tresh)]

    # Check occurrences in binary (or single-valued) columns: every value
    # must be present in more than unbalance_tresh of the rows.
    to_drop = []
    for el in df.columns:
        counts = df[el].value_counts()
        if counts.shape[0] > 2 or counts.empty:
            continue
        # Use min/max rather than counts[0]: the latter is a label lookup
        # that fails when 0 is not one of the column's values.
        freqs = counts / n_rows
        if freqs.min() < unbalance_tresh or freqs.max() > 1 - unbalance_tresh:
            to_drop.append(el)
            if verbose > 0:
                print("Deleted : ", el)

    # Drop once, on a new frame, instead of mutating while iterating.
    df = df.drop(columns = to_drop)
    return df, get_cols(df)



def get_cols(df):
    """Return the column names of ``df`` without the identifier columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Table whose columns are listed.

    Returns
    -------
    list
        ``df.columns`` as a list, with the first occurrence of ``'SUBJID'``
        and of ``'SAMPLE_ID'`` removed when present.
    """
    cols = list(df.columns)

    # removing SUBJID and SAMPLE_ID
    for id_col in ('SUBJID', 'SAMPLE_ID'):
        try:
            cols.remove(id_col)
        except ValueError:
            # identifier column absent: nothing to remove
            pass
    return cols


def adjust_covariates(data_, covariates_):
    """
    Remove the linear contribution of covariates from a series.

    Input :
    data_ : serie
        serie of data to be adjusted. Meant to be used in .apply function
        over several columns. It is copied, not modified.
    covariates_ : table of covariates, indexable by the boolean mask of
        non-NaN entries of ``data_``. It is copied, not modified.

    Output :
    serie : copy of ``data_`` in which the non-NaN entries are replaced by the
        scaled data minus the fitted coefficients applied to the scaled
        covariates (the regression intercept is not subtracted). NaN entries
        are left as they are.
    """
    adjusted = data_.copy()
    covs = covariates_.copy()

    # only the entries where the data is not NaN take part in the fit
    observed = ~np.isnan(adjusted)
    scaled_cov = scale(covs[observed])
    scaled_dat = scale(adjusted[observed])

    model = LinearRegression()
    model.fit(scaled_cov, scaled_dat)

    fitted_part = np.dot(model.coef_, scaled_cov.T)
    adjusted[observed] = scaled_dat - fitted_part
    return adjusted


def quantile_transformation(df, cols, output_distribution = 'normal'):
    """Quantile-transform the selected columns of ``df`` in place.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to transform; the selected columns are overwritten.
    cols : list
        Columns to transform.
    output_distribution : str, default 'normal'
        Forwarded to ``QuantileTransformer``.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with ``cols`` replaced by their transform.
    """
    # one quantile per row of the table
    n_samples = df.shape[0]
    transformer = QuantileTransformer(
        n_quantiles = n_samples,
        output_distribution = output_distribution,
    )
    transformed = transformer.fit_transform(df[cols])
    df[cols] = transformed
    return df



def pipeline(data, L_pipe, cols = None):
    """Apply a sequence of named preprocessing steps to ``data``.

    Parameters
    ----------
    data : object
        Data handed to the first step; each step receives the output of the
        previous one.
    L_pipe : iterable of str
        Step names, applied in order through ``wrap_preproc``.
    cols : optional
        Forwarded to every ``wrap_preproc`` call (it used to be accepted and
        then silently dropped).

    Returns
    -------
    object
        The output of the last step, or ``data`` itself when ``L_pipe`` is
        empty.
    """
    for preproc in L_pipe:
        data = wrap_preproc(data, preproc, cols)
    return data


def wrap_preproc(data, preproc, cols = None):
    """Run the preprocessing step called ``preproc`` on ``data``.

    Parameters
    ----------
    data : object
        Data handed to the selected step.
    preproc : str or None
        Step name. Recognised names are ``'QuantileTransformer'``,
        ``'PowerTransformer'``, ``'proportion'``, ``'fisher'``, ``'log'``,
        ``'log10'``, ``'scale'``, ``'center'``, ``'positify'`` and any name
        containing ``'filter'`` (handled by ``filter_freq``, which receives
        the full name). ``None`` and ``'None'`` mean "no preprocessing".
    cols : optional
        Forwarded to the selected step.

    Returns
    -------
    object
        The output of the selected step; ``data`` itself for the no-op step.

    Raises
    ------
    ValueError
        If ``preproc`` is not a recognised step name. Previously such a name
        made the function return ``None`` silently, which broke the next
        step of a pipeline.
    """
    if preproc is None or preproc == 'None':
        return data
    if preproc == 'QuantileTransformer':
        print("***** Computing QuantileTransformer *****")
        return get_qt(data, cols)
    if preproc == 'PowerTransformer':
        print("***** Computing PowerTransformer *****")
        return get_pt(data, cols)
    if preproc == 'proportion':
        print("***** Computing proportion *****")
        return get_proportion(data, cols)
    if preproc == 'fisher':
        print("***** Computing Fisher *****")
        return apply_fisher(data, cols)
    if preproc == 'log':
        print("***** Computing log *****")
        return apply_log(data, cols)
    if preproc == 'log10':
        # banner used to read "log" (copy-paste slip)
        print("***** Computing log10 *****")
        return apply_log10(data, cols)
    if preproc == 'scale':
        print("***** Computing scale *****")
        return apply_scale(data, cols)
    if preproc == 'center':
        print("***** Computing center *****")
        return apply_center(data, cols)
    if preproc == 'positify':
        print("***** Computing positify *****")
        return positify(data, cols)
    # must stay last: it matches any name that merely contains 'filter'
    if 'filter' in preproc:
        print("***** Computing filtering of low frequencies *****")
        return filter_freq(data, preproc, cols)
    raise ValueError("Unknown preprocessing step: %r" % (preproc,))


def apply_log(data_, cols = None, pseudocount = 0.000000001):
    """Return the element-wise natural logarithm of ``data_ + pseudocount``.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is not modified. Arrays of any dimension are
        accepted (the previous row-wise implementation failed on 1-D arrays).
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.
    pseudocount : float, default 1e-9
        Small constant added before taking the logarithm so that zeros do not
        map to ``-inf``.

    Returns
    -------
    Same container type as ``data_`` for the supported types; a plain copy of
    ``data_`` for any other type.
    """
    if isinstance(data_, (pd.DataFrame, pd.Series, np.ndarray)):
        # the transform is element-wise, so no per-row/per-column apply is needed
        return np.log(data_ + pseudocount)
    return data_.copy()

def apply_log10(data_, cols = None, pseudocount = 0.000000001):
    """Return the element-wise base-10 logarithm of ``data_ + pseudocount``.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is not modified. Arrays of any dimension are
        accepted (the previous row-wise implementation failed on 1-D arrays).
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.
    pseudocount : float, default 1e-9
        Small constant added before taking the logarithm so that zeros do not
        map to ``-inf``.

    Returns
    -------
    Same container type as ``data_`` for the supported types; a plain copy of
    ``data_`` for any other type.
    """
    if isinstance(data_, (pd.DataFrame, pd.Series, np.ndarray)):
        # the transform is element-wise, so no per-row/per-column apply is needed
        return np.log10(data_ + pseudocount)
    return data_.copy()

def apply_scale(data_, cols = None):
    """Standardise ``data_`` with ``scale`` while keeping its container type.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is copied, not modified.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    A DataFrame with the same index and columns, a Series with the same index
    and name, or an ndarray, holding the output of ``scale``. Any other input
    type is returned as a plain copy.
    """
    data = data_.copy()
    if isinstance(data, pd.DataFrame):
        data[data.columns] = scale(data[data.columns])
    elif isinstance(data, pd.Series):
        # a Series has no `.columns`; scale its values and rebuild it
        data = pd.Series(scale(data.to_numpy()), index = data.index, name = data.name)
    elif isinstance(data, np.ndarray):
        data = scale(data)
    return data

def apply_center(data_, cols = None):
    """Subtract the NaN-aware mean along axis 0 from ``data_``.

    The previous implementation called ``scale(data, with_std = False)``, a
    keyword the ``scale`` helper of this module does not accept, so every call
    raised ``TypeError``. Centring is now computed directly.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is not modified. For 2-D inputs each column is
        centred independently.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    Same container type as ``data_`` for the supported types; a plain copy of
    ``data_`` for any other type. NaN entries stay NaN.
    """
    if isinstance(data_, (pd.DataFrame, pd.Series, np.ndarray)):
        means = np.nanmean(np.asarray(data_, dtype = float), axis = 0)
        # one mean per column (a scalar for 1-D input) broadcasts over the rows
        return data_ - means
    return data_.copy()


# def apply_clr(data, cols):
#   data[cols] = data[cols].apply(lambda x : np.log(x/(np.power(np.prod(x),1/x.shape[0])+0.00001)))
#   return data

def apply_fisher(data_, cols = None):
    """Apply the Fisher z-transform ``0.5 * log((1 + x) / (1 - x))`` element-wise.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is not modified. Series and 1-D arrays are
        supported (the previous row-wise ``apply(..., axis = 1)`` failed on
        them).
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    Same container type as ``data_`` for the supported types; a plain copy of
    ``data_`` for any other type.
    """
    if isinstance(data_, (pd.DataFrame, pd.Series, np.ndarray)):
        # element-wise formula: no per-row apply is needed
        return 0.5 * np.log((1 + data_) / (1 - data_))
    return data_.copy()
        
def get_qt(data_, cols = None):
    """Quantile-transform ``data_`` to a normal output distribution.

    A ``QuantileTransformer`` is fitted with one quantile per row of the
    input and applied to that same input.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is copied, not modified.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    A DataFrame with the same index and columns, a Series with the same index
    and name, or the ndarray returned by the transformer. Any other input
    type is returned as a plain copy.
    """
    data = data_.copy()
    qt = QuantileTransformer(n_quantiles = data.shape[0], output_distribution = 'normal')
    if isinstance(data, pd.DataFrame):
        data[data.columns] = qt.fit_transform(data)
    elif isinstance(data, pd.Series):
        # a Series has no `.columns` and the transformer is given 2-D input:
        # transform it as a single column, then flatten back
        transformed = qt.fit_transform(data.to_numpy().reshape(-1, 1))
        data = pd.Series(transformed.ravel(), index = data.index, name = data.name)
    elif isinstance(data, np.ndarray):
        data = qt.fit_transform(data)
    return data


def get_pt(data_, cols = None):
    """Fit a default ``PowerTransformer`` on a copy of ``data_`` and apply it.

    Parameters
    ----------
    data_ : object exposing ``copy()``
        Input values; the transformer works on a copy.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    The output of ``PowerTransformer().fit_transform`` on the copy.
    """
    working_copy = data_.copy()
    return PowerTransformer().fit_transform(working_copy)

def get_proportion(data_, cols = None):
    """Divide every row of ``data_`` by the sum of that row.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is copied, not modified.
        NOTE(review): the pandas branch calls ``sum(axis = 1)``, which looks
        valid for a DataFrame only - confirm whether Series input is expected.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    The row-normalised data for pandas and ndarray inputs; a plain copy of
    ``data_`` for any other type.
    """
    result = data_.copy()
    if isinstance(result, (pd.DataFrame, pd.Series)):
        row_totals = result.sum(axis = 1)
        return result.div(row_totals, axis = 0)
    if isinstance(result, np.ndarray):
        # column vector of row sums so the division broadcasts across each row
        row_totals = result.sum(axis=1).reshape(-1, 1)
        return np.divide(result, row_totals)
    return result

def positify(data_, cols = None):
    """Shift every row of ``data_`` up by the absolute value of its minimum.

    The shift ``abs(row.min())`` is added whatever the sign of the minimum, so
    rows that are already positive are shifted as well.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is copied, not modified. Both branches work
        row-wise (``axis = 1``).
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    The shifted data for pandas and ndarray inputs; a plain copy of ``data_``
    for any other type.
    """
    def _shift_by_abs_min(row):
        return row + np.abs(row.min())

    shifted = data_.copy()
    if isinstance(shifted, (pd.DataFrame, pd.Series)):
        shifted = shifted.apply(_shift_by_abs_min, axis = 1)
    elif isinstance(shifted, np.ndarray):
        shifted = np.apply_along_axis(_shift_by_abs_min, axis = 1, arr = shifted)
    return shifted

def filter_freq(data_, filter_name, cols = None):
    """Set to zero every value that is not strictly above a minimum frequency.

    The threshold is read from ``filter_name``: the text after the first
    underscore is taken as the decimal part, e.g. ``'filter_01'`` gives
    ``0.01``.

    Parameters
    ----------
    data_ : pandas.DataFrame, pandas.Series or numpy.ndarray
        Input values; the input is copied, not modified.
    filter_name : str
        Step name of the form ``'<prefix>_<digits>'``.
    cols : optional
        Unused; kept so that all preprocessing steps share one signature.

    Returns
    -------
    The data with values ``<= threshold`` (and NaNs) replaced by zero for
    pandas and ndarray inputs; a plain copy of ``data_`` for any other type.
    """
    data = data_.copy()
    freq_min = filter_name.split('_')[1]
    freq_min = float('0.'+freq_min)
    if isinstance(data, pd.DataFrame) or isinstance(data, pd.Series):
        data = data[data>freq_min]
        data = data.fillna(0.)
    elif isinstance(data, np.ndarray):
        # same rule as the pandas branch; the previous code ignored freq_min
        # and zeroed everything below a hard-coded 3
        data = np.where(data > freq_min, data, 0)
    return data



"""
def pipeline(data, L_pipe, cols = None):
    print(L_pipe)
    for preproc in L_pipe :
        data = wrap_preproc(data, cols, preproc)
    return data


def wrap_preproc(data, preproc, cols = None):
    # if preproc == 'None':
    #     pass
    # print(preproc)
    if preproc == 'QuantileTransformer':
        print("***** Computing QuantileTransformer *****")
        return get_qt(data,cols)
    if preproc == 'PowerTransformer':
        print("***** Computing PowerTransformer *****")
        return get_pt(data, cols)
    if preproc == 'proportion':
        print("***** Computing proportion *****")
        return get_proportion(data, cols)
    if preproc == 'fisher':
        print("***** Computing Fisher *****")
        return apply_fisher(data, cols)
    if preproc == 'log':
        print("***** Computing log *****")
        return apply_log(data, cols)
    if preproc == 'log10':
        print("***** Computing log *****")
        return apply_log10(data, cols)
    if preproc == 'scale':
        print("***** Computing scale *****")
        return apply_scale(data, cols)
    if preproc == 'center':
        print("***** Computing center *****")
        return apply_center(data, cols)
    if 'filter' in preproc:
        print("***** Computing filtering of low frequencies *****")
        return filter_freq(data, cols, preproc)


def apply_log(data_, cols):
    data = data_.copy()
#         self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
    data[cols] = data[cols].apply(lambda x: np.log(x+0.000000001))
    return data 

def apply_log10(data_, cols):
    data = data_.copy()
#         self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
    data[cols] = data[cols].apply(lambda x: np.log10(x+0.000000001))
    return data

def apply_scale(data_, cols):
    data = data_.copy()
#         self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
    data[cols] = scale(data[cols])
    return data 

def apply_center(data_, cols):
    data = data_.copy()
#         self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
    data[cols] = scale(data[cols], with_std = False)
    return data


# def apply_clr(data, cols):
#   data[cols] = data[cols].apply(lambda x : np.log(x/(np.power(np.prod(x),1/x.shape[0])+0.00001)))
#   return data

def apply_fisher(data_, cols):
    data = data_.copy()
#         self.df_otu_data[self.cols_otu] = self.df_otu_data[self.cols_otu].apply(lambda x: np.log(x+0.000000001))
    data[cols] = data[cols].apply(lambda x: 0.5*np.log((1+x)/(1-x)), axis = 1)
    return data
        
def get_qt(data_, cols):
    data = data_.copy()
    qt = QuantileTransformer(output_distribution = 'normal')
    data[cols] = qt.fit_transform(data[cols])
    return data


def get_pt(data_, cols):
    data = data_.copy()
    pt = PowerTransformer()
    data[cols] = pt.fit_transform(data[cols])
    return data

def get_proportion(data_, cols):
    data = data_.copy()
    data[cols] = data[cols].div(data[cols].sum(axis = 1),axis = 0)
    return data

def filter_freq(data_, cols, filter_name):
    data = data_.copy()
    freq_min = filter_name.split('_')[1]
    freq_min = float('0.'+freq_min)
    data[cols] = data[cols][data[cols]>freq_min]
    data[cols] = data[cols].fillna(0.)
    return data

"""