windows.py 2.65 KB
Newer Older
1
2
3
4
"""
implement the imputation window is sliding along the genome:

- ImpG like: Non overlapping windows, the imputation is apply in batch to unknown snp in the window
5
- centered_window:  A sliding window centered on the Snp to impute
6
7

"""
8
9
from .stat_models import ImpG_model_batch, ImpG_model_snp
from .ld_matrix import generate_sparse_matrix
10

11
import pandas as pd
12
13
14
15
16
17
18
19
20
21
22

def parse_region_position(LD_file):
    """
    Extract the region definition encoded in the name of an LD file.

    The file name (last "/"-separated component of the path, up to its
    first ".") is expected to be made of three "_"-separated fields.

    Argument :
        LD_file : path of a LD file generated by jass_impute

    Return :
        A (chrom, startpos, endpos) tuple of strings.
    """
    file_name = LD_file.rsplit("/", 1)[-1]
    # Keep only what precedes the first dot (drops every extension).
    region_tag = file_name.partition(".")[0]
    chrom, startpos, endpos = region_tag.split("_")
    return (chrom, startpos, endpos)

23
24

def realigned_zfiles_on_panel(ref_panel, Zscores):
    """
    Check if the counted allele is the same in the reference panel and
    the Zscore files.

    For every SNP of ``Zscores``, its ``A0`` allele is compared with the
    ``Ref_all`` allele found under the same index label in ``ref_panel``.
    Where they differ, ``A0`` and ``A1`` are overwritten with the panel's
    ``alt_all`` and ``Ref_all`` respectively, and the sign of ``Z`` is
    inverted.

    Argument :
        ref_panel : DataFrame with 'Ref_all' and 'alt_all' columns, whose
            index contains every index label of Zscores
        Zscores : DataFrame with 'A0', 'A1' and 'Z' columns

    Return :
        The Zscores DataFrame that was passed in (it is modified in place).

    Raises :
        KeyError : if a label of Zscores.index is absent from ref_panel
    """
    panel_ref_allele = ref_panel.loc[Zscores.index, "Ref_all"]
    allele_inverted = panel_ref_allele != Zscores["A0"]

    # The right-hand Series are aligned by pandas on the index labels of
    # the selected rows, so each SNP receives its own panel alleles.
    # NOTE(review): after this swap A0 holds the panel's *alt* allele, not
    # its reference allele -- confirm this is the intended orientation.
    Zscores.loc[allele_inverted, "A0"] = ref_panel["alt_all"]
    Zscores.loc[allele_inverted, "A1"] = ref_panel["Ref_all"]
    Zscores.loc[allele_inverted, "Z"] = -Zscores.loc[allele_inverted, "Z"]

    return Zscores

def centered_window_imputation(LD_file, ref_panel_folder, Zfile, window_size):
    """
    Each missing Snp is imputed by known snp found in a window centered on
    the SNP to impute.

    Argument :
        LD_file : A LD file generated by jass_impute; the chromosome is
            parsed from its file name (see parse_region_position)
        ref_panel_folder : folder holding the reference panel files, named
            "<chrom>.eur.1pct.bim"
        Zfile : tab-separated Z-score file; its first column is used as
            index and it must provide 'pos', 'A0', 'A1' and 'Z' columns
        window_size : half-width of the window, in the same unit as the
            'pos' columns; typed SNPs strictly inside
            (pos - window_size, pos + window_size) are used

    Return :
        A DataFrame sorted by 'pos' holding the typed SNPs (with 'Var'
        set to 1) followed-in-order by the imputed SNPs, whose 'Z' and
        'Var' come from the 'mu' and 'Var' entries returned by
        ImpG_model_snp.
    """
    import os

    chrom = parse_region_position(LD_file)[0]

    ref_panel_file = os.path.join(ref_panel_folder, "{0}.eur.1pct.bim".format(chrom))
    print(ref_panel_file)
    ref_panel = pd.read_csv(
        ref_panel_file,
        sep="\t",
        names=['chr', "nothing", 'pos', 'Ref_all', 'alt_all'],
        index_col=1,
    )

    LD_mat = generate_sparse_matrix(LD_file, ref_panel)

    Zscores = pd.read_csv(Zfile, index_col=0, sep="\t")
    Zscores = realigned_zfiles_on_panel(ref_panel, Zscores)
    Zscores['Var'] = 1

    # dispatch snp between typed and untyped
    unknowns = LD_mat.index.difference(Zscores.index)

    print("### Imputation of {0} snps ###".format(len(unknowns)))

    # Imputed values are collected aside and merged after the loop, so that
    # an already-imputed SNP is never used as a "known" SNP for another one.
    imputed = {}
    for snp_unknown in unknowns:
        # Boundary of the centered_window
        center = ref_panel.loc[snp_unknown, 'pos']
        start_ld_block = center - window_size
        end_ld_block = center + window_size

        in_window = (start_ld_block < Zscores.pos) & (Zscores.pos < end_ld_block)
        known = Zscores.loc[in_window].index

        Sig_t = LD_mat.loc[known, known]
        Sig_i_t = LD_mat.loc[snp_unknown, known]
        Zt = Zscores.loc[known, 'Z']

        imp = ImpG_model_snp(Zt, Sig_t, Sig_i_t)
        # Keep the panel position so the imputed SNP is placed correctly
        # by the final sort.
        imputed[snp_unknown] = {"pos": center, "Z": imp['mu'], "Var": imp['Var']}

    if imputed:
        imputed_rows = pd.DataFrame.from_dict(imputed, orient="index")
        # Columns absent from the imputed rows (e.g. alleles) are left as NaN.
        Zscores = pd.concat([Zscores, imputed_rows], sort=False)

    return Zscores.sort_values(by="pos")