Skip to content
Snippets Groups Projects
Select Git revision
  • 58ea43bf9e84d7139c58615741426722736fc7fb
  • master default protected
  • v19.0.1
  • v19.0.0
  • v15.1.0
5 results

hmmerAnnotationDecode.py

Blame
  • manocca.py 13.56 KiB
    import numpy as np
    import pandas as pd
    
    # from sklearn.preprocessing import scale
    # import tools.preprocessing_tools.scale as scale
    from sklearn.decomposition import PCA
    
    from tqdm import tqdm
    
    from joblib import Parallel, delayed, cpu_count
    
    from tools.preprocessing_tools import scale
    from tools import preprocessing_tools as pt
    from tools.compute_manova import custom_manova, custom_mancova
    
    
    import warnings
    warnings.filterwarnings('ignore')
    
    
    class MANOCCA:
        """ Class performing a covariance test between a set of outputs, predictors and covariates.
    
        For a sample of N, given a set of k outputs Y , n_p predictors X, and n_c covariates C the class performs the test : Y ~ X + C
    
    
        Parameters
        ----------
        predictors : numpy array or pandas dataframe
            Matrix of N x n_p predictors for the test, X in the formula. Each column should be a distinct predictor, and the tests are performed independently for each predictors.
    
        outputs : numpy array or pandas dataframe
            Matrix of N x k outputs from which to test the covariance, Y in the formula. Rows should be the samples and columns should be the taxa/outcome for the sample. The class will then compute the covariance and all the steps to prepare the test.
    
        covariates : numpy array or pandas dataframe, default = None
            Matrix of N x n_c covariates to include in the linear model, C in the formula. The final test is performed on the residuals derived using the covariates inputed here.
    
        cols_ouputs : list, default = None
            List of names for the outputs. If a dataframe is given previously, the column names are extracted. Otherwise if None and outputs is a np.array, the columns are set by default to integers from 1 to outputs.shape[1]
    
        cols_predictors : list, default = None
            List of names for the predictors. If a dataframe is given previously, the column names are extracted. Otherwise if None and predictors is a np.array, the columns are set by default to integers from 1 to predictors.shape[1]
    
        cols_covariates : list, default = None
            List of names for the covariates. If a dataframe is given previously, the column names are extracted. Otherwise if None and covariates is a np.array, the columns are set by default to integers from 1 to covariates.shape[1]
    
        prodV_red : np.array, default = None
            Pre-computed prodV_red from a previous run. To save some time, and since the covariance computation ahead of the test is time consuming, you can directly input the already computed prodV_red from a previous run. 
            You can retrieve the prodV_red from a previous run by doing : model.prodV_red, model being a previous computed (at least initialized) MANOCCA model.
    
        n_comp : int, default = None
        Number of principal components to keep in the model. It can later still be reduced in the .test() method, but it can never be increased above the value inputed here. If None, it will be by default set to min(N_samples, N_products)
    
        prod_to_keep : list, default = None
        List of products to keep in the test. This feature is still under development (23/05/2023) and needs to be manipulated with care. Products to keep should be listed as strings under the format : 'var1|var2', var1 and var2 being two output variables, and var1 being 
        a column indexed smaller than var2. In other words, if the column names for outputs are : [var1, .... , var2, ... ] it works, but not : [..., var2, ... , var1]
    
        use_resid : bool, default = True
        Whether or not to use residuals to adjust the covariates. If True the predictors are tested with regards to the PCs adjusted for the covariates, if False the covariates are included in the linear model PC_i ~ X+C.  !!! To this day (23/05/2023) only use_resid=True works !!! 
        Not using the residuals currently displays miscalibration with an inflation of type 1 error rate.
    
        n_jobs : int, default = 1
        Number of jobs to use in parrallel. If set to -1, the max number of available cores is used. 
    
    
        Attributes
        ----------
    
        All initialized parameters are set as attributes :
        predictors : numpy array or pandas dataframe
        outputs : numpy array or pandas dataframe
        covariates : numpy array or pandas dataframe
        cols_ouputs : list
        cols_predictors : list
        cols_covariates : list
        prodV_red : np.array
        n_comp : int
        prod_to_keep : list
        use_resid : bool
        n_jobs : int
    
        # Additional attributes #
        prodV : numpy array, matrix of products
        prodV_red : numpy array, matrix of principal components kept
        pca : sklearn.decomposition.PCA, pca used to reduce the dimension
        p : numpy array, array of p_values for the test
    
    
        Examples :
        >>> import numpy as np
        >>> Y = np.random.normal(0,1,(100,10))
        >>> X = np.random.binomial(1,0.4,(100,1))
        >>> C = np.random.normal(0,1,(100,2))
        >>> m = MANOCCA(X, Y, C, n_comp = 20, n_jobs = -1)
        >>> m.test(n_comp = 10)
        >>> m.p
        array([[0.25268347]]) #depends of random state
    
    
        """
        def __init__(self, predictors, outputs, covariates=None, cols_outputs = None,
                     cols_predictors = None, cols_covariates = None, prodV_red=None, n_comp = None, prod_to_keep = None, use_resid = True, use_pca = True, n_jobs = 1):
    
            ### Initializing 
            self.outputs = outputs
            self.cols_outputs = cols_outputs
            if not isinstance(outputs, type(None)) :
                self.outputs, self.cols_outputs = pt._extract_cols(self.outputs, self.cols_outputs)
    
            self.predictors = predictors
            self.cols_predictors = cols_predictors
            self.predictors, self.cols_predictors = pt._extract_cols(self.predictors, self.cols_predictors)
    
            self.covariates = covariates
            self.cols_covariates = cols_covariates
            if covariates is not None :
                self.covariates, self.cols_covariates = pt._extract_cols(self.covariates, self.cols_covariates)  
    
            # Parameters
            if n_comp :
                self.n_comp = n_comp
            else : 
                self.n_comp = outputs.shape[1]
            self.n_jobs = n_jobs
            self.use_resid = use_resid
            self.use_pca = use_pca
    
            # Filled later
            self.prodV = None
            self.prodV_red = None
            self.pca = None
            self.p = None
    
            # Filtering product columns
            self.prod_to_keep = prod_to_keep
            if not isinstance(self.prod_to_keep, type(None)):
                self.prod_to_keep = np.array(self.prod_to_keep)
                if not isinstance(self.prod_to_keep[0], type(list)) and not isinstance(self.prod_to_keep[0], np.ndarray) :
                    self.prod_to_keep = [self.prod_to_keep]
    
            # Compute prodV_red if not given
            if prodV_red is not None:
                self.prodV_red = prodV_red
    
            else : # If not we preprocess the data and compute prodV and prodV_red
                self.prodV = self.get_prodV_para_wrap(self.outputs)
                # self.prodV = scale(self.prodV)
                if not isinstance(self.prod_to_keep, type(None)): # Filtering out some columns
                    self.filter_prodV_columns()
    
                if self.use_pca == True :
                    self.prodV_red, self.pca = self.get_prodV_red(self.n_comp, return_pca = True) # prodV_red is scaled within the get_prodV_red function
                    self.prodV_red = scale(pt.get_qt(self.prodV_red))
                else :
                    self.prodV_red = scale(self.prodV)
                # self.prodV_red = scale(self.prodV_red) # NO SCALE IN GET_PRODV_RED FUNCTION
                # self.prodV_red = pt.get_qt(self.prodV_red) # TO DELETE
    
            if use_resid and isinstance(covariates, np.ndarray):
                print("computing residuals")
                self.prodV_red = np.apply_along_axis(lambda x : pt.adjust_covariates(x,covariates), axis = 0, arr = self.prodV_red)
            
        ### Functions to generate prodV and prodV_red ###
    
        def get_prodV_para_effi(self, DD0, job_id, nb_compute):
            """  Computes the product matrix for a set of outputs and a given number of jobs
    
    
            Parameters
            ----------
            DD0 : numpy array
                Outputs from which the covariance (product matrix) should be computed. It should be self.outputs
    
            job_id : int
                job identifcation, equals to the step in the loop.
    
            nb_compute : int,
                Total number of cores allocated for the task. Equals to n_jobs in all cases, except when n_jobs=-1 where its value is cpu_count() from the joblib library.
    
            Returns
            -------
            L_prodV : list of numpy arrays, list of all the chunks of product matrix computed by the different process. 
            """
    
            L_prodV = []
            for i in range(job_id, DD0.shape[1]-1, nb_compute):
                # tmp = np.transpose(np.transpose(DD0[:,(i+1):DD0.shape[1]])*DD0[:,i])
                tmp = (DD0[:,(i+1):]*DD0[:,i].reshape(-1,1))
                # std = tmp.std(axis = 0)
                tmp = pt.get_qt(tmp)
                # tmp = (std) * tmp
                L_prodV += [tmp]
            return L_prodV
    
    
        def get_prodV_para_wrap(self, DD0):
            """  Computes the product matrix for a set of outputs
            
            This function wraps the parallel processing to compute the product matrix. It parameters the get_prodV_para_effi function depending on the parralel status.
            After the parralel computation, the chunks are re-organised to match an order where the products are all computed from left to right. Ie, with the ouputs columns names : 
            [v1, v2, v3, v4], we want : [v1|v2, v1|v3, v1|v4, v2|v3, v2|v4, v3|v4] and no other order. This helps keep track of the products. 
    
            Parameters
            ----------
            DD0 : numpy array
                Outputs from which the covariance (product matrix) should be computed. It should be self.outputs
    
    
            Returns
            -------
            prodV : numpy array, the matrix of N_samples x k*(k-1)/2 with k the number of outpts, to use in the MANOCCA test
    
            """
            n_jobs = self.n_jobs
            if n_jobs == 1 :
                print("Sequential computation of prodV")
                res = [self.get_prodV_para_effi(DD0, 0, 1)]
            else :
                print("Parallel computation of prodV")
                nb_compute = cpu_count()
                res = Parallel(n_jobs=n_jobs, verbose = 10)(delayed(self.get_prodV_para_effi)(DD0,j, nb_compute) for j in range(nb_compute))
            # reordering
            res_ordered = []
            for _ in range(len(res[0])):
                for L in res :
                    if L != [] : 
                        res_ordered += [L.pop(0)]
            return np.hstack(res_ordered)
    
        def get_prodV_red(self, n_comp, return_pca = False, svd_solver = 'full'):
            """  Computes the PCA reduced matrix given the product matrix
                
            The product matrix should already be computed in self.prodV.
    
            Parameters
            ----------
            n_comp : int
                Number of Principal Components (PC) to use for the test.
    
            return_pca : bool, default = False
                Flag to return the pca if needed. If yes, (prodV_red, pca) is returned, otherwise just prodV_red
    
            svd_solver : string, default='full'
                Solver to compute the PCA. See sklearn.decomposition.PCA documentation for more details.
    
    
            Returns
            -------
            prodV_red : numpy array, the matrix of N_samples x n_comp to use in the MANOCCA test
    
            """
            pca = PCA(n_components = n_comp, svd_solver = svd_solver)
            tmp = pca.fit_transform(self.prodV) 
            if return_pca:
                return tmp, pca  # SCALE
            else :
                return tmp
    
        def get_prod_cols(self, cols, sep = '|'):
            return [cols[i]+sep+cols[j] for i in range(len(cols)-1) for j in range(i+1,len(cols))]
    
        def get_prod_to_keep(self):
            L_to_keep = [self.get_prod_cols(el) for el in self.prod_to_keep]
            L_to_keep = [item for sublist in L_to_keep for item in sublist] #flatten list
            return L_to_keep
    
    
        def filter_prodV_columns(self):
            cols_prod = self.get_prod_cols(self.cols_outputs)
    
            if self.prod_to_keep[0][0] in self.cols_outputs: #if first element in columns then we need to compute the products
                L_to_keep = self.get_prod_to_keep()
            else: # else products are already given
                L_to_keep = self.prod_to_keep[0]
    
            L_index = [cols_prod.index(x) for x in L_to_keep]
            
            self.prodV = self.prodV[:,L_index]
    
            
    
    
    
        def test(self, var = None, n_comp = None, min_comp = 0):
            if n_comp is None :
                n_comp = self.prodV_red.shape[1]
            if isinstance(var, type(None)) :
                var = self.predictors
    
            n_preds = var.shape[1]
            p = np.empty((n_preds,1))
            for i_pred in range(n_preds):
                # print(self.cols_predictors[i_pred])
                nan_mask = np.isnan(var[:,i_pred])
                if ((self.covariates is not None) and (self.use_resid == False)) :
                    # print("With covariates")
                    p[i_pred] = custom_mancova(scale(self.prodV_red[~nan_mask,min_comp:n_comp]), scale(var[~nan_mask,i_pred].reshape(-1,1)), scale(self.covariates[~nan_mask,:]))
                else :
                    # print("Without covariates")
                    p[i_pred] = custom_manova(self.prodV_red[~nan_mask,min_comp:n_comp], scale(var[~nan_mask,i_pred].reshape(-1,1)))
            self.p = p
    
    
    
    
    # ### TESTS ###
    
    # a = pd.DataFrame(np.random.randint(0,10,(10,5)), columns = ["cols_" + str(i) for i in range(5)])
    # b = pd.DataFrame(np.random.randint(0,10,(10,5)), columns = ["cols_pred_" + str(i) for i in range(5)])
    # c = pd.DataFrame(np.random.randint(0,10,(10,3)), columns = ["cols_cov_" + str(i) for i in range(3)])
    # b = b.iloc[:,0]
    # print(a)
    
    # manocca = MANOCCA(a,b, cols_predictors = ['a'], covariates = c)
    
    # # print(scale(None))
    
    # print(manocca.predictors)
    # print(manocca.cols_predictors)
    # manocca.test()
    # print(manocca.p)