Select Git revision
hmmerAnnotationDecode.py
manocca.py 13.56 KiB
import numpy as np
import pandas as pd
# from sklearn.preprocessing import scale
# import tools.preprocessing_tools.scale as scale
from sklearn.decomposition import PCA
from tqdm import tqdm
from joblib import Parallel, delayed, cpu_count
from tools.preprocessing_tools import scale
from tools import preprocessing_tools as pt
from tools.compute_manova import custom_manova, custom_mancova
import warnings
warnings.filterwarnings('ignore')
class MANOCCA:
""" Class performing a covariance test between a set of outputs, predictors and covariates.
For a sample of N, given a set of k outputs Y , n_p predictors X, and n_c covariates C the class performs the test : Y ~ X + C
Parameters
----------
predictors : numpy array or pandas dataframe
Matrix of N x n_p predictors for the test, X in the formula. Each column should be a distinct predictor, and the tests are performed independently for each predictors.
outputs : numpy array or pandas dataframe
Matrix of N x k outputs from which to test the covariance, Y in the formula. Rows should be the samples and columns should be the taxa/outcome for the sample. The class will then compute the covariance and all the steps to prepare the test.
covariates : numpy array or pandas dataframe, default = None
Matrix of N x n_c covariates to include in the linear model, C in the formula. The final test is performed on the residuals derived using the covariates inputed here.
cols_ouputs : list, default = None
List of names for the outputs. If a dataframe is given previously, the column names are extracted. Otherwise if None and outputs is a np.array, the columns are set by default to integers from 1 to outputs.shape[1]
cols_predictors : list, default = None
List of names for the predictors. If a dataframe is given previously, the column names are extracted. Otherwise if None and predictors is a np.array, the columns are set by default to integers from 1 to predictors.shape[1]
cols_covariates : list, default = None
List of names for the covariates. If a dataframe is given previously, the column names are extracted. Otherwise if None and covariates is a np.array, the columns are set by default to integers from 1 to covariates.shape[1]
prodV_red : np.array, default = None
Pre-computed prodV_red from a previous run. To save some time, and since the covariance computation ahead of the test is time consuming, you can directly input the already computed prodV_red from a previous run.
You can retrieve the prodV_red from a previous run by doing : model.prodV_red, model being a previous computed (at least initialized) MANOCCA model.
n_comp : int, default = None
Number of principal components to keep in the model. It can later still be reduced in the .test() method, but it can never be increased above the value inputed here. If None, it will be by default set to min(N_samples, N_products)
prod_to_keep : list, default = None
List of products to keep in the test. This feature is still under development (23/05/2023) and needs to be manipulated with care. Products to keep should be listed as strings under the format : 'var1|var2', var1 and var2 being two output variables, and var1 being
a column indexed smaller than var2. In other words, if the column names for outputs are : [var1, .... , var2, ... ] it works, but not : [..., var2, ... , var1]
use_resid : bool, default = True
Whether or not to use residuals to adjust the covariates. If True the predictors are tested with regards to the PCs adjusted for the covariates, if False the covariates are included in the linear model PC_i ~ X+C. !!! To this day (23/05/2023) only use_resid=True works !!!
Not using the residuals currently displays miscalibration with an inflation of type 1 error rate.
n_jobs : int, default = 1
Number of jobs to use in parrallel. If set to -1, the max number of available cores is used.
Attributes
----------
All initialized parameters are set as attributes :
predictors : numpy array or pandas dataframe
outputs : numpy array or pandas dataframe
covariates : numpy array or pandas dataframe
cols_ouputs : list
cols_predictors : list
cols_covariates : list
prodV_red : np.array
n_comp : int
prod_to_keep : list
use_resid : bool
n_jobs : int
# Additional attributes #
prodV : numpy array, matrix of products
prodV_red : numpy array, matrix of principal components kept
pca : sklearn.decomposition.PCA, pca used to reduce the dimension
p : numpy array, array of p_values for the test
Examples :
>>> import numpy as np
>>> Y = np.random.normal(0,1,(100,10))
>>> X = np.random.binomial(1,0.4,(100,1))
>>> C = np.random.normal(0,1,(100,2))
>>> m = MANOCCA(X, Y, C, n_comp = 20, n_jobs = -1)
>>> m.test(n_comp = 10)
>>> m.p
array([[0.25268347]]) #depends of random state
"""
def __init__(self, predictors, outputs, covariates=None, cols_outputs = None,
cols_predictors = None, cols_covariates = None, prodV_red=None, n_comp = None, prod_to_keep = None, use_resid = True, use_pca = True, n_jobs = 1):
### Initializing
self.outputs = outputs
self.cols_outputs = cols_outputs
if not isinstance(outputs, type(None)) :
self.outputs, self.cols_outputs = pt._extract_cols(self.outputs, self.cols_outputs)
self.predictors = predictors
self.cols_predictors = cols_predictors
self.predictors, self.cols_predictors = pt._extract_cols(self.predictors, self.cols_predictors)
self.covariates = covariates
self.cols_covariates = cols_covariates
if covariates is not None :
self.covariates, self.cols_covariates = pt._extract_cols(self.covariates, self.cols_covariates)
# Parameters
if n_comp :
self.n_comp = n_comp
else :
self.n_comp = outputs.shape[1]
self.n_jobs = n_jobs
self.use_resid = use_resid
self.use_pca = use_pca
# Filled later
self.prodV = None
self.prodV_red = None
self.pca = None
self.p = None
# Filtering product columns
self.prod_to_keep = prod_to_keep
if not isinstance(self.prod_to_keep, type(None)):
self.prod_to_keep = np.array(self.prod_to_keep)
if not isinstance(self.prod_to_keep[0], type(list)) and not isinstance(self.prod_to_keep[0], np.ndarray) :
self.prod_to_keep = [self.prod_to_keep]
# Compute prodV_red if not given
if prodV_red is not None:
self.prodV_red = prodV_red
else : # If not we preprocess the data and compute prodV and prodV_red
self.prodV = self.get_prodV_para_wrap(self.outputs)
# self.prodV = scale(self.prodV)
if not isinstance(self.prod_to_keep, type(None)): # Filtering out some columns
self.filter_prodV_columns()
if self.use_pca == True :
self.prodV_red, self.pca = self.get_prodV_red(self.n_comp, return_pca = True) # prodV_red is scaled within the get_prodV_red function
self.prodV_red = scale(pt.get_qt(self.prodV_red))
else :
self.prodV_red = scale(self.prodV)
# self.prodV_red = scale(self.prodV_red) # NO SCALE IN GET_PRODV_RED FUNCTION
# self.prodV_red = pt.get_qt(self.prodV_red) # TO DELETE
if use_resid and isinstance(covariates, np.ndarray):
print("computing residuals")
self.prodV_red = np.apply_along_axis(lambda x : pt.adjust_covariates(x,covariates), axis = 0, arr = self.prodV_red)
### Functions to generate prodV and prodV_red ###
def get_prodV_para_effi(self, DD0, job_id, nb_compute):
""" Computes the product matrix for a set of outputs and a given number of jobs
Parameters
----------
DD0 : numpy array
Outputs from which the covariance (product matrix) should be computed. It should be self.outputs
job_id : int
job identifcation, equals to the step in the loop.
nb_compute : int,
Total number of cores allocated for the task. Equals to n_jobs in all cases, except when n_jobs=-1 where its value is cpu_count() from the joblib library.
Returns
-------
L_prodV : list of numpy arrays, list of all the chunks of product matrix computed by the different process.
"""
L_prodV = []
for i in range(job_id, DD0.shape[1]-1, nb_compute):
# tmp = np.transpose(np.transpose(DD0[:,(i+1):DD0.shape[1]])*DD0[:,i])
tmp = (DD0[:,(i+1):]*DD0[:,i].reshape(-1,1))
# std = tmp.std(axis = 0)
tmp = pt.get_qt(tmp)
# tmp = (std) * tmp
L_prodV += [tmp]
return L_prodV
def get_prodV_para_wrap(self, DD0):
""" Computes the product matrix for a set of outputs
This function wraps the parallel processing to compute the product matrix. It parameters the get_prodV_para_effi function depending on the parralel status.
After the parralel computation, the chunks are re-organised to match an order where the products are all computed from left to right. Ie, with the ouputs columns names :
[v1, v2, v3, v4], we want : [v1|v2, v1|v3, v1|v4, v2|v3, v2|v4, v3|v4] and no other order. This helps keep track of the products.
Parameters
----------
DD0 : numpy array
Outputs from which the covariance (product matrix) should be computed. It should be self.outputs
Returns
-------
prodV : numpy array, the matrix of N_samples x k*(k-1)/2 with k the number of outpts, to use in the MANOCCA test
"""
n_jobs = self.n_jobs
if n_jobs == 1 :
print("Sequential computation of prodV")
res = [self.get_prodV_para_effi(DD0, 0, 1)]
else :
print("Parallel computation of prodV")
nb_compute = cpu_count()
res = Parallel(n_jobs=n_jobs, verbose = 10)(delayed(self.get_prodV_para_effi)(DD0,j, nb_compute) for j in range(nb_compute))
# reordering
res_ordered = []
for _ in range(len(res[0])):
for L in res :
if L != [] :
res_ordered += [L.pop(0)]
return np.hstack(res_ordered)
def get_prodV_red(self, n_comp, return_pca = False, svd_solver = 'full'):
""" Computes the PCA reduced matrix given the product matrix
The product matrix should already be computed in self.prodV.
Parameters
----------
n_comp : int
Number of Principal Components (PC) to use for the test.
return_pca : bool, default = False
Flag to return the pca if needed. If yes, (prodV_red, pca) is returned, otherwise just prodV_red
svd_solver : string, default='full'
Solver to compute the PCA. See sklearn.decomposition.PCA documentation for more details.
Returns
-------
prodV_red : numpy array, the matrix of N_samples x n_comp to use in the MANOCCA test
"""
pca = PCA(n_components = n_comp, svd_solver = svd_solver)
tmp = pca.fit_transform(self.prodV)
if return_pca:
return tmp, pca # SCALE
else :
return tmp
def get_prod_cols(self, cols, sep = '|'):
return [cols[i]+sep+cols[j] for i in range(len(cols)-1) for j in range(i+1,len(cols))]
def get_prod_to_keep(self):
L_to_keep = [self.get_prod_cols(el) for el in self.prod_to_keep]
L_to_keep = [item for sublist in L_to_keep for item in sublist] #flatten list
return L_to_keep
def filter_prodV_columns(self):
cols_prod = self.get_prod_cols(self.cols_outputs)
if self.prod_to_keep[0][0] in self.cols_outputs: #if first element in columns then we need to compute the products
L_to_keep = self.get_prod_to_keep()
else: # else products are already given
L_to_keep = self.prod_to_keep[0]
L_index = [cols_prod.index(x) for x in L_to_keep]
self.prodV = self.prodV[:,L_index]
def test(self, var = None, n_comp = None, min_comp = 0):
if n_comp is None :
n_comp = self.prodV_red.shape[1]
if isinstance(var, type(None)) :
var = self.predictors
n_preds = var.shape[1]
p = np.empty((n_preds,1))
for i_pred in range(n_preds):
# print(self.cols_predictors[i_pred])
nan_mask = np.isnan(var[:,i_pred])
if ((self.covariates is not None) and (self.use_resid == False)) :
# print("With covariates")
p[i_pred] = custom_mancova(scale(self.prodV_red[~nan_mask,min_comp:n_comp]), scale(var[~nan_mask,i_pred].reshape(-1,1)), scale(self.covariates[~nan_mask,:]))
else :
# print("Without covariates")
p[i_pred] = custom_manova(self.prodV_red[~nan_mask,min_comp:n_comp], scale(var[~nan_mask,i_pred].reshape(-1,1)))
self.p = p
# ### TESTS ###
# a = pd.DataFrame(np.random.randint(0,10,(10,5)), columns = ["cols_" + str(i) for i in range(5)])
# b = pd.DataFrame(np.random.randint(0,10,(10,5)), columns = ["cols_pred_" + str(i) for i in range(5)])
# c = pd.DataFrame(np.random.randint(0,10,(10,3)), columns = ["cols_cov_" + str(i) for i in range(3)])
# b = b.iloc[:,0]
# print(a)
# manocca = MANOCCA(a,b, cols_predictors = ['a'], covariates = c)
# # print(scale(None))
# print(manocca.predictors)
# print(manocca.cols_predictors)
# manocca.test()
# print(manocca.p)