Commit 54f2b5bb authored by Blaise Li
Added function to merge count tables in a dict.

parent bfee7df6
__init__.py:

@@ -9,4 +9,6 @@ except PackageNotFoundError:
 __copyright__ = "Copyright (C) 2023 Blaise Li"
 __licence__ = "GNU GPLv3"
-from .libsnp import (load_counts_from_joint_vcf,)
+from .libsnp import (
+    concat_counts_from_dicts,
+    load_counts_from_joint_vcf,)
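With this export, both helpers become available directly from the package namespace. A minimal import sketch; the actual package name does not appear in this excerpt, so "snp_tables" below is only a placeholder:

# "snp_tables" is a placeholder; substitute the real package name.
from snp_tables import concat_counts_from_dicts, load_counts_from_joint_vcf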
libsnp.py:

@@ -12,9 +12,7 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
-"""
-Miscellaneous utilities to deal with SNP data.
-"""
+"""Miscellaneous utilities to deal with SNP data."""
 import pandas as pd
 from pysam import VariantFile
@@ -22,6 +20,8 @@ from pysam import VariantFile
 def load_counts_from_joint_vcf(vcf_path, chroms, extra_headers=None):
     """
+    Load read counts from a multi-sample .vcf file.
+
     *vcf_path* should be the path to a .vcf(.gz)
     file containing joint SNP calling data.
     *chroms* should be the list of the chromosome names, in the desired order.
@@ -69,3 +69,46 @@ def load_counts_from_joint_vcf(vcf_path, chroms, extra_headers=None):
             counts[sample].index.levels[1].astype(chrom_dtype),
             level=1)).sort_index(level=["chrom", "pos"])
     return counts
+
+
+def concat_counts_from_dicts(counts_dict, extra_header_names=None):
+    """
+    Concatenate counts tables stored in a per-sample dictionary.
+
+    *counts_dict* should be a dictionary where the keys are sample
+    names and the values are pandas DataFrames where lines correspond
+    to loci and columns correspond to "RO" and "AO" read counts supporting
+    the reference and alternative allele at the corresponding locus for the
+    considered sample.
+    *extra_header_names* should be the column level names to associate
+    with the extra headers passed to the *extra_headers* option
+    of *load_counts_from_joint_vcf*.
+    """
+    if extra_header_names is None:
+        extra_header_names = tuple()
+    # Since SNPs were called on all samples jointly,
+    # missing data could be considered as absence of mapped reads:
+    # counts = pd.concat(counts_dict, axis=1).fillna(0)
+    # However, such loci may rather be considered unreliable:
+    # there should be reads for at least one of the alleles.
+    counts = pd.concat(counts_dict, axis=1)
+    print(f"{counts.shape=}")
+    counts.columns = counts.columns.set_names(
+        ["sample", *extra_header_names, "count"])
+    has_na_in_a_sample = counts.isna().any(axis=1)
+    print(f"{has_na_in_a_sample.sum()=}")
+    notenough = "(not enough reads in at least one sample to be reliable)"
+    print(f"Dropping SNPs with NA {notenough}")
+    counts = counts.dropna()
+    print(f"{counts.shape=}")
+    # Group the columns by everything except the "count" level (i.e. by
+    # sample, plus any extra header levels) and flag loci where a sample
+    # has zero reads in total.
+    zero_counts_in_a_sample = (counts.groupby(
+        level=[
+            i for i in range(len(counts.columns.names))
+            if i != counts.columns.names.index("count")],
+        axis=1).sum() == 0.0).any(axis=1)
+    print(f"{zero_counts_in_a_sample.sum()=}")
+    print(f"Dropping SNPs where a sample has zero counts {notenough}")
+    # Remove those lines where a sample has no counts.
+    counts = counts.loc[~zero_counts_in_a_sample]
+    print(f"{counts.shape=}")
+    return counts
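For reference, a minimal usage sketch of the new function with a hand-built dictionary; in real use the dictionary would come from load_counts_from_joint_vcf. The sample names and loci below are invented for the illustration, and the import assumes libsnp is importable (for example with the src directory on sys.path, as done in the test module below):

# Minimal sketch: two invented samples, three invented loci.
import pandas as pd

from libsnp import concat_counts_from_dicts

index_a = pd.MultiIndex.from_tuples(
    [("1", 100), ("1", 250), ("2", 42)], names=["chrom", "pos"])
index_b = pd.MultiIndex.from_tuples(
    [("1", 100), ("1", 250)], names=["chrom", "pos"])
counts_dict = {
    "sample_A": pd.DataFrame(
        {"RO": [10, 0, 7], "AO": [2, 0, 5]}, index=index_a),
    "sample_B": pd.DataFrame(
        {"RO": [8, 3], "AO": [1, 4]}, index=index_b),
}
counts = concat_counts_from_dicts(counts_dict)
# Locus ("2", 42) is absent from sample_B, so its concatenated row
# contains NAs and is dropped.
# Locus ("1", 250) has RO + AO == 0 in sample_A and is also dropped.
# Only ("1", 100) remains: 1 row, 2 samples * 2 counts = 4 columns
# under a ("sample", "count") column MultiIndex.
print(counts)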
Test module:

@@ -22,11 +22,11 @@ import sys
 import unittest
 test_dir = Path(__file__).parent
-lib_dir = test_dir.parent
-sys.path.insert(0, lib_dir)
+lib_dir = test_dir.parent.joinpath("src")
+sys.path.insert(0, str(lib_dir))
 import pandas as pd
-from libsnp import load_counts_from_joint_vcf
+from libsnp import load_counts_from_joint_vcf, concat_counts_from_dicts
 class TestLoader(unittest.TestCase):
@@ -50,6 +50,20 @@ class TestLoader(unittest.TestCase):
         (_, ncols) = all_counts.shape
         self.assertEqual(ncols, 2 * len(self.samples))
+
+
+class TestConcatenator(unittest.TestCase):
+    """Tests for the counts concatenator."""
+
+    def setUp(self):
+        self.chroms = [str(i) for i in range(1, 37)] + ["maxi"]
+        self.test_data = test_dir.joinpath("data", "jointcall_sample.vcf.gz")
+        self.counts_dict = load_counts_from_joint_vcf(
+            self.test_data,
+            chroms=self.chroms)
+        self.samples = list(self.counts_dict.keys())
+
+    def test_concat_data(self):
+        """Some integrity checks on the concatenated table."""
+        counts = concat_counts_from_dicts(self.counts_dict)
+        self.assertEqual(counts.shape[1], len(self.samples) * 2)
 if __name__ == "__main__":
     unittest.main()