Skip to content
Snippets Groups Projects
Select Git revision
  • 2ff25cc31366d1fc956bce051d82fe7b35e3695c
  • master default protected
  • vero_dev
  • ptv_packaging
  • process_gzip_input
  • py3_release_1
  • paired_read_fix
  • first_read_bugfix
  • v4.3
  • v4.2
  • v4.1
  • py3_release_1_light
  • py2_release_last
  • py3_release_1
  • v3.1.1
  • v3.1
  • v4.0
  • v3.0
18 results

GPU_mapper.py

Blame
  • GPU_mapper.py 17.92 KiB
    ##@file GPU_mapper.py
    #
    # This file contains classes for performing the mapping of read extracts against sequences using the openCL technology.
    ##@author vlegrand@pasteur.fr
    from __future__ import division # Preparing migration to python3
    import random
    import numpy as np
    import time
    
    
    try:
    	from OCL_kernels import *
    except ImportError:
    	print "WARNING No openCL implementation on that machine"
    
    
    ## more convenient class to encapsulate mapping results for single reads as weel as for PE.
    #
    # It is intended to be used in readsCoverageGPU.
    # l_matches contains read extracts maping results agains a sequence.
    # position 0 in l_matches corresponds to read[:seed]
    # position 1 in l_matches corresponds to read[-seed:]
    # if reads are paired, there are only 2 elements in l_matches. Otherwise, there are 4.
    # position 2 in l_matches corresponds to RevCpl(read[-seed:])
    # position 3 in l_matches corresponds to RevCpl(read[:seed])
    class gpuBasicMappingResultsForCov:
        def __init__(self,l_matches,single=True):
            self.is_single=single
            self.read_start_match=l_matches[0]
            self.read_end_match=l_matches[1]
            if single:
                self.rev_cpl_start_match=l_matches[2]
                self.rev_cpl_end_match=l_matches[3]
            else:
                self.PE2_start_match=l_matches[2]
                self.rev_cpl_PE2_start_match=l_matches[3]
                self.rev_cpl_PE2_stop_match=l_matches[4]
    
    def getGPUMappingResults(reduced_fic,occ_pos_fic,max_nb_occ_fic,nb_reads,nb_extracts,ref_data,single=True):
        reduced=np.loadtxt(reduced_fic, dtype=np.int64, delimiter='|')
        occ_pos=np.loadtxt(occ_pos_fic, dtype=np.int32, delimiter='|')
        f_nb_occ = open(max_nb_occ_fic)
        str_nb_occ = f_nb_occ.read()
        f_nb_occ.close()
        max_nb_occ = int(str_nb_occ)
        mres=gpuBasicMappingResults(reduced, occ_pos, max_nb_occ, nb_reads,
                               nb_extracts, ref_data,single)
        return mres
    
    
    ## Class to encapsulate the results of the mapping of all read extracts against all sequences.
    class gpuBasicMappingResults:
        ## Constructor
        #
        # @param reduced_nb_occ_foreach_extract_in_seqs An array containing for each extract that appears in a sequence, the number of occurrecnces and where  to read occurence postion.
        # @param extracts_occ_positions An array containing all positions where a read extract appears in a sequence.
        # @param max_nb_occ Used to compute the size (in number of columns) of reduced_nb_occ_foreach_extract_in_seqs
        # @ref_data The sequences (contigs) in which we looked for the read extract.
        # @single True if we are processing single reads.
        def __init__(self,reduced_nb_occ_foreach_extract_in_seqs,extracts_occ_positions,max_nb_occ,nb_reads,nb_extracts,ref_data,single=True):
            self.reduced_nb_occ_foreach_extract_in_seqs = reduced_nb_occ_foreach_extract_in_seqs
            self.max_tot_nb_occ_per_seq = int(max_nb_occ)
            self.extracts_occ_positions = extracts_occ_positions
            self.nb_reads = nb_reads
            self.nb_extracts =nb_extracts
            self.nb_extracts_per_read = self.nb_extracts//self.nb_reads
            self.ref_data=ref_data
            self.is_single=single
            self.l_siz=3 * self.max_tot_nb_occ_per_seq # size of a line in reduced array
    
    
        ## Gets all position where a kmer occurs in the given sequence.
        #
        # gets all positions where read extract occurrs in a given sequence
        def getOccPosForRExtract(self,idx_seq,start_idx_for_pos,extract_nb_occ):
            l_occ_pos = []
            for j in range(idx_seq * self.max_tot_nb_occ_per_seq + start_idx_for_pos,
                           idx_seq * self.max_tot_nb_occ_per_seq + start_idx_for_pos + extract_nb_occ):
                l_occ_pos.append(self.extracts_occ_positions[j])
            return l_occ_pos
    
    
        ## Gets mapping results for a given read against the host sequence.
        #
        # For each extract from this read, if it was found in the host sequence, get all positions were it was found and pick one of them randomly.
        # TODO : If this method is called too many times, make once for all a dictionnary with results for all reads that were found in it.
        def getResultsForReadAgainstHostseq(self,idx_cur_read):
            l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
            if self.ref_data.hostseq != "":
                for j in range((self.ref_data.nb_sequences-1)*self.l_siz,self.ref_data.nb_sequences*self.l_siz):
                    r = j % self.l_siz  # column index
                    if r % 3 == 0:  # index of read_extract
                        idx_extract = self.reduced_nb_occ_foreach_extract_in_seqs[j]
                        if idx_extract>idx_cur_read*self.nb_extracts_per_read+self.nb_extracts_per_read-1:
                            break
                        elif idx_extract>=idx_cur_read*self.nb_extracts_per_read:
                            extract_nb_occ = self.reduced_nb_occ_foreach_extract_in_seqs[j + 1]
                            start_idx_for_pos = self.reduced_nb_occ_foreach_extract_in_seqs[j + 2]
                            l_occ_pos=self.getOccPosForRExtract(self.ref_data.nb_sequences-1,start_idx_for_pos,extract_nb_occ)
                            match = random.choice(l_occ_pos)
                            l_extracts_match[idx_extract%idx_cur_read]=(match,match+self.ref_data.seed)
            return l_extracts_match
    
        ## For a given sequence (contig), yields results read by read.
        def mappingResultsForSeq(self,refseq):
            idx_seq=self.ref_data.getIdxSeq(refseq)
            if not self.ref_data.IdxIsHostseq(idx_seq):
                idx_cur_read = 0
                l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
                for i in range(idx_seq * self.l_siz, self.l_siz * idx_seq + self.l_siz):
                    idx_col = i % self.l_siz  # column index
                    if idx_col % 3 == 0:  # index of read_extract
                        idx_extract = self.reduced_nb_occ_foreach_extract_in_seqs[i]
                        #print "idx_extract=",idx_extract
                        if (idx_extract == -1): break  # no more read extracts were found in this sequence.
                        extract_nb_occ = self.reduced_nb_occ_foreach_extract_in_seqs[i + 1]
                        start_idx_for_pos = self.reduced_nb_occ_foreach_extract_in_seqs[i + 2]
                        # determine which read the extract belongs to and which extract it is
                        idx_read = idx_extract / self.nb_extracts_per_read
                        # Must yield information for all reads processed; even those for which no extracts were found.
                        while idx_read>=idx_cur_read+1:
                            r = gpuBasicMappingResultsForCov(l_extracts_match,self.is_single) # processing a new read, must yield results for the previous one.
                            r_hostseq = gpuBasicMappingResultsForCov(self.getResultsForReadAgainstHostseq(idx_cur_read),self.is_single)
                            yield r, r_hostseq, idx_cur_read
                            idx_cur_read += 1
                            l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
                        idx_extract_in_read = idx_extract % self.nb_extracts_per_read
                        l_occ_pos = []
                        for j in range(idx_seq * self.max_tot_nb_occ_per_seq + int(start_idx_for_pos),
                                       idx_seq * self.max_tot_nb_occ_per_seq + int(start_idx_for_pos) + int(extract_nb_occ)):
                            l_occ_pos.append(self.extracts_occ_positions[j])
                        if len(l_occ_pos) > 0:
                            match = random.choice(l_occ_pos)
                            l_extracts_match[idx_extract_in_read]=(match, match+self.ref_data.seed)
                # yield empty results for all remaining reads (mapping for them was not found in the sequence. Otherwise, they would be in the array.
                while (idx_cur_read<=self.nb_reads-1):
                    # All read extracts may not have been processed.
                    # This is the case if after the last read extract that was found in the sequence, nothing was found for other read extracts.
                    # Yield empty results in that case.
                    r = gpuBasicMappingResultsForCov(l_extracts_match,self.is_single)  # yields results for the last read processed
                    r_hostseq = gpuBasicMappingResultsForCov(self.getResultsForReadAgainstHostseq(idx_cur_read),self.is_single)
                    yield r, r_hostseq, idx_cur_read  # yield last read.
                    idx_cur_read+=1
                    l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
    
    ## Performs the mapping of all reads against all sequence using a GPU.
    #
    # This class encapsulates all the mapping process on GPU.
    class gpuBasicMapper:
        ## Constructor
        #
        # @param read_extracts A readsExtractsS or readExtractsPE depending on if we are rocessing single reads or paired reads.
        # @param ref_data A refData object containing the sequences (contigs).
        def __init__(self,read_extracts,ref_data):
            self.read_extracts=read_extracts
            self.seq_data=ref_data
            #self.starting_from=0
    
        ## Prepare input parameters on device side, only available for parameters that are used by several kernels.
        def prepInputOnDevice(self,input_rextracts,nb_sequences,nb_extracts):
            #print "extract id 26767",input_rextracts[26767]
            self.device_rextracts = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
                                         hostbuf=input_rextracts)
            self.k = np.array(self.read_extracts.seed,
                         np.uint32)  # need to pass an array of 1 element otherwise get an exception while trying to create buffer.
            self.device_k = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=self.k)
            self.device_nb_sequences = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
                                            hostbuf=np.array(nb_sequences, np.uint32))
            self.device_nb_extracts = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
                                           hostbuf=np.array(nb_extracts, np.int64))
    
    
    
        def countKmerNbOcc(self,nb_extracts,input_seqs,nb_sequences,seq_sizes_l):
            o_nb_occ_foreach_extract_in_seqs = np.full(2 * nb_extracts * nb_sequences, 0,
                                                       np.int64)  # result array for kernel 1.
            # prepare input and output parameters on device size.
            self.device_sequences = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=input_seqs)
            self.device_seq_sizes = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
                                         hostbuf=np.array(seq_sizes_l, np.uint32))
            self.device_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
                                                              hostbuf=o_nb_occ_foreach_extract_in_seqs)
            prg = cl.Program(self.ctx, reads_nb_occ_in_sequences_str).build()
            nb_threads_dim1 = getNbThreads(nb_extracts, self.max_dim1)
            prg.reads_nb_occ(self.queue, (nb_threads_dim1,), (1,), self.device_k, self.device_rextracts, self.device_nb_extracts,
                             self.device_sequences,
                             self.device_seq_sizes, self.device_nb_sequences, self.device_nb_occ_foreach_extract_in_seqs)
            # get kernel execution results back to host (number of occurrences of each k_mer in each sequence.
            cl.enqueue_copy(self.queue, o_nb_occ_foreach_extract_in_seqs, self.device_nb_occ_foreach_extract_in_seqs).wait()
            return o_nb_occ_foreach_extract_in_seqs
    
    
        def getFinalResultInfo(self,o_nb_occ_foreach_extract_in_seqs,nb_sequences):
            self.device_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
                                                              hostbuf=o_nb_occ_foreach_extract_in_seqs)
            prg_update = cl.Program(self.ctx, upd_reads_nb_occ_in_sequences_str).build()
            nb_threads_dim1 = getNbThreads(nb_sequences, self.max_dim1)
            prg_update.upd_nb_occ_array(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
                                        self.device_nb_sequences,
                                        self.device_nb_extracts)
            cl.enqueue_copy(self.queue, o_nb_occ_foreach_extract_in_seqs, self.device_nb_occ_foreach_extract_in_seqs)
    
        def getOccPositions(self,max_nb_occ,nb_sequences,nb_extracts):
            host_max_nb_occ = np.array(max_nb_occ,
                                       np.int64)  # max over all sequences of the total number of read extracts occurrences per sequence
            self.device_max_nb_occ = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=host_max_nb_occ)
            host_read_occ_positions = np.full(int(host_max_nb_occ * nb_sequences), 0,
                                              dtype=np.uint32)  # contains the positions of all extracts occurrences in the sequences.
            device_read_occ_positions = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR, hostbuf=host_read_occ_positions)
    
            prg_get_occ_pos = cl.Program(self.ctx, reads_occ_pos_in_seqs_str).build()
            nb_threads_dim1 = getNbThreads(nb_extracts, self.max_dim1)
            prg_get_occ_pos.get_occ_pos(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
                                        self.device_nb_sequences,
                                        self.device_nb_extracts, self.device_max_nb_occ, device_read_occ_positions, self.device_k,
                                        self.device_rextracts, self.device_sequences, self.device_seq_sizes)
            cl.enqueue_copy(self.queue, host_read_occ_positions, device_read_occ_positions)
            return host_read_occ_positions
    
        def getReducedKmerNbOcc(self,max_nb_occ,nb_sequences):
            reduced_nb_occ_foreach_extract_in_seqs = np.full(int(3 * max_nb_occ * nb_sequences), -1, dtype=np.int64)
            device_reduced_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
                                                                      hostbuf=reduced_nb_occ_foreach_extract_in_seqs)
            #sf = np.array(self.starting_from,np.uint64)
            #device_sf = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
            #                          hostbuf=sf)
    
            prg_red_rd_occ = cl.Program(self.ctx, reads_nb_occ_reduction_str).build()
            nb_threads_dim1 = getNbThreads(nb_sequences, self.max_dim1)
            prg_red_rd_occ.reads_nb_occ_reduction(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
                                                  self.device_nb_sequences,
                                                  self.device_nb_extracts, device_reduced_nb_occ_foreach_extract_in_seqs,
                                                  self.device_max_nb_occ)
            cl.enqueue_copy(self.queue, reduced_nb_occ_foreach_extract_in_seqs, device_reduced_nb_occ_foreach_extract_in_seqs)
            return reduced_nb_occ_foreach_extract_in_seqs
    
    
        ## maps PE1 and PE2 or single reads using GPU.
        #
        # This is used for datasets up to (approx) 1 million of reads single and 200 sequences on the P100 GPU card that has 16GB of memory.
        # But this will have to be adapted to your GPU and dataset.
        def doMapping(self):
            self.ctx, self.queue, self.mf, self.max_dim1 = getOpenCLElements()
            t1=time.time()
            nb_sequences=self.seq_data.nb_sequences # for readability.
            # prepare input and output parameters on host side.
            seq_sizes_l = self.seq_data.getSeqSizesList() # list of the length of all sequences
            str_seqs = "".join(self.seq_data.refseq_list) # all contigs concatenated in a string
            input_seqs = np.array(str_seqs) # Need to put all the sequences in a numpy array for openCL.
            extracts_l,nb_reads,nb_extracts=self.read_extracts.getRExtracts()
            input_rextracts = np.array(extracts_l) # all read extracts
            self.prepInputOnDevice(input_rextracts, nb_sequences, nb_extracts)
            o_nb_occ_foreach_extract_in_seqs=self.countKmerNbOcc(nb_extracts, input_seqs, nb_sequences, seq_sizes_l)
            t2=time.time()
            print "took ",t2-t1," seconds for kernel1"
            #o_nb_occ_foreach_extract_in_seqs.tofile("res_kernel1",sep="|",format="%s")
    
            # determine size in memory of final results and
            # compute position of per-read final results in final results array
            # (ie: tells from which box in final array we must read the positions of the N occurrences of read X in genome A.)
            # This kernel is intended to be used with 1 thread per sequence.
            # It computes for each sequence, the total number of occurrences of kmers.
            self.getFinalResultInfo(o_nb_occ_foreach_extract_in_seqs,nb_sequences)
    
            #o_nb_occ_foreach_extract_in_seqs.tofile("res_kernel2", sep="|", format="%s")
            max_nb_occ=getMaxNbOccPerSeq(o_nb_occ_foreach_extract_in_seqs,nb_extracts,nb_sequences)
            t3=time.time()
            print "took ", t3 - t2, " seconds for kernel2"
            host_read_occ_positions=self.getOccPositions(max_nb_occ,nb_sequences,nb_extracts)
            t4=time.time()
            print "took ", t4 - t3, " seconds for kernel3"
            #host_read_occ_positions.tofile("res_kernel3", sep="|", format="%s")
    
            # Gets reduced version of output_reads_nb_occ_in_seqs that enables me to iterate only on useful elements
            # of host_read_occ_positions which is much faster than iterating over a big array.
            reduced_nb_occ_foreach_extract_in_seqs=self.getReducedKmerNbOcc(max_nb_occ, nb_sequences)
    
            #reduced_nb_occ_foreach_extract_in_seqs.tofile("res_kernel4", sep="|", format="%s")
            t5=time.time()
            print "took ", t5 - t4, " seconds for kernel4"
            self.mapping_res=gpuBasicMappingResults(reduced_nb_occ_foreach_extract_in_seqs,host_read_occ_positions,int(max_nb_occ),nb_reads,nb_extracts,self.seq_data)
            t6=time.time()
            print "took ", t6 - t5, " seconds for getting mapping res"
            return self.mapping_res