diff --git a/tests/Data4Test.py b/tests/Data4Test.py deleted file mode 100644 index b3bbbbbaac2b9c04c918f25ea6e89ef5bdc24d4f..0000000000000000000000000000000000000000 --- a/tests/Data4Test.py +++ /dev/null @@ -1,52 +0,0 @@ -import os - -from _modules.IData_handling import genomeFastaRecovery,totReads -from _modules.main_utils import inputRawDataArgs,functionalParms,InputDerivedDataArgs,technicalParms - -TEST_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_PATH = os.path.join(TEST_DIR_PATH, "..", "test-data") - -class Data4Test: - def __init__(self,fastq,fasta,paired="",hostseq="",seed=20,edge=500,insert_max=1000,limit_coverage=11,virome=0): - self.inDRawArgs=inputRawDataArgs(fastq,fasta,None,None,paired,None) - #self.inDRawArgs.fastq=fastq - #return genome, name, genome_rejected - self.refseq_list,name,rej=genomeFastaRecovery(fasta, 500, edge, host_test = 0) - self.fParms=functionalParms(seed, None, None, limit_coverage, virome, None) - self.fParms.edge = edge - self.fParmsinsert_max = insert_max - self.inDArgs=InputDerivedDataArgs(self.inDRawArgs,self.fParms) - if hostseq!="": # expect name of fasta file containing hostseq - hs,name,rej=genomeFastaRecovery(fasta, 500, edge, host_test=0) - self.inDArgs.hostseq = hs[0] - else: - self.inDArgs.hostseq =hostseq - - self.tParms=technicalParms(1,None,250,None,None,None,None,None, \ - 0,None,False,None,None,0,"") - #self.core=1 - #self.core_id=0, - #self.limit_coverage=limit_coverage - #self.virome=virome - self.line_start=0 - self.line_end=totReads(fastq) - self.fastq = fastq - self.seed = seed - self.paired = paired - self.hostseq = hostseq - -def buildTestData(): - l=[] - l.append(Data4Test(data_path("COS-5.fastq"), data_path("COS-5.fasta"))) - l.append(Data4Test(data_path("COS-3.fastq"), data_path("COS-3.fasta"))) - l.append(Data4Test(data_path("DTR-short.fastq"), data_path("DTR-short.fasta"))) - l.append(Data4Test(data_path("DTR-long.fastq"), data_path("DTR-long.fasta"))) - l.append(Data4Test(data_path("Headful.fastq"), data_path("Headful.fasta"))) - l.append(Data4Test(data_path("Mu-like_R1.fastq"), data_path("Mu-like.fasta"), data_path("Mu-like_R2.fastq"), data_path("Mu-like_host.fasta"))) - l.append(Data4Test(data_path("Virome.fastq"), data_path("Virome.fasta"), virome=1)) - return l - -def data_path(filename): - '''append the test data path to the filename string. - ''' - return os.path.join(TEST_DATA_PATH, filename) diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index b62684dc155e39e3bd1c34e71fc84677585a0ae7..0000000000000000000000000000000000000000 --- a/tests/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -pass - diff --git a/tests/chk_0_0_38_863.npz b/tests/chk_0_0_38_863.npz deleted file mode 100644 index dd9cdfdc4840568b53d25387ffba006c034f41b5..0000000000000000000000000000000000000000 Binary files a/tests/chk_0_0_38_863.npz and /dev/null differ diff --git a/tests/test_GPUMapper.py b/tests/test_GPUMapper.py deleted file mode 100644 index dfaf829cbdadc6b53082e3a660791b27f65d5f6c..0000000000000000000000000000000000000000 --- a/tests/test_GPUMapper.py +++ /dev/null @@ -1,257 +0,0 @@ -##@file test_GPUMapper.py -# Tests of the GPU_chunkMapper -from __future__ import print_function - -import unittest -import os -import shutil -os.environ['PYOPENCL_COMPILER_OUTPUT'] = '1' - -import numpy as np -import sys -sys.path.append("..") - -from _modules.GPU_chunkMapper import GPU_chunkMapper,computeChunkSize,doMapping -from _modules.IData_handling import refData,getChunk,readsExtractsS,dump_d_rinfo,load_d_rinfo -from _modules.GPU_mappingResults import getGPUChunkMappingResults - -from data_refseq import refseq_list -nb_sequences=len(refseq_list) -from data_refseq import all_extracts_l -nb_extracts=len(all_extracts_l) -print("nb_extracts=",nb_extracts) - -ficdir="./tmp" - -## Class containing all the testing functions of the GPU_chunkMapper. -class TestGPUMapper (unittest.TestCase): - def setUp(self): - try: - os.mkdir(ficdir) - except: - print("Couldn't create directory: ",ficdir,". Cannot run tests") - exit(1) - - def tearDown(self): - shutil.rmtree(ficdir) - - def test_computeChunkSize(self): - mapper=GPU_chunkMapper() - mapper.dev_mem==1536*1024*1024 # have to force this value otherwise test result depends on platform. - seq_data=refData(refseq_list,20,"") - nb_extracts=1000 - # let the mapper compute the necessary number of chunks - nb_chunks,max_chunk_size=computeChunkSize(mapper, seq_data, nb_extracts, "",wanted_nb_chunks=None) - self.assertEqual(nb_chunks,1) - self.assertEqual(max_chunk_size,nb_extracts) - # declare that we want to process the reads in at least 2 chunks. - nb_chunks, max_chunk_size = computeChunkSize(mapper, seq_data,nb_extracts, "",wanted_nb_chunks=2) - self.assertEqual(nb_chunks, 2) - self.assertEqual(max_chunk_size, (nb_extracts//2)) - nb_chunks, max_chunk_size = computeChunkSize(mapper, seq_data, nb_extracts, "", wanted_nb_chunks=3) - self.assertEqual(nb_chunks, 4) - self.assertEqual(max_chunk_size, 332) - - - - def test_getChunks(self): - cnt_chk=0 - for RE,d_rinfo in getChunk("../test-data/Virome.fastq", 20, "",67580): - cnt_chk+=1 - self.assertEqual(RE.nb_extracts,67580) - r_extracts_list=RE.r_extracts_list - self.assertEqual(cnt_chk,1) - cnt_chk = 0 - siz=4*1407 - for RE, d_rinfo in getChunk("../test-data/Virome.fastq", 20, "", 67580// 4//12): - if cnt_chk<12: - self.assertEqual(RE.nb_extracts,siz) - if cnt_chk == 0: - self.assertEqual(RE.r_extracts_list[0],r_extracts_list[0]) - self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[siz-1]) - if cnt_chk == 3: - fic=os.path.join(ficdir,"d_rinfo3") - dump_d_rinfo(d_rinfo, fic) - re = d_rinfo[2] - d_rinfo2=load_d_rinfo(fic) - re2=d_rinfo2[2] - self.assertEqual(len(d_rinfo),len(d_rinfo2)) - self.assertEqual(re.offset1,re2.offset1) - self.assertEqual(re.offset2,re2.offset2) - self.assertEqual(re.corlen,re2.corlen) - self.assertEqual(RE.r_extracts_list[0],r_extracts_list[3*siz]) - self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[4*siz-1]) - if cnt_chk == 7: - self.assertEqual(RE.r_extracts_list[0],r_extracts_list[7*siz] ) - self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[8*siz-1]) - if cnt_chk == 11: - self.assertEqual(RE.r_extracts_list[0], r_extracts_list[11*siz]) - self.assertEqual(RE.r_extracts_list[siz-1],r_extracts_list[12*siz-1]) - if cnt_chk== 12: - self.assertEqual(RE.r_extracts_list[0], r_extracts_list[12 * siz ]) - self.assertEqual(RE.r_extracts_list[43],r_extracts_list[-1]) - cnt_chk += 1 - self.assertEqual(cnt_chk,13) - # for RE, d_rinfo in getChunk("../test-data/Virome.fastq", 20, "", 33790 // 4//13): - # self.assertEqual(RE.nb_extracts, - - - ## Testing that result files are generated where they should and that one can re-read them and check results. - # - def testMappingOnly(self): - rd=refData(refseq_list,20,"") - mapper=GPU_chunkMapper() - mapper.setRefData(rd) - mapper.setFicDir(ficdir) - nb_kmer_in_chunk=33788 - nb_kmer_per_read=4 - doMapping(nb_kmer_in_chunk,mapper,"../test-data/Virome.fastq","",rd,nb_kmer_per_read) - #mapper.setRExtracts(re) - #print all_extracts_l[33790] - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions0_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions1_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions2_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions3_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "read_occ_positions4_2"))) - - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced0_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced1_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced2_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced3_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "reduced4_2"))) - - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ0_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ1_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ2_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ3_2"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_0"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_1"))) - self.assertTrue(os.path.isfile(os.path.join(ficdir, "max_occ4_2"))) - - i_occ_pos = np.loadtxt(os.path.join(ficdir, "read_occ_positions0_0"), dtype=np.int32, delimiter='|') - self.assertEqual(i_occ_pos[7677],1864) - self.assertEqual(i_occ_pos[7678],1954) - reduced=np.loadtxt(os.path.join(ficdir, "reduced0_0"), dtype=np.int64, delimiter='|') - self.assertEqual(reduced[15690], 11104) - self.assertEqual(reduced[15691], 1) - self.assertEqual(reduced[15692], 7677) - self.assertEqual(reduced[15693], 11105) - self.assertEqual(reduced[15694], 1) - self.assertEqual(reduced[15695], 7678) - - reduced = np.loadtxt(os.path.join(ficdir, "reduced3_1"), dtype=np.int64, delimiter='|') - i=0 - found1=False - found2=False - for x in np.nditer(reduced): - if (i%3==0): - if x==-1: - break - #print "found kmer: ",x," in seq 3" - if (x==26766): - found1=True - if (x==26767): - found2=True - if (found1 and found2): - break - i+=1 - - self.assertTrue(found1) - self.assertTrue(found2) - - f_nb_occ=open(os.path.join(ficdir,"max_occ0_0")) - str_nb_occ=f_nb_occ.read() - f_nb_occ.close() - max_nb_occ=int(str_nb_occ) - self.assertEqual(max_nb_occ,15690) - - - reduced = np.loadtxt(os.path.join(ficdir, "reduced2_1"), dtype=np.int64, delimiter='|') - found = False - i = 0 - nb_occ=0 - for x in np.nditer(reduced): - if (i%3==0): - if (x==2): - found=True - nb_occ=reduced[i+1] - i+=1 - self.assertTrue(found) - self.assertEqual(nb_occ,2) - - reduced = np.loadtxt(os.path.join(ficdir, "reduced2_1"), dtype=np.int64, delimiter='|') - found = False - i = 0 - nb_occ = 0 - for x in np.nditer(reduced): - if (i % 3 == 0): - if (x == 3): - found = True - nb_occ = reduced[i + 1] - i += 1 - self.assertTrue(found) - self.assertEqual(nb_occ, 2) - - - def testIterationOverMappingResults(self): - seed=20 - rd = refData(refseq_list, seed, "") - re = readsExtractsS(seed) - re.r_extracts_list = all_extracts_l - print(all_extracts_l[60559]) - print(all_extracts_l[60560]) # TODO: track mapping of these kmers in openCL and in result files. - print(all_extracts_l[60561]) - print(all_extracts_l[60562]) - print(all_extracts_l[60563]) # appears twice in seq 3. - re.nb_extracts = nb_extracts - re.nb_reads = 16895 - mapper = GPU_chunkMapper() - mapper.setRefData(rd) - mapper.setFicDir(ficdir) - doMapping(33788, mapper, "../test-data/Virome.fastq", "", rd, 4) - # mres=getGPUChunkMappingResults(os.path.join(ficdir, "reduced3_1"), os.path.join(ficdir, "read_occ_positions3_1"), - # os.path.join(ficdir,"max_occ3_1"), re.nb_reads, re.nb_extracts, rd, single=True) - mres=getGPUChunkMappingResults(3,1,ficdir,re.nb_reads,re.nb_extracts,rd,True) - found =False - for r, r_hostseq, idx_read in mres.mappingResults(): #TODO VL: add more tests. - self.assertTrue(idx_read>=6398) - if (idx_read==6693): # extracts from 60560 - self.assertEqual(r.read_start_match,(-1,-1)) - self.assertEqual(r.read_start_match,(-1,-1)) - self.assertEqual(r.rev_cpl_start_match,(-1,-1)) - self.assertTrue(r.rev_cpl_end_match==(448,468) or r.rev_cpl_end_match==(2449,2469)) - - - - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_OCL_Kernels.py b/tests/test_OCL_Kernels.py deleted file mode 100644 index fae8c6db3fbfdee843c3fed34d71a574587ae156..0000000000000000000000000000000000000000 --- a/tests/test_OCL_Kernels.py +++ /dev/null @@ -1,342 +0,0 @@ -## @file test_OCL_Kernels.py -# Small elementary tests for openCL kernels -# -# Unit testing of openCL kernels. -from __future__ import print_function - -import unittest -import os -os.environ['PYOPENCL_COMPILER_OUTPUT'] = '1' -# TODO VL: fix compilation error of kernels on CPU. See clGetProgramBuildInfo(program, device_id, CL_PROGRAM_BUILD_LOG, len, log, null); - -import numpy as np -import sys -sys.path.append("..") -from _modules.OCL_kernels import * - - -## Gets a list containing the sizes of all sequemces given in input. -# -# Sequences are reference sequences (genomes). -def getSeqSizesList(refseq_list): - seq_sizes_list = [] - for seq in refseq_list: - seq_sizes_list.append(len(seq)) - return seq_sizes_list - -## Class containing all the unit testing functions of the openCL kernels. -class TestMyOCLKernels(unittest.TestCase): - ## test the a kernel that copies an array into another - # - # This is just to do some basic checkings of the openCL implementation when switching to a new environment. - def test_copy(self): - read_list=["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", - "NCATGGTTTAGTACTTCTGT", \ - "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", - "CTTTCTTACTCAATCTAGTA"] - print("going to copy", len(read_list), " reads of length: 20") - ctx, queue, mf, max_dim1,dev_mem=getOpenCLElements() - input_reads = np.array(read_list) - nb_reads = len(read_list) - device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) - host_reads_copy = np.full(len(read_list) * 20, "X",np.str) - device_reads_copy = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_reads_copy) - - nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) - - dummy_prg = cl.Program(ctx, copy_kernel_str).build() - dummy_prg.dummy(queue, (nb_threads_dim1,), (1,), device_reads, device_nb_reads, device_reads_copy) - cl.enqueue_copy(queue, host_reads_copy, device_reads_copy).wait() - str_reads_copy=''.join(host_reads_copy) - str_reads_orig=''.join(read_list) - self.assertEqual(str_reads_copy,str_reads_orig) - # print(host_reads_copy) - - ## tests a kernel that extracts the firts letter of each read passed as argument. - # - # This is just to do some basic checkings of the openCL implementation when switching to a new environment. - def test_first_letter(self): - print("going to get 1st letter of each read") - read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", - "NCATGGTTTAGTACTTCTGT", \ - "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", - "CTTTCTTACTCAATCTAGTA"] - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - input_reads = np.array(read_list) - nb_reads = len(read_list) - device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) - host_reads_firstL = np.full(len(read_list), 'X',np.str) - device_reads_firstL = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_reads_firstL) - - nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) - # aim of this kernel is to try to determine how a numpy array created from a list of char string is treated inside an openCL kernel. - dummy_prg = cl.Program(ctx, first_letter_kernel_str).build() - - dummy_prg.dummy(queue, (nb_threads_dim1,), (1,), device_reads, device_nb_reads, device_reads_firstL) - cl.enqueue_copy(queue, host_reads_firstL, device_reads_firstL).wait() - expected_result=['N','N','N','N','N','N','T','A','N','C'] - self.assertItemsEqual(expected_result,host_reads_firstL) - # print(host_reads_firstL) - - ## tests the 1rts kernel really used by Phageterm. - # - # Here we want to get the number of occurrences of each read in each of the sequences. - # output format contains 2* nb_reads*nb_seq number. - # each 2*nb_reads line corresponds to 1 sequence - def test_reads_nb_occ(self): - seq_list = [ - "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ - "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] - read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", - "NCATGGTTTAGTACTTCTGT", \ - "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", - "CTTTCTTACTCAATCTAGTA"] - print("going to get the number of occurrences of each read in each of the sequences") - seed=20 - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - - seq_sizes_list=getSeqSizesList(seq_list) - input_reads = np.array(read_list) - input_seq=np.array(seq_list) - nb_reads = len(read_list) - nb_sequences = len(seq_list) - output_read_pos=np.full(2*len(read_list)*len(seq_list),0,np.int64) - - device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_reads) - k = np.array(seed,np.uint32) # need to pass an arry of 1 element otherwise get an exception while trying to create buffer. - device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) - device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_seq) - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_sequences,dtype=np.uint32)) - device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(seq_sizes_list,dtype=np.uint32)) - device_read_pos = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) - prg = cl.Program(ctx, reads_nb_occ_in_sequences_str).build() - nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) - prg.reads_nb_occ(queue, (nb_threads_dim1,), (1,), device_k, device_reads, device_nb_reads, device_sequences, - device_seq_sizes, device_nb_sequences, device_read_pos) - cl.enqueue_copy(queue, output_read_pos, device_read_pos).wait() - #print(output_read_pos) - for i in range(0,40): - if ((i==0) or (i==12) or (i==22) or (i==34)): - self.assertEqual(output_read_pos[i],1) - else: - self.assertEqual(output_read_pos[i],0) - - ## Testing the 2nd Kernel used in PhageTerm - # - # This kernel updates the array produced by the 1rst one. - # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. - def test_upd_nb_occ_array(self): - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - seq_list = [ - "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ - "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] - read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", - "NCATGGTTTAGTACTTCTGT", \ - "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", - "CTTTCTTACTCAATCTAGTA"] - output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) - output_read_pos[0]=1 - output_read_pos[12]=1 - output_read_pos[22]=1 - output_read_pos[34]=1 - device_read_pos = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) - nb_sequences = len(seq_list) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences, np.uint32)) - nb_reads = len(read_list) - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) - - # This kernel is intended to be used with 1 thread per sequence. - prg_update = cl.Program(ctx, upd_reads_nb_occ_in_sequences_str).build() - nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) - prg_update.upd_nb_occ_array(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, - device_nb_reads) - cl.enqueue_copy(queue, output_read_pos, device_read_pos) - - host_max_nb_occ=getMaxNbOccPerSeq(output_read_pos,nb_reads,nb_sequences) - self.assertEqual(host_max_nb_occ,2) - expected_output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) - expected_output_read_pos[0] = 1 - expected_output_read_pos[12] = 1 - expected_output_read_pos[22] = 1 - expected_output_read_pos[34] = 1 - expected_output_read_pos[3] = 1 - expected_output_read_pos[5] = 1 - expected_output_read_pos[7] = 1 - expected_output_read_pos[9] = 1 - expected_output_read_pos[11] = 1 - expected_output_read_pos[13] = 1 - expected_output_read_pos[15] = 2 - expected_output_read_pos[17] = 2 - expected_output_read_pos[19] = 2 - expected_output_read_pos[25] = 1 - expected_output_read_pos[27] = 1 - expected_output_read_pos[29] = 1 - expected_output_read_pos[31] = 1 - expected_output_read_pos[33] = 1 - expected_output_read_pos[35] = 1 - expected_output_read_pos[37] = 2 - expected_output_read_pos[39] = 2 - self.assertTrue(np.array_equal(expected_output_read_pos,output_read_pos)) - #print output_read_pos - #print host_max_nb_occ - - ## Testing the 3rd kernel used in Phageterm. - # - # This kernel looks for all occurrences of all kmers in all sequences. - # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. - def test_get_occ_pos(self): - seq_list = [ - "NAATTGCCTCTAAAATTGGAAACAATCTCTGAATTNNNNNNNTNNAAATTTTCTCCATACTCAATCCTCCAACTCTATNTTATGTTCTTCTGCGAAACTATCTTCTNCCTCTTCACAAAACTGACCTTCGCAAAGTGATNCNGNGAGTACTCTGCTAGTATAANACTCTCGGCAGCATAGCTTACAGATTTCATCTTCATAATTATTCCTTAACTCTTCTCTAGTCATTATTCACCCTCCTTTCTGACTAAATAGTCATACATAGGCTTGCAATTACTAAGANNNTTNTNACGTATCTTT", \ - "NCTGCTAAGTTCTTCAACTCAAAGGTAACTTTTGANNNNNNTGNNTCTATCACAGAAAGACAGGTAAAGGAATCTTTCAGCAGCGCAGAACAGTTGAACGGTTTCTTGTCTATGATTTATTCAGCAGTTGAAAAGTCAANGNCTATCAAGACCGATGCACTTGNTATGCGTACAATTAACAATATGATTGCGGAAACTTTGGACGCAGACAAAGCCGCTTTCGGTTGGGTTGCATCAACAAATGAAAATGTTGACTATGCGAGTGCGTCAACCGTTCGTTGTNNNAACCNGTTGAACATT"] - read_list = ["NAATTGCCTCTAAAATTGGA", "NCTGCTAAGTTCTTCAACTC", "NGACCTCGATATCATCGAGC", "NCACTTCATAGCGCACACCT", - "NCATGGTTTAGTACTTCTGT", \ - "NCACCAATTACATCAGCCAT", "TCCATACTCAATCCTCCAAC", "AGACAGGTAAAGGAATCTTT", "NNNACTANGTCATTTCAATA", - "CTTTCTTACTCAATCTAGTA"] - seq_sizes_list = getSeqSizesList(seq_list) - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - seed = 20 - output_read_pos = np.full(2 * len(read_list) * len(seq_list), 0,np.int64) - output_read_pos[0] = 1 - output_read_pos[12] = 1 - output_read_pos[22] = 1 - output_read_pos[34] = 1 - output_read_pos[3] = 1 - output_read_pos[5] = 1 - output_read_pos[7] = 1 - output_read_pos[9] = 1 - output_read_pos[11] = 1 - output_read_pos[13] = 1 - output_read_pos[15] = 2 - output_read_pos[17] = 2 - output_read_pos[19] = 2 - output_read_pos[25] = 1 - output_read_pos[27] = 1 - output_read_pos[29] = 1 - output_read_pos[31] = 1 - output_read_pos[33] = 1 - output_read_pos[35] = 1 - output_read_pos[37] = 2 - output_read_pos[39] = 2 - device_read_pos = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) - nb_sequences = len(seq_list) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences,np.uint32)) - nb_reads = len(read_list) - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads)) - max_nb_occ = 2 - host_max_nb_occ = np.array(max_nb_occ) - - - device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) - device_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(read_list)) - k = np.array(seed) # need to pass an arry of 1 element otherwise get an exception while trying to create buffer. - device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) - device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(seq_list)) - - device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(seq_sizes_list,dtype=np.uint32)) - host_read_occ_positions = np.full(host_max_nb_occ * len(seq_list), 0,np.uint32) - device_read_occ_positions = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_read_occ_positions) - - - - - prg_get_occ_pos = cl.Program(ctx, reads_occ_pos_in_seqs_str).build() - nb_threads_dim1 = getNbThreads(nb_reads, max_dim1) - prg_get_occ_pos.get_occ_pos(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, - device_nb_reads, device_max_nb_occ, device_read_occ_positions, device_k, - device_reads, device_sequences, device_seq_sizes) - cl.enqueue_copy(queue, host_read_occ_positions, device_read_occ_positions) - print(host_read_occ_positions) - - self.assertEqual(host_read_occ_positions[0],0) - self.assertEqual(host_read_occ_positions[1], 53) - self.assertEqual(host_read_occ_positions[2], 0) - self.assertEqual(host_read_occ_positions[3], 57) - - ## Testing the 4rth openCL kernel used in phageterm - # - # This kernel produces the final results array. - # See "doc/explanation of openCL Kernels for more detailed information on what this kernel does. - def test_reads_nb_occ_reduction(self): - ctx, queue, mf, max_dim1,dev_mem = getOpenCLElements() - output_read_pos = np.full(2 * 10 * 2, 0,np.int64) - output_read_pos[0] = 1 - output_read_pos[12] = 1 - output_read_pos[22] = 1 - output_read_pos[34] = 1 - output_read_pos[3] = 1 - output_read_pos[5] = 1 - output_read_pos[7] = 1 - output_read_pos[9] = 1 - output_read_pos[11] = 1 - output_read_pos[13] = 1 - output_read_pos[15] = 2 - output_read_pos[17] = 2 - output_read_pos[19] = 2 - output_read_pos[25] = 1 - output_read_pos[27] = 1 - output_read_pos[29] = 1 - output_read_pos[31] = 1 - output_read_pos[33] = 1 - output_read_pos[35] = 1 - output_read_pos[37] = 2 - output_read_pos[39] = 2 - device_read_pos = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=output_read_pos) - nb_sequences = 2 - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences,np.uint32)) - nb_reads=10 - device_nb_reads = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(nb_reads,np.int64)) - - host_max_nb_occ = np.array(2, np.int64) - device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) - #device_found_read_carac_line_length=cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=np.array(3*nb_max_diff_read_occ)) - host_found_reads_carac = np.full(3 * 2 * nb_sequences, -1,np.int64) - device_found_reads_carac = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_found_reads_carac) - - # This kernel is intended to be used with 1 thread per sequence. - prg_reduce = cl.Program(ctx, reads_nb_occ_reduction_str).build() - nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) - #sf=np.array(0, np.int64) - #device_sf=cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=sf) - #prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences,device_nb_reads,device_found_reads_carac,device_max_nb_occ,device_sf) - #prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, - # device_nb_reads, device_found_reads_carac, device_max_nb_occ, device_sf) - prg_reduce.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_read_pos, device_nb_sequences, - device_nb_reads, device_found_reads_carac, device_max_nb_occ) - - cl.enqueue_copy(queue, host_found_reads_carac, device_found_reads_carac) - print(host_found_reads_carac) - self.assertEqual(host_found_reads_carac[0], 0) - self.assertEqual(host_found_reads_carac[1], 1) - self.assertEqual(host_found_reads_carac[2], 0) - self.assertEqual(host_found_reads_carac[3], 6) - self.assertEqual(host_found_reads_carac[4], 1) - self.assertEqual(host_found_reads_carac[5], 1) - - self.assertEqual(host_found_reads_carac[6], 1) - self.assertEqual(host_found_reads_carac[7], 1) - self.assertEqual(host_found_reads_carac[8], 0) - - self.assertEqual(host_found_reads_carac[9], 7) - self.assertEqual(host_found_reads_carac[10], 1) - self.assertEqual(host_found_reads_carac[11], 1) - - - -if __name__ == "__main__": - unittest.main() - - - - - - - - diff --git a/tests/test_OCL_Kernels_bigger.py b/tests/test_OCL_Kernels_bigger.py deleted file mode 100644 index c4973dddc5fb4ba61a241352c2291d138c3a5a6f..0000000000000000000000000000000000000000 --- a/tests/test_OCL_Kernels_bigger.py +++ /dev/null @@ -1,247 +0,0 @@ -##@file test_OCL_Kernels_bigger.py -# Tests for openCL kernels on bigger datasets (more "real life like") -from __future__ import print_function - -import unittest - -import numpy as np -import sys -#print sys.path -sys.path.append("..") - -from _modules.OCL_kernels import * - -from data_refseq import refseq_list -nb_sequences=len(refseq_list) -from data_refseq import all_extracts_l -nb_extracts=len(all_extracts_l) -print(nb_extracts) -extracts_l=all_extracts_l - -## Class containing all the testing functions of the openCL kernels for bigger datasets than simple unit tests. -class TestMyOCLKernels (unittest.TestCase): - ## Testing the 1rts kernel used in Phageterm. - # - # The one that counts the number of occurrences of each kmers in each sequence. - def test_get_nb_occ(self): - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - # prepare input and output parameters on host side. - seq_sizes_l = [] # list of the length of all sequences - for s in refseq_list: - seq_sizes_l.append(len(s)) - str_seqs = "".join(refseq_list) # all contigs concatenated in a string - input_seqs = np.array(str_seqs) # Need to put all the sequences in a numpy array for openCL. - - input_rextracts = np.array(extracts_l) # all read extracts - o_nb_occ_foreach_extract_in_seqs = np.full(2 * nb_extracts * nb_sequences, 0,np.int64) # result array for kernel 1. - - # prepare input and output parameters on device size. - device_rextracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_rextracts) - k = np.array(20,np.uint32) # need to pass an array of 1 element otherwise get an exception while trying to create buffer. - device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) - device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_seqs) - device_nb_extracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_extracts, np.int64)) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences, np.uint32)) - device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(seq_sizes_l, np.uint32)) - device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, - hostbuf=o_nb_occ_foreach_extract_in_seqs) - prg = cl.Program(ctx, reads_nb_occ_in_sequences_str).build() - nb_threads_dim1 = getNbThreads(nb_extracts, max_dim1) - print("nb_threads_dim1=",nb_threads_dim1) - prg.reads_nb_occ(queue, (nb_threads_dim1,), (1,), device_k, device_rextracts, device_nb_extracts, - device_sequences, - device_seq_sizes, device_nb_sequences, device_nb_occ_foreach_extract_in_seqs) - # get kernel execution results back to host (number of occurrences of each k_mer in each sequence. - cl.enqueue_copy(queue, o_nb_occ_foreach_extract_in_seqs, device_nb_occ_foreach_extract_in_seqs).wait() - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11104],1) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11104+1],0) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11105],1) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11105+1],0) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11106],0) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11106+1],0) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11107],0) - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[2*11107+1], 0) - array_l_siz=2*nb_extracts - for i in range (array_l_siz+2*11104,array_l_siz+2*11107+1): - self.assertEqual(o_nb_occ_foreach_extract_in_seqs[i],0) # None of these k-mers should appear in the 2nd sequence. - #np.savetxt("kmer_nb_occ.txt", o_nb_occ_foreach_extract_in_seqs, fmt="%d", delimiter=',') - - ## Testing the 2nd kernel used in Phageterm - def test_upd_nb_occ(self): - ctx, queue, mf, max_dim1,dev_mem = getOpenCLElements() - # nb_sequences = self.seq_data.nb_sequences # for readability. - # prepare input and output parameters on host side. - seq_sizes_l = [] # list of the length of all sequences - for s in refseq_list: - seq_sizes_l.append(len(s)) - str_seqs = "".join(refseq_list) # all contigs concatenated in a string - o_nb_occ_pos = np.loadtxt("kmer_nb_occ.txt", dtype=np.int64, delimiter=',') - # Just check that data is still here - self.assertEqual(o_nb_occ_pos[2 * 11104], 1) - self.assertEqual(o_nb_occ_pos[2 * 11104 + 1], 0) - self.assertEqual(o_nb_occ_pos[2 * 11105], 1) - self.assertEqual(o_nb_occ_pos[2 * 11105 + 1], 0) - device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, - hostbuf=o_nb_occ_pos) - device_nb_extracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_extracts, np.int64)) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences, np.uint32)) - # This kernel is intended to be used with 1 thread per sequence. - # It computes for each sequence, the total number of occurrences of kmers. - # It returns the max amongst all these sums. - prg_update = cl.Program(ctx, upd_reads_nb_occ_in_sequences_str).build() - nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) - print("updating occ array with ",nb_threads_dim1, "threads") - prg_update.upd_nb_occ_array(queue, (nb_threads_dim1,), (1,), device_nb_occ_foreach_extract_in_seqs, - device_nb_sequences, - device_nb_extracts) - #print "going to copy results" - cl.enqueue_copy(queue, o_nb_occ_pos, device_nb_occ_foreach_extract_in_seqs) - host_max_nb_occ = getMaxNbOccPerSeq(o_nb_occ_pos, nb_extracts, nb_sequences) # max over all sequences of the total number of read extracts occurrences per sequence - #print "results copied" - # o_nb_occ_foreach_extract_in_seqs.tofile("res_kernel2", sep="|", format="%s") - self.assertEqual(o_nb_occ_pos[2 * 11104], 1) - self.assertEqual(o_nb_occ_pos[2 * 11104 + 1], 7677) - self.assertEqual(o_nb_occ_pos[2 * 11105], 1) - self.assertEqual(o_nb_occ_pos[2 * 11105 + 1], 7678) - self.assertEqual(o_nb_occ_pos[2 * 11106], 0) - self.assertEqual(o_nb_occ_pos[2 * 11106 + 1], 7679) - self.assertEqual(o_nb_occ_pos[2 * 11107], 0) - self.assertEqual(o_nb_occ_pos[2 * 11107 + 1], 7679) - #np.savetxt("kmer_nb_occ_pos.txt",o_nb_occ_pos, fmt="%d", delimiter=',') - self.assertEqual(host_max_nb_occ, 16908) - - ## Testing 3rd kernel used in PhageTerm - def test_get_occ_pos(self): - ctx, queue, mf, max_dim1,dev_mem = getOpenCLElements() - # nb_sequences = self.seq_data.nb_sequences # for readability. - # prepare input and output parameters on host side. - seq_sizes_l = [] # list of the length of all sequences - for s in refseq_list: - seq_sizes_l.append(len(s)) - str_seqs = "".join(refseq_list) # all contigs concatenated in a string - input_seqs = np.array(str_seqs) # Need to put all the sequences in a numpy array for openCL. - input_rextracts = np.array(extracts_l) # all read extracts - # np.savetxt("kmer_nb_occ.txt", o_nb_occ_foreach_extract_in_seqs, fmt="%d", delimiter=',') - o_nb_occ_pos = np.loadtxt("kmer_nb_occ_pos.txt", dtype=np.int64, delimiter=',') - # Just check that data is still here - self.assertEqual(o_nb_occ_pos[2 * 11104], 1) - self.assertEqual(o_nb_occ_pos[2 * 11104 + 1], 7677) - self.assertEqual(o_nb_occ_pos[2 * 11105], 1) - self.assertEqual(o_nb_occ_pos[2 * 11105 + 1], 7678) - self.assertEqual(o_nb_occ_pos[2 * 11106], 0) - self.assertEqual(o_nb_occ_pos[2 * 11106 + 1], 7679) - self.assertEqual(o_nb_occ_pos[2 * 11107], 0) - self.assertEqual(o_nb_occ_pos[2 * 11107 + 1], 7679) - device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, - hostbuf=o_nb_occ_pos) - k = np.array(20, np.uint32) - host_max_nb_occ = np.array(16908, - np.int64) # max over all sequences of the total number of read extracts occurrences per sequence - device_max_nb_occ = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) - - print("size of pos array for all sequences: ", int(host_max_nb_occ * nb_sequences)) - host_read_occ_positions = np.full(int(host_max_nb_occ * nb_sequences), 0, - dtype=np.uint32) # contains the positions of all extracts occurrences in the sequences. - device_read_occ_positions = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, hostbuf=host_read_occ_positions) - device_k = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=k) - device_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_seqs) - device_nb_extracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_extracts, np.int64)) - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences, np.uint32)) - device_seq_sizes = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(seq_sizes_l, np.uint32)) - device_rextracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, hostbuf=input_rextracts) - prg_get_occ_pos = cl.Program(ctx, reads_occ_pos_in_seqs_str).build() - nb_threads_dim1 = getNbThreads(nb_extracts, max_dim1) - prg_get_occ_pos.get_occ_pos(queue, (nb_threads_dim1,), (1,), device_nb_occ_foreach_extract_in_seqs, - device_nb_sequences, - device_nb_extracts, device_max_nb_occ, device_read_occ_positions, device_k, - device_rextracts, device_sequences, device_seq_sizes) - cl.enqueue_copy(queue, host_read_occ_positions, device_read_occ_positions) - self.assertEqual(host_read_occ_positions[7677],1864) - self.assertEqual(host_read_occ_positions[7678],1954) - # Check that no kmer occurrence is above sequence length. Seems silly but I really wasted a lot of time on this bug! - for i in range(0,nb_sequences): - l=seq_sizes_l[i] - for j in range(0,host_max_nb_occ): - if (host_read_occ_positions[j]==1): - break - my_str="sequence " - my_str+=str(i) - my_str+=" Occ: " - my_str+=str(i) - my_str+=" l=" - my_str+=str(l) - my_str+=" pos=" - my_str+=str(host_read_occ_positions[i*host_max_nb_occ+j]) - self.assertTrue(host_read_occ_positions[i*host_max_nb_occ+j]<=l-20,my_str) - - # np.savetxt("kmer_list_occ_pos.txt", o_nb_occ_pos, fmt="%d", delimiter=',') - - ## Testing 4rth kernel used in phageterm. - def test_get_final_array(self): - ctx, queue, mf,max_dim1,dev_mem = getOpenCLElements() - i_nb_occ_pos = np.loadtxt("kmer_nb_occ_pos.txt", dtype=np.int64, delimiter=',') - # Checking that data is still here - self.assertEqual(i_nb_occ_pos[2 * 11104], 1) - self.assertEqual(i_nb_occ_pos[2 * 11104 + 1], 7677) - self.assertEqual(i_nb_occ_pos[2 * 11105], 1) - self.assertEqual(i_nb_occ_pos[2 * 11105 + 1], 7678) - self.assertEqual(i_nb_occ_pos[2 * 11106], 0) - self.assertEqual(i_nb_occ_pos[2 * 11106 + 1], 7679) - self.assertEqual(i_nb_occ_pos[2 * 11107], 0) - self.assertEqual(i_nb_occ_pos[2 * 11107 + 1], 7679) - #i_l_occ_pos=np.loadtxt("kmer_list_occ_pos.txt",dtype=np.uint32,delimiter=',') - host_max_nb_occ = np.array(15690,np.int64) - reduced_nb_occ_foreach_extract_in_seqs = np.full(int(3 * host_max_nb_occ * nb_sequences), -1, dtype=np.int64) - device_reduced_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.WRITE_ONLY | mf.USE_HOST_PTR, - hostbuf=reduced_nb_occ_foreach_extract_in_seqs) - device_nb_occ_foreach_extract_in_seqs = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, - hostbuf=i_nb_occ_pos) # TODO Maybe put this buffer in read write in doMapping. - device_nb_sequences = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_sequences, np.uint32)) - device_nb_extracts = cl.Buffer(ctx, mf.READ_ONLY | mf.HOST_READ_ONLY | mf.USE_HOST_PTR, - hostbuf=np.array(nb_extracts, np.int64)) - device_max_nb_occ = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=host_max_nb_occ) # TODO Maybe put this buffer in read write in doMapping. - # sf = np.array(0, np.int64) - # device_sf = cl.Buffer(ctx, mf.READ_ONLY | mf.USE_HOST_PTR, hostbuf=sf) - prg_red_rd_occ = cl.Program(ctx, reads_nb_occ_reduction_str).build() - nb_threads_dim1 = getNbThreads(nb_sequences, max_dim1) - # prg_red_rd_occ.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_nb_occ_foreach_extract_in_seqs, - # device_nb_sequences, - # device_nb_extracts, device_reduced_nb_occ_foreach_extract_in_seqs, - # device_max_nb_occ,device_sf) - prg_red_rd_occ.reads_nb_occ_reduction(queue, (nb_threads_dim1,), (1,), device_nb_occ_foreach_extract_in_seqs, - device_nb_sequences, - device_nb_extracts, device_reduced_nb_occ_foreach_extract_in_seqs, - device_max_nb_occ) - cl.enqueue_copy(queue, reduced_nb_occ_foreach_extract_in_seqs, device_reduced_nb_occ_foreach_extract_in_seqs) - # Checking results for a few kmers. TODO: add more checkings later. - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15690],11104) - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15691],1) - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15692],7677) - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15693], 11105) - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15694], 1) - self.assertEqual(reduced_nb_occ_foreach_extract_in_seqs[15695], 7678) - - # idx=0 - # for e in np.nditer(reduced_nb_occ_foreach_extract_in_seqs): - # if (idx % 3)==0: - # if e>=33791: - # print "found kmer with id:",e, idx - # idx+=1 - - - - - -if __name__ == "__main__": - unittest.main() - - diff --git a/tests/test_functions_PhageTerm_for_GPU.py b/tests/test_functions_PhageTerm_for_GPU.py deleted file mode 100644 index 8c33009fae06fb4d9bd058b6aacc628ea275aa9c..0000000000000000000000000000000000000000 --- a/tests/test_functions_PhageTerm_for_GPU.py +++ /dev/null @@ -1,119 +0,0 @@ -from __future__ import print_function -import unittest -import numpy as np - -##@file test_functions_PhageTerm_for_GPU.py -# -# Need to be able to compare results of mappings done on CPU with those performed on GPU. -# For that, use test data that were already provided with previous phageterm version. -##@author vlegrand@pasteur.fr - -import sys -sys.path.append("..") - -from _modules.GPU_allMapper import GPU_allMapper -from _modules.functions_PhageTerm import readsCoverage -from _modules.IData_handling import getAllReads,refData -from _modules.functions_PhageTerm_gpu import readsCoverageGPU -from _modules.debug_utils import ReadMappingInfoLogger -from _modules.IData_handling import ReadGetter -from tests.Data4Test import buildTestData - - - -## Tests that mapping results produced by CPU and GPU version are the same. -# -class Test_functions_Phageterm (unittest.TestCase): - ## Auxilliary method that compares mapping results globally - def compareMappingRes(self,d,d_rinfo,return_dict,l_res,logger_cpu,logger_gpu): - phage_hybrid_coverage = return_dict[d.core_id][3] - host_hybrid_coverage = return_dict[d.core_id][4] - host_whole_coverage = return_dict[d.core_id][5] - list_hybrid = return_dict[d.core_id][6] - insert = return_dict[d.core_id][7].tolist() - paired_mismatch = return_dict[d.core_id][8] - reads_tested = return_dict[d.core_id][9] - - phage_hybrid_coverage_gpu = l_res[3] - host_hybrid_coverage_gpu = l_res[4] - host_whole_coverage_gpu = l_res[5] - list_hybrid_gpu = l_res[6] - insert_gpu = l_res[7].tolist() - paired_mismatch_gpu = l_res[8] - reads_tested_gpu = l_res[9] - - self.assertEqual(reads_tested,reads_tested_gpu-1) # TODO fix bug in CPU version that skips read 0. - self.assertEqual(reads_tested, d.reads_tested-1) - self.assertEqual(paired_mismatch,paired_mismatch_gpu) - self.assertEqual(paired_mismatch,0) - self.assertEqual(insert,insert_gpu) - self.assertEqual(insert,[]) - self.assertTrue(np.array_equal(list_hybrid,list_hybrid_gpu)) - self.assertTrue(np.array_equal(host_whole_coverage,host_whole_coverage_gpu)) - self.assertTrue(np.array_equal(host_hybrid_coverage,host_hybrid_coverage_gpu)) - self.assertTrue(np.array_equal(phage_hybrid_coverage,phage_hybrid_coverage_gpu)) - # self.assertTrue(np.array_equal(paired_whole_coverage,paired_whole_coverage_gpu)) # always different since the counters that are incremented depend on matchPlus_start which is chosen randomly. - #self.assertTrue(np.array_equal(whole_coverage,whole_coverage_gpu)) - # self.assertTrue(np.array_equal(termini_coverage,termini_coverage_gpu)) - r_getter = ReadGetter(d.fastq, d_rinfo) - self.compareMappingResByRead(logger_cpu, logger_gpu,r_getter) - - - ## Compares mapping results read by read. - def compareMappingResByRead(self, logger_cpu, logger_gpu,r_getter): - l_rmatch_info_cpu = logger_cpu.getMatchInfoList() - l_rmatch_info_gpu = logger_gpu.getMatchInfoList() - self.assertEqual(logger_cpu.cnt_read,logger_gpu.cnt_read-1) - for i in range(0,logger_cpu.cnt_read): - # The cpu version skips the 1st read in the fasta file. This is a bug but it doesn't have any impact on the final results. - # TODO (cpu version authors): fix this bug. - rmi_cpu=l_rmatch_info_cpu[i] - rmi_gpu=l_rmatch_info_gpu[i+1] - #print rmi_cpu.idx_read - read_cpu,rp = r_getter.getOneRead(rmi_cpu.idx_read) - #print read_cpu - read_gpu,rp=r_getter.getOneRead(rmi_gpu.idx_read) - #print read_gpu - - self.assertEqual(rmi_cpu.idx_read,rmi_gpu.idx_read) - self.assertEqual(rmi_cpu.map_start,rmi_gpu.map_start) - if (rmi_cpu.map_start==0): - self.assertEqual(rmi_cpu.map_rcpl_start,rmi_gpu.map_rcpl_start) - - ## Auxilliary method that performs mapping on GPU - def performGPUMapping(self,d): - RE, d_rinfo = getAllReads(d.fastq, d.seed, d.paired) - print(len(RE.r_extracts_list)) - print(RE.r_extracts_list) - refseq_liste = d.refseq_list - ref_data = refData(refseq_liste, d.seed, d.hostseq) - mapper = GPU_allMapper(RE, ref_data) - mapping_res = mapper.doMapping() - return mapping_res,d_rinfo - - - - - ## This function tests that both version of readsCoverage (CPU and GPU) return the same results. - def testReadsCoverageGPU_CPU(self): - l_data=buildTestData() - for d in l_data: - print("processing dataset: ", d.fastq) - mapping_res, d_rinfo = self.performGPUMapping(d) - for refseq in d.refseq_list: - logger_gpu = ReadMappingInfoLogger() - return_dict = dict() - logger_cpu = ReadMappingInfoLogger() - #readsCoverage(fastq, refseq, hostseq, seed, edge, paired, insert_max, core, core_id, return_dict, line_start, line_end, limit_coverage, virome,idx_refseq=None, dir_cov_res=None,logger=None) - readsCoverage(d.fastq, refseq, d.hostseq, d.seed, d.edge, d.paired, d.insert_max,\ - d.core, d.core_id, return_dict,d.line_start, d.line_end, \ - d.limit_coverage, d.virome,None,None,logger_cpu) - - l_res = readsCoverageGPU(d.fastq, refseq, d.hostseq, mapping_res, d_rinfo, d.edge, d.paired, - d.insert_max,d.limit_coverage,d.virome, logger_gpu) - self.compareMappingRes( d, d_rinfo, return_dict, l_res, logger_cpu, logger_gpu) - - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/tests/test_functions_PhageTerm_for_multi.py b/tests/test_functions_PhageTerm_for_multi.py deleted file mode 100644 index 7415c011742b4285832e0f65f8e912eec6ec0845..0000000000000000000000000000000000000000 --- a/tests/test_functions_PhageTerm_for_multi.py +++ /dev/null @@ -1,165 +0,0 @@ -##@file test_functions_PhageTerm_for_GPU.py -# -# -# Check that readsCoverage can write its result to a file and that they can be read again to retrieve results. -##@author vlegrand@pasteur.fr -from __future__ import print_function - - -import unittest -import numpy as np -import os -import shutil - -from Data4Test import buildTestData, Data4Test, data_path -from _modules.functions_PhageTerm import readsCoverage -from _modules.readsCoverage_res import loadRCRes,RCRes,RCCheckpoint_handler -from _modules.debug_utils import ReadMappingInfoLogger - -TEST_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) - - -class Test_functions_Phageterm (unittest.TestCase): - - def setUp(self): - # TODO: when switching to python3, use the tempfile module - self.dir_chk = os.path.join(os.getcwd(), "tmp") - os.mkdir(self.dir_chk) - - def tearDown(self): - shutil.rmtree(self.dir_chk) - - ## Test that readsCoverage results are saved in the appropriate structure and can be read again for later work. - - ## Test that readsCoverage results are saved in the appropriate structure and can be read again for later work. - def testDumpAndReadAgainRes(self): - l_data = buildTestData() - chk_handler = RCCheckpoint_handler(0,None,False) - dir_cov_res = self.dir_chk - for d in l_data: - idx_seq = 0 - for refseq in d.refseq_list: - return_dict = dict() - d.tParms.dir_cov_mm =None - readsCoverage(d.inDRawArgs, refseq, d.inDArgs, d.fParms,return_dict, 0,d.line_start, d.line_end, - d.tParms, chk_handler,idx_seq,None) - r1=RCRes(return_dict[0][0],return_dict[0][1],return_dict[0][2],\ - return_dict[0][3],return_dict[0][4],return_dict[0][5],\ - return_dict[0][6],return_dict[0][7],return_dict[0][8],return_dict[0][9]) - r1.save(os.path.join(self.dir_chk,"r1")) - d.tParms.dir_cov_mm=self.dir_chk - # readsCoverage(inRawDArgs, refseq, inDArgs, fParms, return_dict, line_start, line_end, tParms, \ - # idx_refseq=None, logger=None) - readsCoverage(d.inDRawArgs, refseq,d.inDArgs , d.fParms, None,0, d.line_start, d.line_end, d.tParms,chk_handler,idx_seq,logger=None) - fname="coverage"+str(idx_seq)+"_0.npz" - fic_name=os.path.join(self.dir_chk,fname) - print("checking: ",fic_name) - self.assertTrue(os.path.exists(fic_name)) - res=loadRCRes(fic_name) - self.assertEqual(res.termini_coverage.shape,return_dict[0][0].shape) # These dictionnaries cannot have exact same content because of random picking of kmer match positions in case th - self.assertEqual(res.whole_coverage.shape, return_dict[0][1].shape) - self.assertEqual(res.paired_whole_coverage.shape, return_dict[0][2].shape) - self.assertEqual(res.phage_hybrid_coverage.shape, return_dict[0][3].shape) - self.assertEqual(res.host_hybrid_coverage.shape, return_dict[0][4].shape) - self.assertEqual(res.host_whole_coverage.shape, return_dict[0][5].shape) - self.assertTrue(res.list_hybrid.shape, return_dict[0][6].shape) - self.assertEqual(res.reads_tested, return_dict[0][9]) - r1b=loadRCRes(os.path.join(self.dir_chk,"r1.npz")) - self.assertTrue(np.array_equal(r1b.termini_coverage,r1.termini_coverage)) - self.assertTrue(np.array_equal(r1b.whole_coverage,r1.whole_coverage)) - self.assertTrue(np.array_equal(r1b.paired_whole_coverage,r1.paired_whole_coverage)) - self.assertTrue(np.array_equal(r1b.phage_hybrid_coverage,r1.phage_hybrid_coverage)) - self.assertTrue(np.array_equal(r1b.host_hybrid_coverage,r1.host_hybrid_coverage)) - self.assertTrue(np.array_equal(r1b.host_whole_coverage,r1.host_whole_coverage)) - self.assertTrue(np.array_equal(r1b.list_hybrid,r1.list_hybrid)) - self.assertTrue(np.array_equal(r1b.insert,r1.insert)) - self.assertTrue(np.array_equal(r1b.paired_mismatch,r1.paired_mismatch)) - self.assertTrue(np.array_equal(r1b.reads_tested,r1.reads_tested)) - idx_seq += 1 - - ## Checks that checkpoints are created and that their content is correct. - def test_checkpoint_creation(self): - # last line is 11150 - d=Data4Test(data_path("COS-5.fastq"), data_path("COS-5.fasta")) - d.tParms.dir_chk=self.dir_chk - d.tParms.test_mode=True - return_dict = dict() - d.tParms.dir_cov_mm = None - chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode) - readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0, d.line_start, d.line_end, \ - d.tParms, chk_handler, 0, None) - fic_name = "chk_0_0_11150_291333.npz" - full_fname = os.path.join(self.dir_chk, fic_name) - self.assertTrue(os.path.exists(full_fname)) - list_f = os.listdir(self.dir_chk) - self.assertTrue(len(list_f)==1) - wr=chk_handler.load(0,0) - self.assertEqual(wr.read_match,291333) - self.assertEqual(wr.count_line,11150) - self.assertEqual(wr.interm_res.host_len,0) - self.assertEqual(wr.interm_res.gen_len,3012) - self.assertEqual(int(wr.interm_res.reads_tested),2787) # 2796? - - - ## Checks thst in production mode, all checkpoints are deleted at the end. - def test_checkpoint_end(self): - # last line is 11150 - d=Data4Test(data_path("COS-5.fastq"), data_path("COS-5.fasta")) - d.tParms.chk_freq=1 - d.tParms.dir_chk=self.dir_chk - d.tParms.test_mode=False - return_dict = dict() - d.tParms.dir_cov_mm = None - chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode) - readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0, d.line_start, d.line_end, \ - d.tParms, chk_handler, 0, None) - list_f = os.listdir(self.dir_chk) - self.assertTrue(len(list_f)==0) - - ## checks that readsCoverage restarts from ceckpoint and not from the beginning. - def test_restart_from_checkpoint(self): - d=Data4Test(data_path("COS-5.fastq"), data_path("COS-5.fasta")) - d.tParms.dir_chk = self.dir_chk - d.tParms.chk_freq=1 - d.tParms.test_mode = False - shutil.copy(os.path.join(TEST_DIR_PATH, "chk_0_0_38_863.npz"),self.dir_chk) - return_dict = dict() - d.tParms.dir_cov_mm = None - logger=ReadMappingInfoLogger() - chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode) - idx_seq=chk_handler.getIdxSeq(0) - self.assertEqual(idx_seq,0) - readsCoverage(d.inDRawArgs, d.refseq_list[0], d.inDArgs, d.fParms, return_dict, 0,d.line_start, d.line_end, \ - d.tParms, chk_handler,0, logger) - wr=logger.rw_lst[0] - self.assertEqual(wr.read_match,863) - self.assertEqual(wr.count_line,38) - self.assertEqual(wr.interm_res.host_len,0) - self.assertEqual(wr.interm_res.gen_len,3012) - self.assertEqual(int(wr.interm_res.reads_tested),9) - - def test_restart_from_checkpoint2(self): - d=Data4Test(data_path("Virome.fastq"), data_path("Virome.fasta")) - d.tParms.dir_chk = self.dir_chk - d.tParms.chk_freq = 5 - d.tParms.test_mode = False - shutil.copy(data_path("chk_0_2_10_0.npz"), self.dir_chk) - logger = None - chk_handler = RCCheckpoint_handler(d.tParms.chk_freq, d.tParms.dir_chk, d.tParms.test_mode) - idx_seq = chk_handler.getIdxSeq(0) - self.assertEqual(idx_seq, 2) - start=idx_seq - return_dict = dict() - for seq in d.refseq_list[start:]: - print("going to process sequence: ",idx_seq) - print(seq) - readsCoverage(d.inDRawArgs, seq, d.inDArgs, d.fParms, return_dict, 0, - d.line_start, d.line_end, d.tParms, chk_handler, idx_seq, - logger) - idx_seq+=1 - self.assertEqual(idx_seq,5) - - - -if __name__ == "__main__": - unittest.main()