Skip to content
Snippets Groups Projects
Select Git revision
  • 184ed946311346051070be32390a55a79a2a9efa
  • master default protected
  • vero_dev
  • ptv_packaging
  • process_gzip_input
  • py3_release_1
  • paired_read_fix
  • first_read_bugfix
  • v4.3
  • v4.2
  • v4.1
  • py3_release_1_light
  • py2_release_last
  • py3_release_1
  • v3.1.1
  • v3.1
  • v4.0
  • v3.0
18 results

main_utils.py

Blame
  • main_utils.py 14.75 KiB
    # VL: Gather here utility functions for the main program.
    # Aim is to make main simpler and smaller and thus improve testability (by allowing separate/independant testing of small program "subparts").
    
    import sys
    from optparse import OptionParser, OptionGroup
    from _modules.utilities import checkPhageName,changeCase
    from _modules.IData_handling import totReads,genomeFastaRecovery
    
    ### FILE function
    def checkFastaFile(filin):
        """Check sequence Fasta file given by user"""
        first_line = 1
        infil = open(filin, 'rU')
        try:
            for line in infil:
                # Test '>'
                if first_line :
                    if line[0] != '>':
                        return 1
                    else:
                        first_line = 0
                        continue
                # Test 1st base per line : 'ATGCN>'
                base = changeCase(line[0])
                if base != 'A' and base != 'T' and base != 'C' and base != 'G' and base != 'N' and base != '\n' and base != '\r' and base != '>':
                    infil.close()
                    return 1
            infil.close()
            return 0
        except IOError:
            sys.exit('ERROR: No such file %s' % filin)
    
    def setOptions():
        # Option
        usage = """\n\nUsage: %prog -f reads.fastq -r phage_sequence.fasta [-n phage_name -p reads_paired -s seed_lenght -d surrounding -t installation_test -c nbr_core -g host.fasta (warning increase process time) -l limit_multi-fasta -v virome_time -u]
    
            Program: PhageTerm - Analyze phage termini and packaging mode using reads from high-throughput sequenced phage data
            Version: 2.0.0
            Contact: Julian Garneau <julian.garneau@usherbrooke.ca>
            Contact: David Bikard <david.bikard@pasteur.fr>
            Contact: Marc Monot <marc.monot@pasteur.fr>
    
            You can perform a program test run upon installation using the "-t " option.
            Arguments for the -t option can be : 5, 3, DS, DL, M , H or V
    
            Example of test commands :
            PhageTerm.py.py -t C5       -> Test run for a 5\' cohesive end (e.g. Lambda)
            PhageTerm.py.py -t C3       -> Test run for a 3\' cohesive end (e.g. HK97)
            PhageTerm.py.py -t DS     -> Test run for a Direct Terminal Repeats end short (e.g. T7)
            PhageTerm.py.py -t DL     -> Test run for a Direct Terminal Repeats end long (e.g. T5)
            PhageTerm.py.py -t H       -> Test run for a Headful packaging (e.g. P1)
            PhageTerm.py.py -t M       -> Test run for a Mu-like packaging (e.g. Mu)
            PhageTerm.py.py -t V       -> Test run for a Virome data
            """
    
        getopt = OptionParser(usage=usage)
    
        optreads = OptionGroup(getopt, 'Raw reads file in fastq format')
        optreads.add_option('-f', '--fastq', dest='fastq', metavar='FILE', help='Fastq reads from Illumina TruSeq')
        getopt.add_option_group(optreads)
    
        optref = OptionGroup(getopt, 'Phage genome in fasta format')
        optref.add_option('-r', '--ref', dest='reference', metavar='FILE',
                          help='Reference phage genome as contigs in fasta format')
        getopt.add_option_group(optref)
    
        optname = OptionGroup(getopt, 'Name of the phage being analyzed by the user')
        optname.add_option('-n', '--phagename', dest='phagename', metavar='STRING',
                           help='Manually enter the name of the phage being analyzed. Used as prefix for output files.')
        getopt.add_option_group(optname)
    
        optseed = OptionGroup(getopt, 'Lenght of the seed used for reads in the mapping process')
        optseed.add_option('-s', '--seed', dest='seed', metavar='INT', type="int",
                           help='Manually enter the lenght of the seed used for reads in the mapping process.')
        getopt.add_option_group(optseed)
    
        optsurround = OptionGroup(getopt, 'Lenght of the surrounding region considered for peak value cumulation')
        optsurround.add_option('-d', '--surrounding', dest='surround', type="int", metavar='INT',
                               help='Manually enter the lenght of the surrounding used to merge very close peaks in the analysis process.')
        getopt.add_option_group(optsurround)
    
        optcore = OptionGroup(getopt,
                              'GPU and multicore options. Default is 1 core and no GPU. These options are mutually exclusives')
        optcore.add_option('-c', '--core', dest='core', metavar='INT', type="int",
                           help='Manually enter the number of core you want to use.')
        optcore.add_option('-u', '--gpu', dest='gpu', action="store_true", default=False,
                           help='use this flag if you want to use GPU')
        getopt.add_option_group(optcore)
    
        opthost = OptionGroup(getopt, 'Host genome in fasta format')
        opthost.add_option('-g', '--host', dest='host', metavar='FILE',
                           help='Reference host genome as unique contig in fasta format')
        getopt.add_option_group(opthost)
    
        optpaired = OptionGroup(getopt, 'Use paired-end reads')
        optpaired.add_option('-p', '--paired', dest='paired', metavar='FILE',
                             help='Use paired-end reads to calculate real insert coverage')
        getopt.add_option_group(optpaired)
    
        optmean = OptionGroup(getopt, 'Defined phage mean coverage')
        optmean.add_option('-m', '--mean', dest='mean', metavar='INT', type="int", help='Defined phage mean coverage')
        getopt.add_option_group(optmean)
    
        optlimit = OptionGroup(getopt, 'Limit minimum fasta size (Default: 500)')
        optlimit.add_option('-l', '--limit', dest='limit', metavar='INT', type="int", help='Limit minimum fasta length')
        getopt.add_option_group(optlimit)
    
        optvirome = OptionGroup(getopt, 'Estimate execution time for a Virome')
        optvirome.add_option('-v', '--virome', dest='virome', metavar='INT', type="int",
                             help='Estimate execution time for a Virome')
        getopt.add_option_group(optvirome)
    
        opttest = OptionGroup(getopt, 'Perform a program test run upon installation')
        opttest.add_option('-t', '--test', dest='test', metavar='STRING',
                           help='Perform a program test run upon installation. If you want to perform a test run, use the "-t " option. Arguments for the -t option can be : C5, C3, DS, DL, H or M. C5 -> Test run for a 5\' cohesive end (e.g. Lambda); C3 -> Test run for a 3\' cohesive end (e.g. HK97); DS -> Test run for a short Direct Terminal Repeats end (e.g. T7); DL -> Test run for a long Direct Terminal Repeats end (e.g. T5); H -> Test run for a Headful packaging (e.g. P1); M -> Test run for a Mu-like packaging (e.g. Mu)')
        getopt.add_option_group(opttest)
    
        return getopt
    
    # input data as provided by the user
    class inputRawDataArgs:
        def __init__(self,fastq,reference,host,phagename,paired,test):
            if test == "C5":
                print               "\nPerforming a test run using test phage sequence with 5 prime cohesive overhang :"
                print               "\npython PhageTerm.py -f test-data/COS-5.fastq -r test-data/COS-5.fasta -n TEST_cohesive_5_prime"
                fastq = "test-data/COS-5.fastq"
                reference = "test-data/COS-5.fasta"
                phagename = "Test-cohesive-5'"
            elif test == "C3":
                print               "\nPerforming a test run using test phage sequence with 3 prime cohesive overhang:"
                print               "\npython PhageTerm.py -f test-data/COS-3.fastq -r test-data/COS-3.fasta -n TEST_cohesive_3_prime"
                fastq = "test-data/COS-3.fastq"
                reference = "test-data/COS-3.fasta"
                phagename = "Test-cohesive-3'"
            elif test == "DS":
                print               "\nPerforming a test run using test phage sequence with short direct terminal repeats (DTR-short) :"
                print               "\npython PhageTerm.py -f test-data/DTR-short.fastq -r test-data/DTR-short.fasta -n TEST_short_direct_terminal_repeats"
                fastq = "test-data/DTR-short.fastq"
                reference = "test-data/DTR-short.fasta"
                phagename = "Test-short-direct-terminal-repeats"
            elif test == "DL":
                print               "\nPerforming a test run using test phage sequence with long direct terminal repeats (DTR-long) :"
                print               "\npython PhageTerm.py -f test-data/DTR-long.fastq -r test-data/DTR-long.fasta -n TEST_long_direct_terminal_repeats"
                fastq = "test-data/DTR-long.fastq"
                reference = "test-data/DTR-long.fasta"
                phagename = "Test-long-direct-terminal-repeats"
            elif test == "H":
                print               "\nPerforming a test run using test phage sequence with headful packaging"
                print               "\npython PhageTerm.py -f test-data/Headful.fastq -r test-data/Headful.fasta -n TEST_headful"
                fastq = "test-data/Headful.fastq"
                reference = "test-data/Headful.fasta"
                phagename = "Test-Headful"
            elif test == "M":
                print               "\nPerforming a test run using test phage sequence with Mu-like packaging"
                print               "\npython PhageTerm.py -f test-data/Mu-like_R1.fastq -p test-data/Mu-like_R2.fastq -r test-data/Mu-like.fasta -n TEST_Mu-like -g test-data/Mu-like_host.fasta"
                fastq = "test-data/Mu-like_R1.fastq"
                paired = "test-data/Mu-like_R2.fastq"
                reference = "test-data/Mu-like.fasta"
                host = "test-data/Mu-like_host.fasta"
                phagename = "Test-Mu-like"
            elif test == "V":
                print               "\nPerforming a test run using virome data containing one example of each packaging mode"
                print               "\npython PhageTerm.py -f test-data/Virome.fastq -r test-data/Virome.fasta -n TEST_Virome -w 1"
                fastq = "test-data/Virome.fastq"
                reference = "test-data/Virome.fasta"
                phagename = "Test-Virome"
            if host == None:
                host = ""
            if paired == None:
                paired = ""
            # CHECK inputs
            if phagename!=None:
                phagename = checkPhageName(phagename)
                self.phagename = phagename
            if checkFastaFile(reference):
                exit("ERROR in reference file")
            self.reference = reference
            if host != "":
                if checkFastaFile(host):
                    exit("ERROR in reference file")
                self.host = host
            self.fastq=fastq
            self.paired=paired
            self.host=host
    
    
            # READS Number
            self.tot_reads = totReads(fastq)
            if paired != "":
                self.tot_reads_paired = totReads(paired)
                if (self.tot_reads != self.tot_reads_paired):
                    print "\nWARNING: Number of reads between the two reads files differ, using single reads only\n"
                    self.paired = ""
    
    
    
    # Here gather user input parameters and former global variable that define how the data will be processed from a functionnal point of view (ex: seed length...)
    class functionalParms:
        def __init__(self,seed,surround,mean,limit,virome,test):
            if seed == None:
                seed = 20
            if seed < 15:
                seed = 15
            if surround == None:
                surround = 20
            self.seed=seed
            self.surrounding=surround
    
            if limit == None:
                limit = 500
            self.limit_reference=limit
    
            if virome == None:
                virome = 0
            if virome != 1:
                virome = 0
            self.virome=virome
    
            if mean == None:
                mean = 250
            self.mean=mean
            if test == None:
                self.test_run = 0
            else:
                self.test_run = 1
            self.test=test
            if test=="H" or test=="M" or test=="V":
                self.surrounding = 0
            if test=="V":
                self.workflow = 1
            # VARIABLE
            self.edge = 500
            self.insert_max = 1000
            self.limit_fixed = 35
            self.limit_preferred = 11
            self.Mu_threshold = 0.5
            self.draw = 0
            self.workflow = 0
    
    # Here, gather data derived from the rawInputData and updated according to the functionnal parameters
    # functionnal parameter workflow can also be updated
    class InputDerivedDataArgs:
        def __init__(self,inputRaw,fparms):
            # REFERENCE sequence recovery and edge adds
            self.refseq_liste, self.refseq_name, refseq_rejected = genomeFastaRecovery(inputRaw.reference, fparms.limit_reference, fparms.edge)
            self.nbr_virome = len(self.refseq_liste)
            if self.nbr_virome == 0:
                "\nERROR: All the reference(s) sequence(s) are under the length limitation : " + str(
                    fparms.limit_reference) + " (adapt your -l option)"
                exit()
            if self.nbr_virome > 1:
                fparms.workflow = 1
            length_virome = len("".join(self.refseq_liste))
            self.mean_virome = length_virome / self.nbr_virome
            if fparms.virome:
                self.refseq_liste, self.refseq_name, refseq_rejected = ["N" * self.mean_virome], ["Test_virome"], 0
            # HOST sequence recovery
            if len(self.refseq_liste) == 1 and inputRaw.host != "":
                self.hostseq = genomeFastaRecovery(inputRaw.host, fparms.limit_reference, fparms.edge, 1)
                if len(self.hostseq[0]) != 0 and len(self.hostseq[0]) < len(self.refseq_liste[0]):
                    print "\nHost length < Phage length : removing host sequence."
                    self.hostseq = ""
            else:
                self.hostseq = ""
                if len(self.refseq_liste) > 1:
                    print "\nWARNING: Host analysis impossible with multiple fasta input\n"
    
    
    # Here gather user input parameters and former global variable that define how the data will be processed from a technical point of view (ex: multicore,gpu...)
    class technicalParms:
        def __init__(self,core,gpu,mean):
            if gpu != 0 and core != 1:
                print "Choose either multicore or gpu!"
                exit(1)
            self.core=core
            if core == None:
                self.core = 1
            self.limit_coverage = max(50, mean * 2) / self.core
            self.gpu=gpu
            if gpu == None:
                self.gpu = 0
    
    
    # Checks options consistency and instantiates data structure for main.
    def checkOptArgsConsistency(getopt):
        options, arguments = getopt.parse_args()
        if options.fastq == None and options.test == None:
            getopt.error('\tNo reads file provided.\n\t\t\tUse -h or --help for more details\n')
    
        if options.reference == None and options.test == None:
            getopt.error('\tNo fasta reference file provided.\n\t\t\tUse -h or --help for more details\n')
    
        if options.phagename == None and options.test == None:
            phagename = "Phagename"
    
        inRawDArgs=inputRawDataArgs(options.fastq,options.reference,options.host,options.phagename,options.paired,options.test)
        fParms=functionalParms(options.seed,options.surround,options.mean,options.limit,options.virome,options.test)
        tParms=technicalParms(options.core,options.gpu,fParms.mean)
        inDArgs=InputDerivedDataArgs(inRawDArgs,fParms)
        return inRawDArgs,fParms,tParms,inDArgs # TODO: make a version that returns only 1 structure gathering only the useful information.