diff --git a/PhageTerm.py b/PhageTerm.py index fa1b44626f0f9c46d169e229ebeaf04ae58f703a..850149eff4ad2048b445731ab580707647a61f15 100755 --- a/PhageTerm.py +++ b/PhageTerm.py @@ -261,6 +261,11 @@ def main(): s_stats.R1, s_stats.R2, s_stats.R3, inRawDArgs.host, len(inDArgs.hostseq), host_whole_coverage, \ s_stats.picMaxPlus_host, s_stats.picMaxMinus_host, fParms.surrounding, s_stats.drop_cov, inRawDArgs.paired, insert, phage_hybrid_coverage,\ host_hybrid_coverage, s_stats.added_paired_whole_coverage, s_stats.Mu_like, fParms.test_run, s_stats.P_class, s_stats.P_type, s_stats.P_concat) + + if (inRawDArgs.nrt==True): # non regression tests, dump phage class name into file for later checking. + fnrt=open("nrt.txt","w") + fnrt.write(s_stats.P_class) + fnrt.close() else: # Test No Match if len(no_match) == inDArgs.nbr_virome: diff --git a/_modules/main_utils.py b/_modules/main_utils.py index d3ebd582bc673ec906ee002c3376c3ef1431c1cf..3fdfb5e0ae180c9ef294678efd804de5511a4d70 100755 --- a/_modules/main_utils.py +++ b/_modules/main_utils.py @@ -149,6 +149,8 @@ def setOptions(): opttest = OptionGroup(getopt, 'Perform a program test run upon installation') opttest.add_option('-t', '--test', dest='test', metavar='STRING', help='Perform a program test run upon installation. If you want to perform a test run, use the "-t " option. Arguments for the -t option can be : C5, C3, DS, DL, H or M. C5 -> Test run for a 5\' cohesive end (e.g. Lambda); C3 -> Test run for a 3\' cohesive end (e.g. HK97); DS -> Test run for a short Direct Terminal Repeats end (e.g. T7); DL -> Test run for a long Direct Terminal Repeats end (e.g. T5); H -> Test run for a Headful packaging (e.g. P1); M -> Test run for a Mu-like packaging (e.g. 
Mu)') + + opttest.add_option('--nrt',dest='nrt',action='store_true',default=False,help='dump phage Class name to special file for non regression testing') getopt.add_option_group(opttest) return getopt @@ -158,7 +160,7 @@ def setOptions(): # This class provides encapsulation for raw data provided by the user as arguments to phageterm (input file names, testing mode if so,phagename, host and paired). # It also performs checkings on the input files and computes the number of reads. class inputRawDataArgs: - def __init__(self,fastq,reference,host,phagename,paired,test): + def __init__(self,fastq,reference,host,phagename,paired,test,nrt): if test == "C5": print "\nPerforming a test run using test phage sequence with 5 prime cohesive overhang :" print "\npython PhageTerm.py -f test-data/COS-5.fastq -r test-data/COS-5.fasta -n TEST_cohesive_5_prime" @@ -223,6 +225,7 @@ class inputRawDataArgs: self.fastq=fastq self.paired=paired self.host=host + self.nrt=nrt # READS Number self.tot_reads = totReads(fastq) @@ -340,7 +343,11 @@ class technicalParms: self.dir_seq_mm=dir_seq_mm if core == None: self.core = 1 - self.limit_coverage = max(50, mean * 2) / self.core + print "mean=",mean + print "self.core=",self.core + self.limit_coverage = max(50, mean * 2) / float(self.core) + print self.limit_coverage + print type(self.limit_coverage) if gpu ==True and self.core > 1: print "Choose either multicore or gpu!" 
exit(1) @@ -471,7 +478,7 @@ def checkOptArgsConsistency(getopt): phagename = "Phagename" inRawDArgs = inputRawDataArgs(options.fastq, options.reference, options.host, options.phagename, options.paired, - options.test) + options.test,options.nrt) fParms = functionalParms(options.seed, options.surround, options.mean, options.limit, options.virome, options.test) # print "options.core,options.gpu,fParms.mean,options.gpu_mapping_res_dir,options.nb_chunks: " # print options.core,options.gpu,fParms.mean,options.gpu_mapping_res_dir,options.nb_chunks diff --git a/unit-tests/Data4Test.py b/unit-tests/Data4Test.py new file mode 100644 index 0000000000000000000000000000000000000000..3ab4de944f609d978406d9de9e209ecfbda191d9 --- /dev/null +++ b/unit-tests/Data4Test.py @@ -0,0 +1,48 @@ +import os + +from _modules.IData_handling import genomeFastaRecovery,totReads +from _modules.main_utils import inputRawDataArgs,functionalParms,InputDerivedDataArgs,technicalParms + +TEST_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) +TEST_DATA_PATH = os.path.join(TEST_DIR_PATH, "..", "test-data") + +class Data4Test: + def __init__(self,fastq,fasta,paired="",hostseq="",seed=20,edge=500,insert_max=1000,limit_coverage=11,virome=0): + self.inDRawArgs=inputRawDataArgs(fastq,fasta,None,None,paired,None,nrt=False) + #self.inDRawArgs.fastq=fastq + #return genome, name, genome_rejected + self.refseq_list,name,rej=genomeFastaRecovery(fasta, 500, edge, host_test = 0) + self.fParms=functionalParms(seed, None, None, limit_coverage, virome, None) + self.fParms.edge = edge + self.fParmsinsert_max = insert_max + self.inDArgs=InputDerivedDataArgs(self.inDRawArgs,self.fParms) + if hostseq!="": # expect name of fasta file containing hostseq + hs,name,rej=genomeFastaRecovery(fasta, 500, edge, host_test=0) + self.inDArgs.hostseq = hs[0] + else: + self.inDArgs.hostseq =hostseq + + self.tParms=technicalParms(1,None,250,None,None,None,None,None, \ + 0,None,False,None,None,0,"") + #self.core=1 + #self.core_id=0, 
+ #self.limit_coverage=limit_coverage + #self.virome=virome + self.line_start=0 + self.line_end=totReads(fastq) + +def buildTestData(): + l=[] + l.append(Data4Test(data_path("COS-5.fastq"), data_path("COS-5.fasta"))) + l.append(Data4Test(data_path("COS-3.fastq"), data_path("COS-3.fasta"))) + l.append(Data4Test(data_path("DTR-short.fastq"), data_path("DTR-short.fasta"))) + l.append(Data4Test(data_path("DTR-long.fastq"), data_path("DTR-long.fasta"))) + l.append(Data4Test(data_path("Headful.fastq"), data_path("Headful.fasta"))) + l.append(Data4Test(data_path("Mu-like_R1.fastq"), data_path("Mu-like.fasta"), data_path("Mu-like_R2.fastq"), data_path("Mu-like_host.fasta"))) + l.append(Data4Test(data_path("Virome.fastq"), data_path("Virome.fasta"), virome=1)) + return l + +def data_path(filename): + '''append the test data path to the filename string. + ''' + return os.path.join(TEST_DATA_PATH, filename) diff --git a/unit-tests/chk_0_0_38_863.npz b/unit-tests/chk_0_0_38_863.npz new file mode 100644 index 0000000000000000000000000000000000000000..dd9cdfdc4840568b53d25387ffba006c034f41b5 Binary files /dev/null and b/unit-tests/chk_0_0_38_863.npz differ diff --git a/unit-tests/test_GPUMapper.py b/unit-tests/test_GPUMapper.py new file mode 100644 index 0000000000000000000000000000000000000000..784b189177cff484940a1b0801d46ecfb7959953 --- /dev/null +++ b/unit-tests/test_GPUMapper.py @@ -0,0 +1,255 @@ +##@file test_GPUMapper.py +# Tests of the GPU_chunkMapper + +import unittest +import os +os.environ['PYOPENCL_COMPILER_OUTPUT'] = '1' + +import numpy as np +import sys +sys.path.append("..") + +from _modules.GPU_chunkMapper import GPU_chunkMapper,computeChunkSize,doMapping +from _modules.IData_handling import refData,getChunk,readsExtractsS,dump_d_rinfo,load_d_rinfo +from _modules.GPU_mappingResults import getGPUChunkMappingResults + +from data_refseq import refseq_list +nb_sequences=len(refseq_list) +from data_refseq import all_extracts_l +nb_extracts=len(all_extracts_l) +print 
"nb_extracts=",nb_extracts + +ficdir="./tmp" + +## Class containing all the testing functions of the GPU_chunkMapper. +class TestGPUMapper (unittest.TestCase): + # def setUp(self): + # try: + # os.mkdir(ficdir) + # except: + # print "Couldn't create directory: ",ficdir,". Cannot run tests" + # exit(1) + # + # def teadDown(self): + # shutil.rmtree(ficdir) + + def test_computeChunkSize(self): + mapper=GPU_chunkMapper() + mapper.dev_mem==1536*1024*1024 # have to force this value otherwise test result depends on platform. + seq_data=refData(refseq_list,20,"") + nb_extracts=1000 + # let the mapper compute the necessary number of chunks + nb_chunks,max_chunk_size=computeChunkSize(mapper, seq_data, nb_extracts, "",wanted_nb_chunks=None) + self.assertEqual(nb_chunks,1) + self.assertEqual(max_chunk_size,nb_extracts) + # declare that we want to process the reads in at least 2 chunks. + nb_chunks, max_chunk_size = computeChunkSize(mapper, seq_data,nb_extracts, "",wanted_nb_chunks=2) + self.assertEqual(nb_chunks, 2) + self.assertEqual(max_chunk_size, (nb_extracts//2)) + nb_chunks, max_chunk_size = computeChunkSize(mapper, seq_data, nb_extracts, "", wanted_nb_chunks=3) + self.assertEqual(nb_chunks, 4) + self.assertEqual(max_chunk_size, 332) + + + + def test_getChunks(self): + cnt_chk=0 + for RE,d_rinfo in getChunk("../test-data/Virome.fastq", 20, "",67580): + cnt_chk+=1 + self.assertEqual(RE.nb_extracts,67580) + r_extracts_list=RE.r_extracts_list + self.assertEqual(cnt_chk,1) + cnt_chk = 0 + siz=4*1407 + for RE, d_rinfo in getChunk("../test-data/Virome.fastq", 20, "", 67580// 4//12): + if cnt_chk<12: + self.assertEqual(RE.nb_extracts,siz) + if cnt_chk == 0: + self.assertEqual(RE.r_extracts_list[0],r_extracts_list[0]) + self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[siz-1]) + if cnt_chk == 3: + fic=os.path.join(ficdir,"d_rinfo3") + dump_d_rinfo(d_rinfo, fic) + re = d_rinfo[2] + d_rinfo2=load_d_rinfo(fic) + re2=d_rinfo2[2] + 
self.assertEqual(len(d_rinfo),len(d_rinfo2)) + self.assertEqual(re.offset1,re2.offset1) + self.assertEqual(re.offset2,re2.offset2) + self.assertEqual(re.corlen,re2.corlen) + self.assertEqual(RE.r_extracts_list[0],r_extracts_list[3*siz]) + self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[4*siz-1]) + if cnt_chk == 7: + self.assertEqual(RE.r_extracts_list[0],r_extracts_list[7*siz] ) + self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[8*siz-1]) + if cnt_chk == 11: + self.assertEqual(RE.r_extracts_list[0], r_extracts_list[11*siz]) + self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[12*siz-1]) + if cnt_chk== 12: + self.assertEqual(RE.r_extracts_list[0], r_extracts_list[12 * siz ]) + self.assertEqual(RE.r_extracts_list[43],r_extracts_list[-1]) + cnt_chk += 1 + self.assertEqual(cnt_chk,13) + # for RE, d_rinfo in getChunk("../test-data/Virome.fastq", 20, "", 33790 // 4//13): + # self.assertEqual(RE.nb_extracts, + + + ## Testing that result files are generated where they should and that one can re-read them and check results. 
+ # + def testMappingOnly(self): + rd=refData(refseq_list,20,"") + mapper=GPU_chunkMapper() + mapper.setRefData(rd) + mapper.setFicDir(ficdir) + nb_kmer_in_chunk=33788 + nb_kmer_per_read=4 + doMapping(nb_kmer_in_chunk,mapper,"../test-data/Virome.fastq","",rd,nb_kmer_per_read) + #mapper.setRExtracts(re) + #print all_extracts_l[33790] + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_2"))) + + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_2"))) + 
self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_2"))) + + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_2"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_0"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_1"))) + self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_2"))) + + i_occ_pos = np.loadtxt(os.path.join(ficdir, "read_occ_positions0_0"), dtype=np.int32, delimiter='|') + self.assertEqual(i_occ_pos[7677],1864) + self.assertEqual(i_occ_pos[7678],1954) + reduced=np.loadtxt(os.path.join(ficdir, "reduced0_0"), dtype=np.int64, delimiter='|') + self.assertEqual(reduced[15690], 11104) + 
self.assertEqual(reduced[15691], 1) + self.assertEqual(reduced[15692], 7677) + self.assertEqual(reduced[15693], 11105) + self.assertEqual(reduced[15694], 1) + self.assertEqual(reduced[15695], 7678) + + reduced = np.loadtxt(os.path.join(ficdir, "reduced3_1"), dtype=np.int64, delimiter='|') + i=0 + found1=False + found2=False + for x in np.nditer(reduced): + if (i%3==0): + if x==-1: + break + #print "found kmer: ",x," in seq 3" + if (x==26766): + found1=True + if (x==26767): + found2=True + if (found1 and found2): + break + i+=1 + + self.assertTrue(found1) + self.assertTrue(found2) + + f_nb_occ=open(os.path.join(ficdir,"max_occ0_0")) + str_nb_occ=f_nb_occ.read() + f_nb_occ.close() + max_nb_occ=int(str_nb_occ) + self.assertEqual(max_nb_occ,15690) + + + reduced = np.loadtxt(os.path.join(ficdir, "reduced2_1"), dtype=np.int64, delimiter='|') + found = False + i = 0 + nb_occ=0 + for x in np.nditer(reduced): + if (i%3==0): + if (x==2): + found=True + nb_occ=reduced[i+1] + i+=1 + self.assertTrue(found) + self.assertEqual(nb_occ,2) + + reduced = np.loadtxt(os.path.join(ficdir, "reduced2_1"), dtype=np.int64, delimiter='|') + found = False + i = 0 + nb_occ = 0 + for x in np.nditer(reduced): + if (i % 3 == 0): + if (x == 3): + found = True + nb_occ = reduced[i + 1] + i += 1 + self.assertTrue(found) + self.assertEqual(nb_occ, 2) + + + def testIterationOverMappingResults(self): + seed=20 + rd = refData(refseq_list, seed, "") + re = readsExtractsS(seed) + re.r_extracts_list = all_extracts_l + print all_extracts_l[60559] + print all_extracts_l[60560] # TODO: track mapping of these kmers in openCL and in result files. + print all_extracts_l[60561] + print all_extracts_l[60562] + print all_extracts_l[60563] # appears twice in seq 3. 
+ re.nb_extracts = nb_extracts + re.nb_reads = 16895 + mapper = GPU_chunkMapper() + mapper.setRefData(rd) + mapper.setFicDir(ficdir) + doMapping(33788, mapper, "../test-data/Virome.fastq", "", rd, 4) + # mres=getGPUChunkMappingResults(os.path.join(ficdir, "reduced3_1"), os.path.join(ficdir, "read_occ_positions3_1"), + # os.path.join(ficdir,"max_occ3_1"), re.nb_reads, re.nb_extracts, rd, single=True) + mres=getGPUChunkMappingResults(3,1,ficdir,re.nb_reads,re.nb_extracts,rd,True) + found =False + for r, r_hostseq, idx_read in mres.mappingResults(): #TODO VL: add more tests. + self.assertTrue(idx_read>=6398) + if (idx_read==6693): # extracts from 60560 + self.assertEqual(r.read_start_match,(-1,-1)) + self.assertEqual(r.read_start_match,(-1,-1)) + self.assertEqual(r.rev_cpl_start_match,(-1,-1)) + self.assertTrue(r.rev_cpl_end_match==(448,468) or r.rev_cpl_end_match==(2449,2469)) + + + + + +if __name__ == "__main__": + unittest.main() diff --git a/unit-tests/test_OCL_Kernels.py b/unit-tests/test_OCL_Kernels.py new file mode 100644 index 0000000000000000000000000000000000000000..a5113ff8dad5afafe00eb4fcde3ebbdba630ed50 --- /dev/null +++ b/unit-tests/test_OCL_Kernels.py @@ -0,0 +1,341 @@ +## @file test_OCL_Kernels.py +# Small elementary tests for openCL kernels +# +# Unit testing of openCL kernels. + +import unittest +import os +os.environ['PYOPENCL_COMPILER_OUTPUT'] = '1' +# TODO VL: fix compilation error of kernels on CPU. See clGetProgramBuildInfo(program, device_id, CL_PROGRAM_BUILD_LOG, len, log, null); + +import numpy as np +import sys +sys.path.append("..") +from _modules.OCL_kernels import * + + +## Gets a list containing the sizes of all sequemces given in input. +# +# Sequences are reference sequences (genomes). +def getSeqSizesList(refseq_list): + seq_sizes_list = [] + for seq in refseq_list: + seq_sizes_list.append(len(seq)) + return seq_sizes_list + +## Class containing all the unit testing functions of the openCL kernels. 
+class TestMyOCLKernels(unittest.TestCase): + ## test the a kernel that copies an array into another + # + # This is just to do some basic checkings of the openCL implementation when switching to a new environment. + def test_copy(self): + read_list=["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", + "NCATGGTTTAGTACTTCTGT", \ + "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", + "CTTTCTTACTCAATCTAGTA"] + print "going to copy", len(read_list), " reads of length: 20" + ctx, queue, mf, max_dim1,dev_mem=getOpenCLElements() + input_reads = np.array(read_list) + nb_reads = len(read_list) + device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) + device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) + host_reads_copy = np.full(len(read_list) * 20, "X",np.str) + device_reads_copy = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_reads_copy) + + nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) + + dummy_prg = cl.Program(ctx, copy_kernel_str).build() + dummy_prg.dummy(queue, (nb_threads_dim1,), (1,), device_reads, device_nb_reads, device_reads_copy) + cl.enqueue_copy(queue, host_reads_copy, device_reads_copy).wait() + str_reads_copy=''.join(host_reads_copy) + str_reads_orig=''.join(read_list) + self.assertEqual(str_reads_copy,str_reads_orig) + # print(host_reads_copy) + + ## tests a kernel that extracts the firts letter of each read passed as argument. + # + # This is just to do some basic checkings of the openCL implementation when switching to a new environment. 
+ def test_first_letter(self): + print "going to get 1rst letter of each read" + read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", + "NCATGGTTTAGTACTTCTGT", \ + "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", + "CTTTCTTACTCAATCTAGTA"] + ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() + input_reads = np.array(read_list) + nb_reads = len(read_list) + device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) + device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) + host_reads_firstL = np.full(len(read_list), 'X',np.str) + device_reads_firstL = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_reads_firstL) + + nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) + # aim of this kernel is to try to determine how a numpy array created from a list of char string is treated inside an openCL kernel. + dummy_prg = cl.Program(ctx, first_letter_kernel_str).build() + + dummy_prg.dummy(queue, (nb_threads_dim1,), (1,), device_reads, device_nb_reads, device_reads_firstL) + cl.enqueue_copy(queue, host_reads_firstL, device_reads_firstL).wait() + expected_result=['N','N','N','N','N','N','T','A','N','C'] + self.assertItemsEqual(expected_result,host_reads_firstL) + # print(host_reads_firstL) + + ## tests the 1rts kernel really used by Phageterm. + # + # Here we want to get the number of occurrences of each read in each of the sequences. + # output format contains 2* nb_reads*nb_seq number. 
+ # each 2*nb_reads line corresponds to 1 sequence + def test_reads_nb_occ(self): + seq_list = [ + "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ + "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] + read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", + "NCATGGTTTAGTACTTCTGT", \ + "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", + "CTTTCTTACTCAATCTAGTA"] + print "going to get the number of occurrences of each read in each of the sequences" + seed=20 + ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() + + seq_sizes_list=getSeqSizesList(seq_list) + input_reads = np.array(read_list) + input_seq=np.array(seq_list) + nb_reads = len(read_list) + nb_sequences = len(seq_list) + output_read_pos=np.full(2*len(read_list)*len(seq_list),0,np.int64) + + device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) + k = np.array(seed,np.uint32) # need to pass an arry of 1 element otherwise get an exception while trying to create buffer. 
+ device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) + device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_seq) + device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) + device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_sequences,dtype=np.uint32)) + device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, + hostbuf=np.array(seq_sizes_list,dtype=np.uint32)) + device_read_pos = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) + prg = cl.Program(ctx, reads_nb_occ_in_sequences_str).build() + nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) + prg.reads_nb_occ(queue, (nb_threads_dim1,), (1,), device_k, device_reads, device_nb_reads, device_sequences, + device_seq_sizes, device_nb_sequences, device_read_pos) + cl.enqueue_copy(queue, output_read_pos, device_read_pos).wait() + #print(output_read_pos) + for i in range(0,40): + if ((i==0) or (i==12) or (i==22) or (i==34)): + self.assertEqual(output_read_pos[i],1) + else: + self.assertEqual(output_read_pos[i],0) + + ## Testing the 2nd Kernel used in PhageTerm + # + # This kernel updates the array produced by the 1rst one. + # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. 
+ def test_upd_nb_occ_array(self): + ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() + seq_list = [ + "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ + "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] + read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", + "NCATGGTTTAGTACTTCTGT", \ + "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", + "CTTTCTTACTCAATCTAGTA"] + output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) + output_read_pos[0]=1 + output_read_pos[12]=1 + output_read_pos[22]=1 + output_read_pos[34]=1 + device_read_pos = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) + nb_sequences = len(seq_list) + device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, + hostbuf=np.array(nb_sequences, np.uint32)) + nb_reads = len(read_list) + device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) + + # This kernel is intended to be used with 1 thread per sequence. 
+ prg_update = cl.Program(ctx, upd_reads_nb_occ_in_sequences_str).build() + nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) + prg_update.upd_nb_occ_array(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, + device_nb_reads) + cl.enqueue_copy(queue, output_read_pos, device_read_pos) + + host_max_nb_occ=getMaxNbOccPerSeq(output_read_pos,nb_reads,nb_sequences) + self.assertEqual(host_max_nb_occ,2) + expected_output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) + expected_output_read_pos[0] = 1 + expected_output_read_pos[12] = 1 + expected_output_read_pos[22] = 1 + expected_output_read_pos[34] = 1 + expected_output_read_pos[3] = 1 + expected_output_read_pos[5] = 1 + expected_output_read_pos[7] = 1 + expected_output_read_pos[9] = 1 + expected_output_read_pos[11] = 1 + expected_output_read_pos[13] = 1 + expected_output_read_pos[15] = 2 + expected_output_read_pos[17] = 2 + expected_output_read_pos[19] = 2 + expected_output_read_pos[25] = 1 + expected_output_read_pos[27] = 1 + expected_output_read_pos[29] = 1 + expected_output_read_pos[31] = 1 + expected_output_read_pos[33] = 1 + expected_output_read_pos[35] = 1 + expected_output_read_pos[37] = 2 + expected_output_read_pos[39] = 2 + self.assertTrue(np.array_equal(expected_output_read_pos,output_read_pos)) + #print output_read_pos + #print host_max_nb_occ + + ## Testing the 3rd kernel used in Phageterm. + # + # This kernel looks for all occurrences of all kmers in all sequences. + # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. 
+ def test_get_occ_pos(self): + seq_list = [ + "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ + "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] + read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", + "NCATGGTTTAGTACTTCTGT", \ + "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", + "CTTTCTTACTCAATCTAGTA"] + seq_sizes_list = getSeqSizesList(seq_list) + ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() + seed = 20 + output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) + output_read_pos[0] = 1 + output_read_pos[12] = 1 + output_read_pos[22] = 1 + output_read_pos[34] = 1 + output_read_pos[3] = 1 + output_read_pos[5] = 1 + output_read_pos[7] = 1 + output_read_pos[9] = 1 + output_read_pos[11] = 1 + output_read_pos[13] = 1 + output_read_pos[15] = 2 + output_read_pos[17] = 2 + output_read_pos[19] = 2 + output_read_pos[25] = 1 + output_read_pos[27] = 1 + output_read_pos[29] = 1 + output_read_pos[31] = 1 + output_read_pos[33] = 1 + output_read_pos[35] = 1 + output_read_pos[37] = 2 + output_read_pos[39] = 2 + device_read_pos = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) + nb_sequences = len(seq_list) + device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, + hostbuf=np.array(nb_sequences,np.uint32)) + nb_reads = len(read_list) + device_nb_reads = cl.Buffer(ctx, 
mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) + max_nb_occ = 2 + host_max_nb_occ = np.array(max_nb_occ) + + + device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) + device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(read_list)) + k = np.array(seed) # need to pass an arry of 1 element otherwise get an exception while trying to create buffer. + device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) + device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(seq_list)) + + device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, + hostbuf=np.array(seq_sizes_list,dtype=np.uint32)) + host_read_occ_positions = np.full(host_max_nb_occ * len(seq_list), 0,np.uint32) + device_read_occ_positions = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_read_occ_positions) + + + + + prg_get_occ_pos = cl.Program(ctx, reads_occ_pos_in_seqs_str).build() + nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) + prg_get_occ_pos.get_occ_pos(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, + device_nb_reads, device_max_nb_occ, device_read_occ_positions, device_k, + device_reads, device_sequences, device_seq_sizes) + cl.enqueue_copy(queue, host_read_occ_positions, device_read_occ_positions) + print host_read_occ_positions + + self.assertEqual(host_read_occ_positions[0],0) + self.assertEqual(host_read_occ_positions[1], 53) + self.assertEqual(host_read_occ_positions[2], 0) + self.assertEqual(host_read_occ_positions[3], 57) + + ## Testing the 4rth openCL kernel used in phageterm + # + # This kernel produces the final results array. + # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. 
+ def test_reads_nb_occ_reduction(self): + ctx, queue, mf, max_dim1,dev_mem = getOpenCLElements() + output_read_pos = np.full(2 * 10 * 2, 0,np.int64) + output_read_pos[0] = 1 + output_read_pos[12] = 1 + output_read_pos[22] = 1 + output_read_pos[34] = 1 + output_read_pos[3] = 1 + output_read_pos[5] = 1 + output_read_pos[7] = 1 + output_read_pos[9] = 1 + output_read_pos[11] = 1 + output_read_pos[13] = 1 + output_read_pos[15] = 2 + output_read_pos[17] = 2 + output_read_pos[19] = 2 + output_read_pos[25] = 1 + output_read_pos[27] = 1 + output_read_pos[29] = 1 + output_read_pos[31] = 1 + output_read_pos[33] = 1 + output_read_pos[35] = 1 + output_read_pos[37] = 2 + output_read_pos[39] = 2 + device_read_pos = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) + nb_sequences = 2 + device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, + hostbuf=np.array(nb_sequences,np.uint32)) + nb_reads=10 + device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) + + host_max_nb_occ = np.array(2, np.int64) + device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) + #device_found_read_carac_line_length=cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(3*nb_max_diff_read_occ)) + host_found_reads_carac = np.full(3 * 2 * nb_sequences, -1,np.int64) + device_found_reads_carac = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_found_reads_carac) + + # This kernel is intended to be used with 1 thread per sequence. 
+ prg_reduce = cl.Program(ctx, reads_nb_occ_reduction_str).build() + nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) + #sf=np.array(0, np.int64) + #device_sf=cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=sf) + #prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences,device_nb_reads,device_found_reads_carac,device_max_nb_occ,device_sf) + #prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, + # device_nb_reads, device_found_reads_carac, device_max_nb_occ, device_sf) + prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, + device_nb_reads, device_found_reads_carac, device_max_nb_occ) + + cl.enqueue_copy(queue, host_found_reads_carac, device_found_reads_carac) + print host_found_reads_carac + self.assertEqual(host_found_reads_carac[0], 0) + self.assertEqual(host_found_reads_carac[1], 1) + self.assertEqual(host_found_reads_carac[2], 0) + self.assertEqual(host_found_reads_carac[3], 6) + self.assertEqual(host_found_reads_carac[4], 1) + self.assertEqual(host_found_reads_carac[5], 1) + + self.assertEqual(host_found_reads_carac[6], 1) + self.assertEqual(host_found_reads_carac[7], 1) + self.assertEqual(host_found_reads_carac[8], 0) + + self.assertEqual(host_found_reads_carac[9], 7) + self.assertEqual(host_found_reads_carac[10], 1) + self.assertEqual(host_found_reads_carac[11], 1) + + + +if __name__ == "__main__": + unittest.main() + + + + + + + + diff --git a/unit-tests/test_OCL_Kernels_bigger.py b/unit-tests/test_OCL_Kernels_bigger.py new file mode 100644 index 0000000000000000000000000000000000000000..7d3713d23ec11ba6d3287d76b70b114654789f7b --- /dev/null +++ b/unit-tests/test_OCL_Kernels_bigger.py @@ -0,0 +1,246 @@ +##@file test_OCL_Kernels_bigger.py +# Tests for openCL kernels on bigger datasets (more "real life like") + +import unittest + +import numpy as np +import sys +#print sys.path 
sys.path.append("..")

from _modules.OCL_kernels import *

from data_refseq import refseq_list
from data_refseq import all_extracts_l

# Dataset dimensions, computed once at import time.
nb_sequences = len(refseq_list)     # number of reference sequences
nb_extracts = len(all_extracts_l)   # number of read extracts (k-mers) to map
extracts_l = all_extracts_l


## Class containing all the testing functions of the openCL kernels for bigger
# datasets than simple unit tests (more "real life like").
#
# NOTE(review): these 4 tests form a pipeline. test_upd_nb_occ loads
# "kmer_nb_occ.txt" (the np.savetxt call that produced it in test_get_nb_occ is
# commented out), and test_get_occ_pos / test_get_final_array load
# "kmer_nb_occ_pos.txt". Those files must pre-exist on disk — confirm.
class TestMyOCLKernels(unittest.TestCase):
    ## Index of the first k-mer whose result slots are spot-checked below.
    FIRST_CHECKED_KMER = 11104
    ## k-mer (seed) length used by the kernels in these tests.
    K = 20

    def _seq_sizes(self):
        ## Returns the list of the lengths of all reference sequences.
        return [len(s) for s in refseq_list]

    def _ro_flags(self, mf):
        ## Flags shared by all read-only input buffers.
        return mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR

    def _check_ref_kmers(self, arr, expected_pairs):
        ## Spot-checks the two int64 slots of consecutive k-mers starting at
        ## FIRST_CHECKED_KMER, in the first sequence. Replaces the duplicated
        ## blocks of 8 assertEqual calls the tests used to carry.
        for offset, (slot0, slot1) in enumerate(expected_pairs):
            kmer = self.FIRST_CHECKED_KMER + offset
            self.assertEqual(arr[2 * kmer], slot0)
            self.assertEqual(arr[2 * kmer + 1], slot1)

    ## Testing the 1st openCL kernel used in Phageterm:
    # the one that counts the number of occurrences of each k-mer in each sequence.
    def test_get_nb_occ(self):
        ctx, queue, mf, max_dim1, dev_mem = getOpenCLElements()
        ro = self._ro_flags(mf)
        # Host-side inputs and outputs.
        seq_sizes_l = self._seq_sizes()
        str_seqs = "".join(refseq_list)         # all contigs concatenated in a string
        input_seqs = np.array(str_seqs)         # openCL needs the sequences in a numpy array
        input_rextracts = np.array(extracts_l)  # all read extracts
        # Result array for kernel 1: 2 int64 slots per (extract, sequence) pair.
        o_nb_occ_foreach_extract_in_seqs = np.full(2 * nb_extracts * nb_sequences, 0, np.int64)

        # Device-side buffers. Scalars must be wrapped in numpy arrays, otherwise
        # buffer creation raises an exception.
        device_rextracts = cl.Buffer(ctx, ro, hostbuf=input_rextracts)
        device_k = cl.Buffer(ctx, ro, hostbuf=np.array(self.K, np.uint32))
        device_sequences = cl.Buffer(ctx, ro, hostbuf=input_seqs)
        device_nb_extracts = cl.Buffer(ctx, ro, hostbuf=np.array(nb_extracts, np.int64))
        device_nb_sequences = cl.Buffer(ctx, ro, hostbuf=np.array(nb_sequences, np.uint32))
        device_seq_sizes = cl.Buffer(ctx, ro, hostbuf=np.array(seq_sizes_l, np.uint32))
        device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR,
                                                          hostbuf=o_nb_occ_foreach_extract_in_seqs)

        prg = cl.Program(ctx, reads_nb_occ_in_sequences_str).build()
        nb_threads_dim1 = getNbThreads(nb_extracts, max_dim1)
        prg.reads_nb_occ(queue, (nb_threads_dim1,), (1,), device_k, device_rextracts,
                         device_nb_extracts, device_sequences, device_seq_sizes,
                         device_nb_sequences, device_nb_occ_foreach_extract_in_seqs)
        # Get kernel execution results (number of occurrences of each k-mer in
        # each sequence) back to the host.
        cl.enqueue_copy(queue, o_nb_occ_foreach_extract_in_seqs,
                        device_nb_occ_foreach_extract_in_seqs).wait()

        self._check_ref_kmers(o_nb_occ_foreach_extract_in_seqs,
                              [(1, 0), (1, 0), (0, 0), (0, 0)])
        # None of these k-mers should appear in the 2nd sequence.
        base = 2 * nb_extracts
        for i in range(base + 2 * self.FIRST_CHECKED_KMER,
                       base + 2 * (self.FIRST_CHECKED_KMER + 3) + 1):
            self.assertEqual(o_nb_occ_foreach_extract_in_seqs[i], 0)
        #np.savetxt("kmer_nb_occ.txt", o_nb_occ_foreach_extract_in_seqs, fmt="%d", delimiter=',')

    ## Testing the 2nd kernel used in Phageterm:
    # turns the per-extract counters into cumulated offsets in the positions array.
    def test_upd_nb_occ(self):
        ctx, queue, mf, max_dim1, dev_mem = getOpenCLElements()
        ro = self._ro_flags(mf)
        # Reload the output of kernel 1 and just check that the data is still here.
        # (The unused seq_sizes_l / str_seqs locals of the original were removed.)
        o_nb_occ_pos = np.loadtxt("kmer_nb_occ.txt", dtype=np.int64, delimiter=',')
        self._check_ref_kmers(o_nb_occ_pos, [(1, 0), (1, 0)])

        device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR,
                                                          hostbuf=o_nb_occ_pos)
        device_nb_extracts = cl.Buffer(ctx, ro, hostbuf=np.array(nb_extracts, np.int64))
        device_nb_sequences = cl.Buffer(ctx, ro, hostbuf=np.array(nb_sequences, np.uint32))

        # This kernel is intended to be used with 1 thread per sequence.
        # It computes for each sequence the total number of occurrences of k-mers;
        # the max amongst all these sums is extracted below with getMaxNbOccPerSeq.
        prg_update = cl.Program(ctx, upd_reads_nb_occ_in_sequences_str).build()
        nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1)
        prg_update.upd_nb_occ_array(queue, (nb_threads_dim1,), (1,),
                                    device_nb_occ_foreach_extract_in_seqs,
                                    device_nb_sequences, device_nb_extracts)
        cl.enqueue_copy(queue, o_nb_occ_pos, device_nb_occ_foreach_extract_in_seqs)
        # Max over all sequences of the total number of read extract occurrences
        # per sequence.
        host_max_nb_occ = getMaxNbOccPerSeq(o_nb_occ_pos, nb_extracts, nb_sequences)

        self._check_ref_kmers(o_nb_occ_pos, [(1, 7677), (1, 7678), (0, 7679), (0, 7679)])
        #np.savetxt("kmer_nb_occ_pos.txt", o_nb_occ_pos, fmt="%d", delimiter=',')
        self.assertEqual(host_max_nb_occ, 16908)

    ## Testing the 3rd kernel used in PhageTerm:
    # records the position of every extract occurrence in every sequence.
    def test_get_occ_pos(self):
        ctx, queue, mf, max_dim1, dev_mem = getOpenCLElements()
        ro = self._ro_flags(mf)
        seq_sizes_l = self._seq_sizes()
        input_seqs = np.array("".join(refseq_list))
        input_rextracts = np.array(extracts_l)
        # Reload the output of kernel 2 and just check that the data is still here.
        o_nb_occ_pos = np.loadtxt("kmer_nb_occ_pos.txt", dtype=np.int64, delimiter=',')
        self._check_ref_kmers(o_nb_occ_pos, [(1, 7677), (1, 7678), (0, 7679), (0, 7679)])

        device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR,
                                                          hostbuf=o_nb_occ_pos)
        # Max over all sequences of the total number of read extract occurrences per
        # sequence; 16908 is the value asserted in test_upd_nb_occ.
        host_max_nb_occ = np.array(16908, np.int64)
        device_max_nb_occ = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ)
        # Will contain the positions of all extract occurrences in the sequences.
        host_read_occ_positions = np.full(int(host_max_nb_occ * nb_sequences), 0, dtype=np.uint32)
        device_read_occ_positions = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR,
                                              hostbuf=host_read_occ_positions)
        device_k = cl.Buffer(ctx, ro, hostbuf=np.array(self.K, np.uint32))
        device_sequences = cl.Buffer(ctx, ro, hostbuf=input_seqs)
        device_nb_extracts = cl.Buffer(ctx, ro, hostbuf=np.array(nb_extracts, np.int64))
        device_nb_sequences = cl.Buffer(ctx, ro, hostbuf=np.array(nb_sequences, np.uint32))
        device_seq_sizes = cl.Buffer(ctx, ro, hostbuf=np.array(seq_sizes_l, np.uint32))
        device_rextracts = cl.Buffer(ctx, ro, hostbuf=input_rextracts)

        prg_get_occ_pos = cl.Program(ctx, reads_occ_pos_in_seqs_str).build()
        nb_threads_dim1 = getNbThreads(nb_extracts, max_dim1)
        prg_get_occ_pos.get_occ_pos(queue, (nb_threads_dim1,), (1,),
                                    device_nb_occ_foreach_extract_in_seqs, device_nb_sequences,
                                    device_nb_extracts, device_max_nb_occ,
                                    device_read_occ_positions, device_k, device_rextracts,
                                    device_sequences, device_seq_sizes)
        cl.enqueue_copy(queue, host_read_occ_positions, device_read_occ_positions)

        self.assertEqual(host_read_occ_positions[7677], 1864)
        self.assertEqual(host_read_occ_positions[7678], 1954)
        # Check that no k-mer occurrence position is above its sequence length.
        # Seems silly but I really wasted a lot of time on this bug!
        max_occ = int(host_max_nb_occ)
        for i in range(0, nb_sequences):
            l = seq_sizes_l[i]
            for j in range(0, max_occ):
                pos = host_read_occ_positions[i * max_occ + j]
                # BUGFIX: the break condition used to read host_read_occ_positions[j],
                # i.e. always sequence 0's slots; it now checks the current sequence.
                # NOTE(review): using 1 as an end-of-data sentinel looks fragile —
                # confirm against the kernel documentation.
                if pos == 1:
                    break
                # BUGFIX: the failure message printed i twice; it now reports j
                # as the occurrence index.
                msg = "sequence " + str(i) + " Occ: " + str(j) + " l=" + str(l) + " pos=" + str(pos)
                self.assertTrue(pos <= l - self.K, msg)
        # np.savetxt("kmer_list_occ_pos.txt", o_nb_occ_pos, fmt="%d", delimiter=',')

    ## Testing the 4th kernel used in phageterm:
    # reduces the occurrence data into (extract_id, nb_occ, offset) triplets.
    def test_get_final_array(self):
        ctx, queue, mf, max_dim1, dev_mem = getOpenCLElements()
        ro = self._ro_flags(mf)
        # Reload the output of kernel 2 and just check that the data is still here.
        i_nb_occ_pos = np.loadtxt("kmer_nb_occ_pos.txt", dtype=np.int64, delimiter=',')
        self._check_ref_kmers(i_nb_occ_pos, [(1, 7677), (1, 7678), (0, 7679), (0, 7679)])

        host_max_nb_occ = np.array(15690, np.int64)
        reduced_nb_occ_foreach_extract_in_seqs = np.full(int(3 * host_max_nb_occ * nb_sequences),
                                                         -1, dtype=np.int64)
        device_reduced_nb_occ_foreach_extract_in_seqs = cl.Buffer(
            ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=reduced_nb_occ_foreach_extract_in_seqs)
        # TODO: maybe put this buffer in read-write mode in doMapping.
        device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR,
                                                          hostbuf=i_nb_occ_pos)
        device_nb_sequences = cl.Buffer(ctx, ro, hostbuf=np.array(nb_sequences, np.uint32))
        device_nb_extracts = cl.Buffer(ctx, ro, hostbuf=np.array(nb_extracts, np.int64))
        # TODO: maybe put this buffer in read-write mode in doMapping.
        device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ)

        prg_red_rd_occ = cl.Program(ctx, reads_nb_occ_reduction_str).build()
        nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1)
        prg_red_rd_occ.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,),
                                              device_nb_occ_foreach_extract_in_seqs,
                                              device_nb_sequences, device_nb_extracts,
                                              device_reduced_nb_occ_foreach_extract_in_seqs,
                                              device_max_nb_occ)
        cl.enqueue_copy(queue, reduced_nb_occ_foreach_extract_in_seqs,
                        device_reduced_nb_occ_foreach_extract_in_seqs)

        # Check a few triplets at the start of the 2nd sequence's slice.
        # TODO: add more checkings later.
        expected = [11104, 1, 7677, 11105, 1, 7678]
        for off, val in enumerate(expected):
            self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15690 + off], val)


if __name__ == "__main__":
    unittest.main()
## Tests that mapping results produced by the CPU and GPU versions are the same.
#
class Test_functions_Phageterm(unittest.TestCase):

    ## Auxiliary method that compares mapping results globally.
    #
    # d: the Data4Test dataset descriptor; d_rinfo: per-read info consumed by
    # ReadGetter; return_dict: CPU results keyed by core id; l_res: GPU result
    # list (same slot layout); logger_cpu / logger_gpu: per-read mapping loggers
    # filled during the two runs.
    def compareMappingRes(self, d, d_rinfo, return_dict, l_res, logger_cpu, logger_gpu):
        cpu_res = return_dict[d.core_id]
        phage_hybrid_coverage = cpu_res[3]
        host_hybrid_coverage = cpu_res[4]
        host_whole_coverage = cpu_res[5]
        list_hybrid = cpu_res[6]
        insert = cpu_res[7].tolist()
        paired_mismatch = cpu_res[8]
        reads_tested = cpu_res[9]

        phage_hybrid_coverage_gpu = l_res[3]
        host_hybrid_coverage_gpu = l_res[4]
        host_whole_coverage_gpu = l_res[5]
        list_hybrid_gpu = l_res[6]
        insert_gpu = l_res[7].tolist()
        paired_mismatch_gpu = l_res[8]
        reads_tested_gpu = l_res[9]

        # The CPU version skips read 0, hence the -1 offsets.
        # TODO fix bug in CPU version that skips read 0.
        self.assertEqual(reads_tested, reads_tested_gpu - 1)
        self.assertEqual(reads_tested, d.reads_tested - 1)
        self.assertEqual(paired_mismatch, paired_mismatch_gpu)
        self.assertEqual(paired_mismatch, 0)
        self.assertEqual(insert, insert_gpu)
        self.assertEqual(insert, [])
        self.assertTrue(np.array_equal(list_hybrid, list_hybrid_gpu))
        self.assertTrue(np.array_equal(host_whole_coverage, host_whole_coverage_gpu))
        self.assertTrue(np.array_equal(host_hybrid_coverage, host_hybrid_coverage_gpu))
        self.assertTrue(np.array_equal(phage_hybrid_coverage, phage_hybrid_coverage_gpu))
        # paired_whole_coverage / whole_coverage / termini_coverage are deliberately
        # NOT compared: the counters that are incremented depend on matchPlus_start,
        # which is chosen randomly.
        r_getter = ReadGetter(d.fastq, d_rinfo)
        self.compareMappingResByRead(logger_cpu, logger_gpu, r_getter)

    ## Compares mapping results read by read.
    def compareMappingResByRead(self, logger_cpu, logger_gpu, r_getter):
        l_rmatch_info_cpu = logger_cpu.getMatchInfoList()
        l_rmatch_info_gpu = logger_gpu.getMatchInfoList()
        self.assertEqual(logger_cpu.cnt_read, logger_gpu.cnt_read - 1)
        for i in range(0, logger_cpu.cnt_read):
            # The cpu version skips the 1st read in the fasta file. This is a bug but
            # it doesn't have any impact on the final results, hence the i+1 on the
            # GPU side. TODO (cpu version authors): fix this bug.
            rmi_cpu = l_rmatch_info_cpu[i]
            rmi_gpu = l_rmatch_info_gpu[i + 1]
            # Fetch both reads; getOneRead raising on a bad index fails the test.
            read_cpu, rp = r_getter.getOneRead(rmi_cpu.idx_read)
            read_gpu, rp = r_getter.getOneRead(rmi_gpu.idx_read)
            self.assertEqual(rmi_cpu.idx_read, rmi_gpu.idx_read)
            self.assertEqual(rmi_cpu.map_start, rmi_gpu.map_start)
            if rmi_cpu.map_start == 0:
                self.assertEqual(rmi_cpu.map_rcpl_start, rmi_gpu.map_rcpl_start)

    ## Auxiliary method that performs the mapping on the GPU.
    # (Debug prints that dumped the whole extracts list were removed.)
    def performGPUMapping(self, d):
        RE, d_rinfo = getAllReads(d.fastq, d.seed, d.paired)
        ref_data = refData(d.refseq_list, d.seed, d.hostseq)
        mapper = GPU_allMapper(RE, ref_data)
        return mapper.doMapping(), d_rinfo
+ def testReadsCoverageGPU_CPU(self): + l_data=buildTestData() + for d in l_data: + print "processing dataset: ", d.fastq + mapping_res, d_rinfo = self.performGPUMapping(d) + for refseq in d.refseq_list: + logger_gpu = ReadMappingInfoLogger() + return_dict = dict() + logger_cpu = ReadMappingInfoLogger() + #readsCoverage(fastq, refseq, hostseq, seed, edge, paired, insert_max, core, core_id, return_dict, line_start, line_end, limit_coverage, virome,idx_refseq=None, dir_cov_res=None,logger=None) + readsCoverage(d.fastq, refseq, d.hostseq, d.seed, d.edge, d.paired, d.insert_max,\ + d.core, d.core_id, return_dict,d.line_start, d.line_end, \ + d.limit_coverage, d.virome,None,None,logger_cpu) + + l_res = readsCoverageGPU(d.fastq, refseq, d.hostseq, mapping_res, d_rinfo, d.edge, d.paired, + d.insert_max,d.limit_coverage,d.virome, logger_gpu) + self.compareMappingRes( d, d_rinfo, return_dict, l_res, logger_cpu, logger_gpu) + + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/unit-tests/test_functions_PhageTerm_for_multi.py b/unit-tests/test_functions_PhageTerm_for_multi.py new file mode 100644 index 0000000000000000000000000000000000000000..3e98e48213166761f9f1701a4acdc392d8138897 --- /dev/null +++ b/unit-tests/test_functions_PhageTerm_for_multi.py @@ -0,0 +1,167 @@ +##@file test_functions_PhageTerm_for_GPU.py +# +# +# Check that readsCoverage can write its result to a file and that they can be read again to retrieve results. 
##@file test_functions_PhageTerm_for_multi.py
#
# Check that readsCoverage can write its result to a file and that they can be
# read again to retrieve results; also checks checkpoint creation and restart.
##@author vlegrand@pasteur.fr

import sys
sys.path.append("..")

import unittest
import numpy as np
import os
import shutil

from Data4Test import buildTestData, Data4Test
from _modules.functions_PhageTerm import readsCoverage
from _modules.readsCoverage_res import loadRCRes, RCRes, RCCheckpoint_handler
from _modules.debug_utils import ReadMappingInfoLogger


## All result attributes compared when round-tripping an RCRes through disk.
RCRES_ATTRS = ("termini_coverage", "whole_coverage", "paired_whole_coverage",
               "phage_hybrid_coverage", "host_hybrid_coverage", "host_whole_coverage",
               "list_hybrid", "insert", "paired_mismatch", "reads_tested")


class Test_functions_Phageterm(unittest.TestCase):

    def _make_tmp_dir(self, name):
        ## Creates a scratch directory under the cwd and registers its removal, so
        ## it is cleaned up even when an assertion fails (the original leaked the
        ## directory on failure). TODO: when switching to python3, use tempfile.
        path = os.path.join(os.getcwd(), name)
        os.mkdir(path)
        # ignore_errors=True: harmless if a test already removed the directory.
        self.addCleanup(shutil.rmtree, path, True)
        return path

    ## Test that readsCoverage results are saved in the appropriate structure and
    ## can be read again for later work.
    def testDumpAndReadAgainRes(self):
        l_data = buildTestData()
        chk_handler = RCCheckpoint_handler(0, None, False)
        # BUGFIX: the original recomputed os.path.join(os.getcwd(), "tmp") in the
        # mkdir call instead of reusing dir_cov_res.
        dir_cov_res = self._make_tmp_dir("tmp")
        for d in l_data:
            idx_seq = 0
            for refseq in d.refseq_list:
                # First run: keep the results in return_dict only.
                return_dict = dict()
                d.tParms.dir_cov_mm = None
                readsCoverage(d.inDRawArgs, refseq, d.inDArgs, d.fParms, return_dict, 0,
                              d.line_start, d.line_end, d.tParms, chk_handler, idx_seq, None)
                r1 = RCRes(*[return_dict[0][idx] for idx in range(10)])
                r1.save(os.path.join(dir_cov_res, "r1"))
                # Second run: ask readsCoverage to dump its results to disk.
                d.tParms.dir_cov_mm = dir_cov_res
                readsCoverage(d.inDRawArgs, refseq, d.inDArgs, d.fParms, None, 0,
                              d.line_start, d.line_end, d.tParms, chk_handler, idx_seq,
                              logger=None)
                fic_name = os.path.join(dir_cov_res, "coverage" + str(idx_seq) + "_0.npz")
                self.assertTrue(os.path.exists(fic_name))
                res = loadRCRes(fic_name)
                # The two runs cannot have exactly the same content because k-mer
                # match positions are picked randomly; only shapes/counts compare.
                self.assertEqual(res.termini_coverage.shape, return_dict[0][0].shape)
                self.assertEqual(res.whole_coverage.shape, return_dict[0][1].shape)
                self.assertEqual(res.paired_whole_coverage.shape, return_dict[0][2].shape)
                self.assertEqual(res.phage_hybrid_coverage.shape, return_dict[0][3].shape)
                self.assertEqual(res.host_hybrid_coverage.shape, return_dict[0][4].shape)
                self.assertEqual(res.host_whole_coverage.shape, return_dict[0][5].shape)
                # BUGFIX: was assertTrue(shape, shape) — a no-op that could never
                # fail; it is now a real equality check.
                self.assertEqual(res.list_hybrid.shape, return_dict[0][6].shape)
                self.assertEqual(res.reads_tested, return_dict[0][9])
                # Saving then reloading r1 must round-trip exactly.
                r1b = loadRCRes(os.path.join(dir_cov_res, "r1.npz"))
                for attr in RCRES_ATTRS:
                    self.assertTrue(np.array_equal(getattr(r1b, attr), getattr(r1, attr)))
                idx_seq += 1

    ## Checks that checkpoints are created and that their content is correct.
    def test_checkpoint_creation(self):
        # Last line of the fastq is 11150.
        d = Data4Test("../test-data/COS-5.fastq", "../test-data/COS-5.fasta")
        dir_chk = self._make_tmp_dir("tmp2")
        d.tParms.dir_chk = dir_chk
        d.tParms.test_mode = True
        return_dict = dict()
        d.tParms.dir_cov_mm = None
        chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode)
        readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0,
                      d.line_start, d.line_end, d.tParms, chk_handler, 0, None)
        full_fname = os.path.join(dir_chk, "chk_0_0_11150_291333.npz")
        self.assertTrue(os.path.exists(full_fname))
        self.assertTrue(len(os.listdir(dir_chk)) == 1)
        wr = chk_handler.load(0, 0)
        self.assertEqual(wr.read_match, 291333)
        self.assertEqual(wr.count_line, 11150)
        self.assertEqual(wr.interm_res.host_len, 0)
        self.assertEqual(wr.interm_res.gen_len, 3012)
        self.assertEqual(int(wr.interm_res.reads_tested), 2787)  # 2796?

    ## Checks that in production mode, all checkpoints are deleted at the end.
    def test_checkpoint_end(self):
        # Last line of the fastq is 11150.
        d = Data4Test("../test-data/COS-5.fastq", "../test-data/COS-5.fasta")
        dir_chk = self._make_tmp_dir("tmp3")
        d.tParms.chk_freq = 1
        d.tParms.dir_chk = dir_chk
        d.tParms.test_mode = False
        return_dict = dict()
        d.tParms.dir_cov_mm = None
        chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode)
        readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0,
                      d.line_start, d.line_end, d.tParms, chk_handler, 0, None)
        self.assertTrue(len(os.listdir(dir_chk)) == 0)

    ## Checks that readsCoverage restarts from a checkpoint and not from the beginning.
    def test_restart_from_checkpoint(self):
        d = Data4Test("../test-data/COS-5.fastq", "../test-data/COS-5.fasta")
        dir_chk = self._make_tmp_dir("tmp4")
        d.tParms.dir_chk = dir_chk
        d.tParms.chk_freq = 1
        d.tParms.test_mode = False
        # Seed the checkpoint directory with a previously-saved checkpoint.
        shutil.copy("chk_0_0_38_863.npz", dir_chk)
        return_dict = dict()
        d.tParms.dir_cov_mm = None
        logger = ReadMappingInfoLogger()
        chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode)
        self.assertEqual(chk_handler.getIdxSeq(0), 0)
        readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0,
                      d.line_start, d.line_end, d.tParms, chk_handler, 0, logger)
        # The first logged state must match the checkpoint, proving the restart.
        wr = logger.rw_lst[0]
        self.assertEqual(wr.read_match, 863)
        self.assertEqual(wr.count_line, 38)
        self.assertEqual(wr.interm_res.host_len, 0)
        self.assertEqual(wr.interm_res.gen_len, 3012)
        self.assertEqual(int(wr.interm_res.reads_tested), 9)

    ## Same as test_restart_from_checkpoint but on a multi-sequence (virome)
    ## dataset: the restart must resume at the checkpointed sequence index.
    def test_restart_from_checkpoint2(self):
        d = Data4Test("../test-data/Virome.fastq", "../test-data/Virome.fasta")
        dir_chk = self._make_tmp_dir("tmp5")
        d.tParms.dir_chk = dir_chk
        d.tParms.chk_freq = 5
        d.tParms.test_mode = False
        shutil.copy("../test-data/chk_0_2_10_0.npz", dir_chk)
        logger = None
        chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode)
        idx_seq = chk_handler.getIdxSeq(0)
        self.assertEqual(idx_seq, 2)
        return_dict = dict()
        for seq in d.refseq_list[idx_seq:]:
            readsCoverage(d.inDRawArgs, seq, d.inDArgs, d.fParms, return_dict, 0,
                          d.line_start, d.line_end, d.tParms, chk_handler, idx_seq, logger)
            idx_seq += 1
        self.assertEqual(idx_seq, 5)


if __name__ == "__main__":
    unittest.main()