Select Git revision
GPU_mapper.py
GPU_mapper.py 17.92 KiB
##@file GPU_mapper.py
#
# This file contains classes for performing the mapping of read extracts against sequences using the openCL technology.
##@author vlegrand@pasteur.fr
from __future__ import division # Preparing migration to python3
import random
import numpy as np
import time
try:
from OCL_kernels import *
except ImportError:
print "WARNING No openCL implementation on that machine"
## more convenient class to encapsulate mapping results for single reads as weel as for PE.
#
# It is intended to be used in readsCoverageGPU.
# l_matches contains read extracts maping results agains a sequence.
# position 0 in l_matches corresponds to read[:seed]
# position 1 in l_matches corresponds to read[-seed:]
# if reads are paired, there are only 2 elements in l_matches. Otherwise, there are 4.
# position 2 in l_matches corresponds to RevCpl(read[-seed:])
# position 3 in l_matches corresponds to RevCpl(read[:seed])
class gpuBasicMappingResultsForCov:
def __init__(self,l_matches,single=True):
self.is_single=single
self.read_start_match=l_matches[0]
self.read_end_match=l_matches[1]
if single:
self.rev_cpl_start_match=l_matches[2]
self.rev_cpl_end_match=l_matches[3]
else:
self.PE2_start_match=l_matches[2]
self.rev_cpl_PE2_start_match=l_matches[3]
self.rev_cpl_PE2_stop_match=l_matches[4]
def getGPUMappingResults(reduced_fic,occ_pos_fic,max_nb_occ_fic,nb_reads,nb_extracts,ref_data,single=True):
reduced=np.loadtxt(reduced_fic, dtype=np.int64, delimiter='|')
occ_pos=np.loadtxt(occ_pos_fic, dtype=np.int32, delimiter='|')
f_nb_occ = open(max_nb_occ_fic)
str_nb_occ = f_nb_occ.read()
f_nb_occ.close()
max_nb_occ = int(str_nb_occ)
mres=gpuBasicMappingResults(reduced, occ_pos, max_nb_occ, nb_reads,
nb_extracts, ref_data,single)
return mres
## Class to encapsulate the results of the mapping of all read extracts against all sequences.
class gpuBasicMappingResults:
## Constructor
#
# @param reduced_nb_occ_foreach_extract_in_seqs An array containing for each extract that appears in a sequence, the number of occurrecnces and where to read occurence postion.
# @param extracts_occ_positions An array containing all positions where a read extract appears in a sequence.
# @param max_nb_occ Used to compute the size (in number of columns) of reduced_nb_occ_foreach_extract_in_seqs
# @ref_data The sequences (contigs) in which we looked for the read extract.
# @single True if we are processing single reads.
def __init__(self,reduced_nb_occ_foreach_extract_in_seqs,extracts_occ_positions,max_nb_occ,nb_reads,nb_extracts,ref_data,single=True):
self.reduced_nb_occ_foreach_extract_in_seqs = reduced_nb_occ_foreach_extract_in_seqs
self.max_tot_nb_occ_per_seq = int(max_nb_occ)
self.extracts_occ_positions = extracts_occ_positions
self.nb_reads = nb_reads
self.nb_extracts =nb_extracts
self.nb_extracts_per_read = self.nb_extracts//self.nb_reads
self.ref_data=ref_data
self.is_single=single
self.l_siz=3 * self.max_tot_nb_occ_per_seq # size of a line in reduced array
## Gets all position where a kmer occurs in the given sequence.
#
# gets all positions where read extract occurrs in a given sequence
def getOccPosForRExtract(self,idx_seq,start_idx_for_pos,extract_nb_occ):
l_occ_pos = []
for j in range(idx_seq * self.max_tot_nb_occ_per_seq + start_idx_for_pos,
idx_seq * self.max_tot_nb_occ_per_seq + start_idx_for_pos + extract_nb_occ):
l_occ_pos.append(self.extracts_occ_positions[j])
return l_occ_pos
## Gets mapping results for a given read against the host sequence.
#
# For each extract from this read, if it was found in the host sequence, get all positions were it was found and pick one of them randomly.
# TODO : If this method is called too many times, make once for all a dictionnary with results for all reads that were found in it.
def getResultsForReadAgainstHostseq(self,idx_cur_read):
l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
if self.ref_data.hostseq != "":
for j in range((self.ref_data.nb_sequences-1)*self.l_siz,self.ref_data.nb_sequences*self.l_siz):
r = j % self.l_siz # column index
if r % 3 == 0: # index of read_extract
idx_extract = self.reduced_nb_occ_foreach_extract_in_seqs[j]
if idx_extract>idx_cur_read*self.nb_extracts_per_read+self.nb_extracts_per_read-1:
break
elif idx_extract>=idx_cur_read*self.nb_extracts_per_read:
extract_nb_occ = self.reduced_nb_occ_foreach_extract_in_seqs[j + 1]
start_idx_for_pos = self.reduced_nb_occ_foreach_extract_in_seqs[j + 2]
l_occ_pos=self.getOccPosForRExtract(self.ref_data.nb_sequences-1,start_idx_for_pos,extract_nb_occ)
match = random.choice(l_occ_pos)
l_extracts_match[idx_extract%idx_cur_read]=(match,match+self.ref_data.seed)
return l_extracts_match
## For a given sequence (contig), yields results read by read.
def mappingResultsForSeq(self,refseq):
idx_seq=self.ref_data.getIdxSeq(refseq)
if not self.ref_data.IdxIsHostseq(idx_seq):
idx_cur_read = 0
l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
for i in range(idx_seq * self.l_siz, self.l_siz * idx_seq + self.l_siz):
idx_col = i % self.l_siz # column index
if idx_col % 3 == 0: # index of read_extract
idx_extract = self.reduced_nb_occ_foreach_extract_in_seqs[i]
#print "idx_extract=",idx_extract
if (idx_extract == -1): break # no more read extracts were found in this sequence.
extract_nb_occ = self.reduced_nb_occ_foreach_extract_in_seqs[i + 1]
start_idx_for_pos = self.reduced_nb_occ_foreach_extract_in_seqs[i + 2]
# determine which read the extract belongs to and which extract it is
idx_read = idx_extract / self.nb_extracts_per_read
# Must yield information for all reads processed; even those for which no extracts were found.
while idx_read>=idx_cur_read+1:
r = gpuBasicMappingResultsForCov(l_extracts_match,self.is_single) # processing a new read, must yield results for the previous one.
r_hostseq = gpuBasicMappingResultsForCov(self.getResultsForReadAgainstHostseq(idx_cur_read),self.is_single)
yield r, r_hostseq, idx_cur_read
idx_cur_read += 1
l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
idx_extract_in_read = idx_extract % self.nb_extracts_per_read
l_occ_pos = []
for j in range(idx_seq * self.max_tot_nb_occ_per_seq + int(start_idx_for_pos),
idx_seq * self.max_tot_nb_occ_per_seq + int(start_idx_for_pos) + int(extract_nb_occ)):
l_occ_pos.append(self.extracts_occ_positions[j])
if len(l_occ_pos) > 0:
match = random.choice(l_occ_pos)
l_extracts_match[idx_extract_in_read]=(match, match+self.ref_data.seed)
# yield empty results for all remaining reads (mapping for them was not found in the sequence. Otherwise, they would be in the array.
while (idx_cur_read<=self.nb_reads-1):
# All read extracts may not have been processed.
# This is the case if after the last read extract that was found in the sequence, nothing was found for other read extracts.
# Yield empty results in that case.
r = gpuBasicMappingResultsForCov(l_extracts_match,self.is_single) # yields results for the last read processed
r_hostseq = gpuBasicMappingResultsForCov(self.getResultsForReadAgainstHostseq(idx_cur_read),self.is_single)
yield r, r_hostseq, idx_cur_read # yield last read.
idx_cur_read+=1
l_extracts_match = [(-1, -1)] * self.nb_extracts_per_read
## Performs the mapping of all reads against all sequence using a GPU.
#
# This class encapsulates all the mapping process on GPU.
class gpuBasicMapper:
## Constructor
#
# @param read_extracts A readsExtractsS or readExtractsPE depending on if we are rocessing single reads or paired reads.
# @param ref_data A refData object containing the sequences (contigs).
def __init__(self,read_extracts,ref_data):
self.read_extracts=read_extracts
self.seq_data=ref_data
#self.starting_from=0
## Prepare input parameters on device side, only available for parameters that are used by several kernels.
def prepInputOnDevice(self,input_rextracts,nb_sequences,nb_extracts):
#print "extract id 26767",input_rextracts[26767]
self.device_rextracts = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
hostbuf=input_rextracts)
self.k = np.array(self.read_extracts.seed,
np.uint32) # need to pass an array of 1 element otherwise get an exception while trying to create buffer.
self.device_k = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=self.k)
self.device_nb_sequences = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
hostbuf=np.array(nb_sequences, np.uint32))
self.device_nb_extracts = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
hostbuf=np.array(nb_extracts, np.int64))
def countKmerNbOcc(self,nb_extracts,input_seqs,nb_sequences,seq_sizes_l):
o_nb_occ_foreach_extract_in_seqs = np.full(2 * nb_extracts * nb_sequences, 0,
np.int64) # result array for kernel 1.
# prepare input and output parameters on device size.
self.device_sequences = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=input_seqs)
self.device_seq_sizes = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
hostbuf=np.array(seq_sizes_l, np.uint32))
self.device_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
hostbuf=o_nb_occ_foreach_extract_in_seqs)
prg = cl.Program(self.ctx, reads_nb_occ_in_sequences_str).build()
nb_threads_dim1 = getNbThreads(nb_extracts, self.max_dim1)
prg.reads_nb_occ(self.queue, (nb_threads_dim1,), (1,), self.device_k, self.device_rextracts, self.device_nb_extracts,
self.device_sequences,
self.device_seq_sizes, self.device_nb_sequences, self.device_nb_occ_foreach_extract_in_seqs)
# get kernel execution results back to host (number of occurrences of each k_mer in each sequence.
cl.enqueue_copy(self.queue, o_nb_occ_foreach_extract_in_seqs, self.device_nb_occ_foreach_extract_in_seqs).wait()
return o_nb_occ_foreach_extract_in_seqs
def getFinalResultInfo(self,o_nb_occ_foreach_extract_in_seqs,nb_sequences):
self.device_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
hostbuf=o_nb_occ_foreach_extract_in_seqs)
prg_update = cl.Program(self.ctx, upd_reads_nb_occ_in_sequences_str).build()
nb_threads_dim1 = getNbThreads(nb_sequences, self.max_dim1)
prg_update.upd_nb_occ_array(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
self.device_nb_sequences,
self.device_nb_extracts)
cl.enqueue_copy(self.queue, o_nb_occ_foreach_extract_in_seqs, self.device_nb_occ_foreach_extract_in_seqs)
def getOccPositions(self,max_nb_occ,nb_sequences,nb_extracts):
host_max_nb_occ = np.array(max_nb_occ,
np.int64) # max over all sequences of the total number of read extracts occurrences per sequence
self.device_max_nb_occ = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.USE_HOST_PTR, hostbuf=host_max_nb_occ)
host_read_occ_positions = np.full(int(host_max_nb_occ * nb_sequences), 0,
dtype=np.uint32) # contains the positions of all extracts occurrences in the sequences.
device_read_occ_positions = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR, hostbuf=host_read_occ_positions)
prg_get_occ_pos = cl.Program(self.ctx, reads_occ_pos_in_seqs_str).build()
nb_threads_dim1 = getNbThreads(nb_extracts, self.max_dim1)
prg_get_occ_pos.get_occ_pos(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
self.device_nb_sequences,
self.device_nb_extracts, self.device_max_nb_occ, device_read_occ_positions, self.device_k,
self.device_rextracts, self.device_sequences, self.device_seq_sizes)
cl.enqueue_copy(self.queue, host_read_occ_positions, device_read_occ_positions)
return host_read_occ_positions
def getReducedKmerNbOcc(self,max_nb_occ,nb_sequences):
reduced_nb_occ_foreach_extract_in_seqs = np.full(int(3 * max_nb_occ * nb_sequences), -1, dtype=np.int64)
device_reduced_nb_occ_foreach_extract_in_seqs = cl.Buffer(self.ctx, self.mf.WRITE_ONLY | self.mf.USE_HOST_PTR,
hostbuf=reduced_nb_occ_foreach_extract_in_seqs)
#sf = np.array(self.starting_from,np.uint64)
#device_sf = cl.Buffer(self.ctx, self.mf.READ_ONLY | self.mf.HOST_READ_ONLY | self.mf.USE_HOST_PTR,
# hostbuf=sf)
prg_red_rd_occ = cl.Program(self.ctx, reads_nb_occ_reduction_str).build()
nb_threads_dim1 = getNbThreads(nb_sequences, self.max_dim1)
prg_red_rd_occ.reads_nb_occ_reduction(self.queue, (nb_threads_dim1,), (1,), self.device_nb_occ_foreach_extract_in_seqs,
self.device_nb_sequences,
self.device_nb_extracts, device_reduced_nb_occ_foreach_extract_in_seqs,
self.device_max_nb_occ)
cl.enqueue_copy(self.queue, reduced_nb_occ_foreach_extract_in_seqs, device_reduced_nb_occ_foreach_extract_in_seqs)
return reduced_nb_occ_foreach_extract_in_seqs
## maps PE1 and PE2 or single reads using GPU.
#
# This is used for datasets up to (approx) 1 million of reads single and 200 sequences on the P100 GPU card that has 16GB of memory.
# But this will have to be adapted to your GPU and dataset.
def doMapping(self):
self.ctx, self.queue, self.mf, self.max_dim1 = getOpenCLElements()
t1=time.time()
nb_sequences=self.seq_data.nb_sequences # for readability.
# prepare input and output parameters on host side.
seq_sizes_l = self.seq_data.getSeqSizesList() # list of the length of all sequences
str_seqs = "".join(self.seq_data.refseq_list) # all contigs concatenated in a string
input_seqs = np.array(str_seqs) # Need to put all the sequences in a numpy array for openCL.
extracts_l,nb_reads,nb_extracts=self.read_extracts.getRExtracts()
input_rextracts = np.array(extracts_l) # all read extracts
self.prepInputOnDevice(input_rextracts, nb_sequences, nb_extracts)
o_nb_occ_foreach_extract_in_seqs=self.countKmerNbOcc(nb_extracts, input_seqs, nb_sequences, seq_sizes_l)
t2=time.time()
print "took ",t2-t1," seconds for kernel1"
#o_nb_occ_foreach_extract_in_seqs.tofile("res_kernel1",sep="|",format="%s")
# determine size in memory of final results and
# compute position of per-read final results in final results array
# (ie: tells from which box in final array we must read the positions of the N occurrences of read X in genome A.)
# This kernel is intended to be used with 1 thread per sequence.
# It computes for each sequence, the total number of occurrences of kmers.
self.getFinalResultInfo(o_nb_occ_foreach_extract_in_seqs,nb_sequences)
#o_nb_occ_foreach_extract_in_seqs.tofile("res_kernel2", sep="|", format="%s")
max_nb_occ=getMaxNbOccPerSeq(o_nb_occ_foreach_extract_in_seqs,nb_extracts,nb_sequences)
t3=time.time()
print "took ", t3 - t2, " seconds for kernel2"
host_read_occ_positions=self.getOccPositions(max_nb_occ,nb_sequences,nb_extracts)
t4=time.time()
print "took ", t4 - t3, " seconds for kernel3"
#host_read_occ_positions.tofile("res_kernel3", sep="|", format="%s")
# Gets reduced version of output_reads_nb_occ_in_seqs that enables me to iterate only on useful elements
# of host_read_occ_positions which is much faster than iterating over a big array.
reduced_nb_occ_foreach_extract_in_seqs=self.getReducedKmerNbOcc(max_nb_occ, nb_sequences)
#reduced_nb_occ_foreach_extract_in_seqs.tofile("res_kernel4", sep="|", format="%s")
t5=time.time()
print "took ", t5 - t4, " seconds for kernel4"
self.mapping_res=gpuBasicMappingResults(reduced_nb_occ_foreach_extract_in_seqs,host_read_occ_positions,int(max_nb_occ),nb_reads,nb_extracts,self.seq_data)
t6=time.time()
print "took ", t6 - t5, " seconds for getting mapping res"
return self.mapping_res