Skip to content
Snippets Groups Projects
Select Git revision
  • e113126c5db2c8f063330f566dd154335d31da52
  • master default protected
  • dev_nanopore
  • 2.0_beta
  • V_3.0
  • v_2.1
  • V_2.0
  • V_1.9.6
  • v_1.9.6
  • v_1.9.5
  • v_1.9.4
  • V_1.9.3
  • V_1.9.2
  • V_1.9.1
  • V_1.9
  • V_1.8
  • V_1.6
  • 1.5.1
  • V_1.5
  • V1.4
  • V1.3
21 results

unit_test_main_utils.cpp

Blame
  • libworkflows.py 11.43 KiB
    import os
    import sys
    from glob import glob
    from shutil import rmtree
    import warnings
    from contextlib import contextmanager
    from subprocess import Popen, PIPE
    import pandas as pd
    import matplotlib.pyplot as plt
    from re import compile
    # To parse genome size information
    from bs4 import BeautifulSoup
    
    OPJ = os.path.join
    
    
    def formatwarning(message, category, filename, lineno, line=None):
        """Render a warning as a single "<file>:<line>: <Category>: <message>" line."""
        fields = (filename, lineno, category.__name__, message)
        return "%s:%s: %s: %s\n" % fields


    # Route every warning through the compact single-line format above.
    warnings.formatwarning = formatwarning
    
    
    # To be loaded in snakemake as follows:
    # from libworkflows import SHELL_FUNCTIONS
    # shell.prefix(SHELL_FUNCTIONS)
    #
    # Bash helper functions prepended to the shell commands of the workflows.
    # NOTE(review): the doubled braces ({{ }}) presumably protect the shell
    # code from snakemake's str.format-style interpolation -- confirm against
    # how shell.prefix processes its argument.
    SHELL_FUNCTIONS = """
    # http://linuxcommand.org/wss0150.php
    PROGNAME=$(basename $0)
    
    function error_exit
    {{
    #	----------------------------------------------------------------
    #	Function for exit due to fatal program error
    #		Accepts 1 argument:
    #			string containing descriptive error message
    #	----------------------------------------------------------------
        echo "${{PROGNAME}}: ${{1:-"Unknown Error"}}" 1>&2
        exit 1
    }}
    
    count_fastq_reads()
    {{
        # $1: file in which to write the number of fastq records
        wc -l | {{ read nblines; echo ${{nblines}} / 4 | bc > ${{1}}; }}
    }}
    
    sort_by_seq()
    {{
        fastq-sort -s || error_exit "fastq-sort failed"
    }}
    
    dedup()
    {{
        sort_by_seq | remove-duplicates-from-sorted-fastq || error_exit "remove_duplicates_from_sorted_fastq failed"
    }}
    
    trim_random_nt()
    {{
        # $1: nb of bases to trim at 5' end
        # $2: nb of bases to trim at 3' end
        cutadapt -u ${{1}} -u -${{2}} - 2> /dev/null || error_exit "trim_random_nt failed"
    }}
    
    compute_size_distribution()
    {{
        # $1: file in which to write the size distribution
        awk 'NR%4==2 {{histo[length($0)]++}} END {{for (l in histo) print l"\\t"histo[l]}}' > ${{1}}
    }}
    
    samstats2mapped()
    {{
        # $1: file containing the output of samtools stats
        # $2: file in which to put the number of mappers
        grep "^SN" ${{1}} | cut -f 2- | grep "^reads mapped:" | cut -f 2 > ${{2}}
    }}
    """
    
    # Module-level aliases to the bound string methods (kept for callers).
    strip = str.strip
    split = str.split


    def strip_split(text):
        """Strip surrounding whitespace from *text*, then split it on tabs."""
        return text.strip().split("\t")
    
    
    def ensure_relative(path, basedir):
        """Return *path* expressed relative to *basedir*.

        Relative paths are returned untouched; absolute paths are converted.
        That way, a snakefile can be included using its absolute path."""
        if not os.path.isabs(path):
            return path
        return os.path.relpath(path, basedir)
    
    def cleanup_and_backup(config):
        """Performs cleanup and backup according to the information present in the
        *config* dictionary.

        Expects an optional config["backup"] mapping with keys "user", "host"
        and, optionally, "dest_dir".

        NOTE(review): *shell* and *output_dir* are not defined in this module;
        presumably they are provided by the snakemake global namespace in which
        this function runs -- confirm before reusing it elsewhere.
        """
        print("removing metadata")
        # https://stackoverflow.com/a/45614282/1878788
        # NOTE(review): raises FileNotFoundError if ".snakemake/metadata" is absent.
        rmtree(".snakemake/metadata")
        if config.get("backup", {}):
            user = config["backup"]["user"]
            # The literal value "USER" means: take the user name from the environment.
            if user == "USER":
                user = os.environ[user]
            host = config["backup"]["host"]
            # If no dest_dir, we assume the directory structure
            # is the same on the destination host
            dest_dir = config["backup"].get("dest_dir", os.getcwd())
            print(f"backuping results to {user}@{host}:{dest_dir}")
            shell(f"rsync -vaP {output_dir} {user}@{host}:{dest_dir}")
    
    
    def read_int_from_file(filename):
        """Read and return the integer on the first line of *filename*."""
        with open(filename, "r") as in_file:
            first_line = in_file.readline()
        return int(first_line.strip())
    
    
    @contextmanager
    def warn_context(warn_filename):
        """Yields a function that writes warnings in file *warn_filename*
        (and also to stderr).

        The previous *warnings.showwarning* hook is saved and restored on
        exit; the original version leaked the override past the context,
        leaving a hook that wrote to a closed file.
        """
        with open(warn_filename, "w") as warn_file:
            def warn_to_file(message, category, filename, lineno, file=None, line=None):
                formatted = warnings.formatwarning(message, category, filename, lineno)
                # echo to stderr
                print(formatted, file=sys.stderr)
                warn_file.write(formatted)
            previous_showwarning = warnings.showwarning
            warnings.showwarning = warn_to_file
            try:
                yield warnings.warn
            finally:
                # Restore the hook so later warnings stop going to warn_file.
                warnings.showwarning = previous_showwarning
    
    
    def get_chrom_sizes(filename):
        """Parse an illumina iGenome GenomeSize.xml file.

        Return a dictionary mapping chromosome names to chromosome lengths."""
        with open(filename, "r") as genome_size_file:
            soup = BeautifulSoup(genome_size_file, "html5lib")
        return {
            chrom["contigname"]: int(chrom["totalbases"])
            for chrom in soup.sequencesizes.findAll("chromosome")}
    
    
    # NOTE: It is not clear that this really does what I think it does:
    # https://www.biostars.org/p/161534/
    # https://www.biostars.org/p/170406/
    def feature_orientation2stranded(LIB_TYPE):
        """Return a function mapping *wildcards.orientation* ("fwd", "rev" or
        "all") to the "stranded" flag (1, 2 or 0) used by strand-aware read
        counting, given a library type ending in "SF" or "SR".

        Raises ValueError for incompatible library types or unknown
        orientations.
        """
        def feature_stranded(wildcards):
            orientation = wildcards.orientation
            if orientation == "all":
                return 0
            if orientation not in ("fwd", "rev"):
                # Was exit(...): raising ValueError is consistent with the
                # other error branches and does not kill the interpreter.
                raise ValueError(
                    "Orientation is to be among \"fwd\", \"rev\" and \"all\".")
            strandedness = LIB_TYPE[-2:]
            if strandedness not in ("SF", "SR"):
                raise ValueError(
                    f"{LIB_TYPE} library type not compatible with strand-aware read counting.")
            # "SF" maps fwd->1 / rev->2; "SR" is the mirror image.
            if orientation == "fwd":
                return 1 if strandedness == "SF" else 2
            return 2 if strandedness == "SF" else 1
        return feature_stranded
    
    
    def sum_htseq_counts(counts_filename):
        """Sum the per-feature counts of an htseq-count output file,
        skipping the special "__*" summary lines."""
        total = 0
        with open(counts_filename) as counts_file:
            for line in counts_file:
                fields = line.strip().split("\t")
                if not fields[0].startswith("__"):
                    total += int(fields[1])
        return total
    
    
    def read_htseq_counts(counts_filename):
        """Load an htseq-count output file as a DataFrame indexed by feature
        identifier, dropping htseq's special "__*" summary rows if present."""
        counts = pd.read_csv(counts_filename, sep="\t", header=None, index_col=0)
        summary_rows = [
            "__no_feature",
            "__ambiguous",
            "__too_low_aQual",
            "__not_aligned",
            "__alignment_not_unique"]
        return counts.drop(summary_rows, errors="ignore")
    
    
    def sum_feature_counts(counts_filename, nb_bams=1):
        """Sums all counts in a featureCounts generated *counts_filename*.
        *nb_bams* indicates the number of bam files that were given to
        featureCounts. This determines which columns should be used."""
        # Skip the program comment line and the header line; per-bam counts
        # start at the 7-th column (0-based column 6).
        counts = pd.read_table(
            counts_filename, skiprows=2,
            usecols=range(6, 6 + nb_bams), header=None)
        # First sum collapses the rows, second collapses the columns.
        return counts.sum().sum()
    
    
    def read_feature_counts(counts_filename, nb_bams=1):
        """Load a featureCounts table as a DataFrame indexed by feature
        identifier, keeping only the count columns (one per bam file)."""
        wanted_columns = [0, *range(6, 6 + nb_bams)]
        return pd.read_table(
            counts_filename, skiprows=1, usecols=wanted_columns, index_col=0)
    
    
    # I	3746	3909	"WBGene00023193"	-	.	17996
    # I	4118	10230	"WBGene00022277"	-	.	1848
    # I	10412	16842	"WBGene00022276"	+	.	4814
    # I	17482	26781	"WBGene00022278"	-	.	4282
    # I	22881	23600	"WBGene00235381"	-	.	421
    # I	27594	32482	"WBGene00022279"	-	.	2237
    # I	31522	31543	"WBGene00170953"	+	.	110
    # I	32414	32435	"WBGene00173569"	+	.	87
    # I	43732	44677	"WBGene00022275"	+	.	24
    # I	47471	49819	"WBGene00044345"	+	.	846
    def sum_intersect_counts(counts_filename):
        """Sums all counts in a bedtools intersect generated *counts_filename*,
        where the annotation was in bed format.

        Returns the string "NA" when the file is empty."""
        try:
            counts = pd.read_table(counts_filename, usecols=[6], header=None)
        except pd.errors.EmptyDataError:
            return "NA"
        # Counts are in the 7-th column.
        return counts.sum().iloc[0]
    
    def read_intersect_counts(counts_filename):
        """Load gene identifiers (4-th column) and counts (7-th column) from a
        bedtools intersect output, indexed by gene identifier.

        Returns an empty, correctly-indexed DataFrame when the file is empty."""
        try:
            # index_col takes effect after column selection with usecols,
            # hence index_col=0 (the ex-fourth column):
            # https://stackoverflow.com/a/45943627/1878788
            return pd.read_table(
                counts_filename, usecols=[3, 6], header=None, index_col=0)
        except pd.errors.EmptyDataError:
            return pd.DataFrame(index=[], columns=["gene", "counts"]).set_index("gene")
    
    
    # http://stackoverflow.com/a/845069/1878788
    def file_len(fname):
        """Return the number of lines in *fname*, as counted by ``wc -l``.

        Raises IOError if the subprocess exits with a non-zero status."""
        proc = Popen(["wc", "-l", fname], stdout=PIPE, stderr=PIPE)
        out, err = proc.communicate()
        if proc.returncode:
            raise IOError(err)
        return int(out.split()[0])
    
    
    def last_lines(fname, nb_lines=1):
        """Return the last *nb_lines* lines of *fname* as one decoded string.

        Raises IOError if the ``tail`` subprocess exits with a non-zero status."""
        proc = Popen(["tail", f"-{nb_lines}", fname], stdout=PIPE, stderr=PIPE)
        out, err = proc.communicate()
        if proc.returncode:
            raise IOError(err)
        return out.decode("utf-8")
    
    
    def test_na_file(fname):
        with open(fname, "r") as f:
            firstline = f.readline().strip()
        return firstline == "NA"
    
    
    def filter_combinator(combinator, blacklist):
        """Build a wildcards-combination generator from *combinator* that
        skips every combination present in the set *blacklist*."""
        def filtered_combinator(*args, **kwargs):
            """Generate wildcards combinations.
            To be used as second argument of *expand*."""
            # Compare as frozenset rather than tuple, so the (unpredictable)
            # wildcard order does not matter.
            yield from (
                wc_comb
                for wc_comb in combinator(*args, **kwargs)
                if frozenset(wc_comb) not in blacklist)
        return filtered_combinator
    
    
    def column_converter(convert_dict, column_name=None):
        """Generate a function usable in *pandas.DataFrame.apply* that maps
        names through *convert_dict*, leaving unknown names unchanged.

        Names are taken from the row index, or from the column given by
        *column_name* when it is provided."""
        lookup = convert_dict.get
        if column_name is None:
            def convert_name(row):
                return lookup(row.name, row.name)
        else:
            def convert_name(row):
                return lookup(row[column_name], row[column_name])
        return convert_name
    
    
    def save_plot(outfile,
                  plot_func,
                  *args,
                  title=None, format=None,
                  tight=True, equal_axes=False,
                  **kwargs):
        """Draw a figure using *plot_func(*args, **kwargs)* and save it to
        *outfile*.

        *format* is needed when using multiple pages output. Extra artists
        returned by *plot_func* are passed on to *plt.savefig* so they are
        included in the saved bounding box."""
        # https://stackoverflow.com/a/10154763/1878788
        extra_artists = plot_func(*args, **kwargs)
        if extra_artists is None:
            save_kwds = {}
        else:
            save_kwds = {"bbox_extra_artists": extra_artists}
        if title is not None:
            plt.gcf().suptitle(title)
        if equal_axes:
            plt.axis("equal")
        if tight:
            save_kwds["bbox_inches"] = "tight"
        if format is not None:
            save_kwds["format"] = format
        plt.savefig(outfile, **save_kwds)
    
    
    def make_id_list_getter(gene_lists_dir, avail_id_lists=None):
        """Build a function resolving a gene-list name (or file path, or a
        snakemake "wildcards" object with an *id_list* attribute) to the list
        of gene identifiers read from the corresponding "<name>_ids.txt" file.

        *avail_id_lists* restricts the allowed file paths; by default it is
        every "*_ids.txt" file found in *gene_lists_dir*. The special name
        "lfc_statuses" resolves to None; unknown names raise
        NotImplementedError.
        """
        if avail_id_lists is None:
            avail_id_lists = set(glob(os.path.join(gene_lists_dir, "*_ids.txt")))
        # Recognizes the AttributeError raised when *wildcards* is a plain str.
        str_attr_err = compile("'str' object has no attribute '.*'")

        def _read_ids(list_filename):
            """Read the first tab-separated field of each non-comment line."""
            with open(list_filename, "r") as infile:
                return [
                    line.strip().split("\t")[0]
                    for line in infile
                    if line[0] != "#"]

        def get_id_list(wildcards):
            """Instead of a "wildcards" object, a string can be used directly."""
            try:
                id_list = wildcards.id_list
            except AttributeError as err:
                # This may not be a "wildcards" object;
                # try to use it as a string.
                if str_attr_err.match(str(err)) is None:
                    raise
                id_list = wildcards
            # This may be directly a path
            if id_list in avail_id_lists:
                return _read_ids(id_list)
            if id_list == "lfc_statuses":
                return None
            list_filename = os.path.join(gene_lists_dir, f"{id_list}_ids.txt")
            if list_filename in avail_id_lists:
                return _read_ids(list_filename)
            raise NotImplementedError(f"{id_list} unknown.\n")
        return get_id_list