diff --git a/README.md b/README.md
index 4230fc81c15248ed82b4065cde87f0b45238442c..a77e16d501c3b57eb4c0dd4adc29d650598a5fa3 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,29 @@ Here are the requirements and the versions used.
 User should edit these commands to fit the current RVDB version:
 
 ```sh
-wget https://hive.biochemistry.gwu.edu/prd/rvdb/content/U-RVDBv15.1.fasta
+RVDB_VERSION=19.0
+mkdir -p ${RVDB_VERSION} && cd ${RVDB_VERSION}
+
+# a place where temporary files will be written
+RVDB_SCRATCH_DIR=$myscratchdir/rvdb${RVDB_VERSION}
+# taxadb can be generated or downloaded (see the taxadb website)
+taxadb=/my/databases/taxadb_full.sqlite
+# the directory containing this README and the rvdb-prot scripts
+rvdbPipeline=$HOME/my/bioinformatics/rvdb-prot/
+
+
+RVDB_ORIG_DIR=${PWD}
+wget https://rvdb.dbi.udel.edu/download/U-RVDBv${RVDB_VERSION}.fasta.gz
+mkdir -p ${RVDB_SCRATCH_DIR}
+
+# example of running Snakemake with the SLURM cluster manager; adapt it to your needs
+
+slurmpartition="common"
+slurmqos="normal"
+
+# this variable is needed by Snakemake to submit subsequent jobs
 export sbcmd="sbatch --cpus-per-task={threads} --mem={cluster.mem} --partition={cluster.partition} --qos={cluster.qos} --out={cluster.out} --error={cluster.error} --job-name={cluster.name} {cluster.extra}"
-snakemake --rerun-incomplete --snakefile ~/dev/rvdb/pipeline/Snakefile -p -pr --local-cores 1 --jobs 1000 --cluster-config slurm.json --cluster \"$sbcmd\" --config origNucl=U-RVDBv15.1.fasta
+
+
+sbatch --qos ${slurmqos} --partition ${slurmpartition} -c 1 --job-name=rvdb${RVDB_VERSION} --wrap="snakemake --rerun-incomplete --snakefile $rvdbPipeline/pipeline/Snakefile -p -pr --local-cores 1 --jobs 1000 --cluster-config $rvdbPipeline/slurm.json --cluster \"$sbcmd\" --config origNucl=${RVDB_ORIG_DIR}/U-RVDBv${RVDB_VERSION}.fasta.gz scratchDir=${RVDB_SCRATCH_DIR} sampleName=rvdb${RVDB_VERSION} taxadb=$taxadb prefix=U-RVDBv${RVDB_VERSION}"
 ```
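The submission example above is SLURM-specific. For a small test without a cluster manager, the same configuration keys can in principle be passed to a plain local Snakemake call; a minimal sketch reusing the variables defined above (the core count is arbitrary):

```sh
# hypothetical local run (no job scheduler), started from ${RVDB_ORIG_DIR}
snakemake --rerun-incomplete -p \
    --snakefile $rvdbPipeline/pipeline/Snakefile \
    --cores 12 \
    --config origNucl=${RVDB_ORIG_DIR}/U-RVDBv${RVDB_VERSION}.fasta.gz \
             scratchDir=${RVDB_SCRATCH_DIR} \
             sampleName=rvdb${RVDB_VERSION} \
             taxadb=$taxadb \
             prefix=U-RVDBv${RVDB_VERSION}
```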
diff --git a/annotate/annotations_schema.sql b/annotate/annotations_schema.sql
index ae46593440ecffc49048570d5799cf9a69d2a3c6..16a3fc868cfd22316533b599e49e32cf2dddf331 100644
--- a/annotate/annotations_schema.sql
+++ b/annotate/annotations_schema.sql
@@ -10,7 +10,7 @@ CREATE TABLE keyword(
     str CHAR(30) UNIQUE
 );
 
-CREATE TABLE fam_kw_unsorted(
+CREATE TABLE fam_kw_seqnames_unsorted(
     famId INTEGER,
     kwId INTEGER,
     freq INTEGER,
@@ -19,7 +19,25 @@
     PRIMARY KEY(famId,kwId)
 );
 
-CREATE TABLE fam_kw(
+CREATE TABLE fam_kw_ref_unsorted(
+    famId INTEGER,
+    kwId INTEGER,
+    freq INTEGER,
+    FOREIGN KEY(famId) REFERENCES family(id),
+    FOREIGN KEY(kwId) REFERENCES keyword(id),
+    PRIMARY KEY(famId,kwId)
+);
+
+CREATE TABLE fam_kw_seqnames(
+    famId INTEGER,
+    kwId INTEGER,
+    freq INTEGER,
+    FOREIGN KEY(famId) REFERENCES family(id),
+    FOREIGN KEY(kwId) REFERENCES keyword(id),
+    PRIMARY KEY(famId,kwId)
+);
+
+CREATE TABLE fam_kw_ref(
     famId INTEGER,
     kwId INTEGER,
     freq INTEGER,
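The schema above splits keywords by provenance: fam_kw_seqnames holds keywords mined from sequence names, fam_kw_ref holds keywords from reference-database hits, and both share the keyword table. Once built, the database can be inspected directly; a minimal sketch, assuming the database file is named prot-hmm.sqlite as in the pipeline below:

```sh
# top 5 reference-DB keywords for family 1 (illustrative query only)
sqlite3 prot-hmm.sqlite "SELECT k.str, fk.freq
    FROM fam_kw_ref fk JOIN keyword k ON fk.kwId = k.id
    WHERE fk.famId = 1
    ORDER BY fk.freq DESC LIMIT 5;"
```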
;",(currWord,)) + cwid = c.lastrowid + else: + cwid = data[0] + c.execute(f"INSERT INTO {table} VALUES (?,?,?);",(famID,cwid,currCount)) - dbConn.commit() + dbConn.commit() @@ -111,6 +117,7 @@ def writeAnnotations(dbConn,famID,clusterInfo,taxInfo,annotations): parser = argparse.ArgumentParser() parser.add_argument("hmmDir", help="hmm dir") parser.add_argument("alignedDir", help="directory containing .msa.faa") +parser.add_argument("annotResultsDir", help="directory containing tblout resulting a reference DB query to annotate") parser.add_argument("taxadb", help="taxadb file") parser.add_argument("schema", help="sql schema to format the db") parser.add_argument("annotDB", help="db to create in which annotation will be stored") @@ -128,18 +135,24 @@ tdb_taxid = TaxID(dbtype='sqlite', dbname=args.taxadb) clusterList = glob.glob(args.hmmDir+"/*.hmm") for currCluster in clusterList: familyName = os.path.basename(currCluster)[:-4] - clusterInfo = getClusterInfo("%s/%s.hmm"%(args.hmmDir,familyName)) + clusterInfo = getClusterInfo(f"{args.hmmDir}/{familyName}.hmm") if not clusterInfo: - print("Failed to read {}".format(familyName)) + print(f"Failed to read {familyName}") continue - annotations = extractSeqNames("%s/%s.fasta"%(args.alignedDir,familyName)) - taxInfo = getTaxInfo(tdb_accession,tdb_taxid,annotations) - writeAnnotations(conn,clusterInfo["id"],clusterInfo,taxInfo,annotations) + seqNameAnnotations = extractSeqNames(f"{args.alignedDir}/{familyName}.fasta") + refHMMannotations = extractHMMannots(f"{args.annotResultsDir}/{familyName}.tblout") + taxInfo = getTaxInfo(tdb_accession,tdb_taxid,seqNameAnnotations) + writeAnnotations(conn,clusterInfo["id"],clusterInfo,taxInfo,seqNameAnnotations,refHMMannotations) # now we need to sort the table fam_kw_unsorted c = conn.cursor() -c.execute("INSERT INTO fam_kw SELECT * FROM fam_kw_unsorted ORDER BY freq DESC;") -c.execute("DROP TABLE fam_kw_unsorted;") +c.execute("INSERT INTO fam_kw_seqnames SELECT * FROM fam_kw_seqnames_unsorted ORDER BY freq DESC;") +c.execute("DROP TABLE fam_kw_seqnames_unsorted;") +conn.commit() + +c = conn.cursor() +c.execute("INSERT INTO fam_kw_ref SELECT * FROM fam_kw_ref_unsorted ORDER BY freq DESC;") +c.execute("DROP TABLE fam_kw_ref_unsorted;") conn.commit() conn.close() diff --git a/annotate/sqlAnnotToTxt.py b/annotate/sqlAnnotToTxt.py index 59d4e5ad7d477ba68305bea453b142b884babf5d..f7cb72af3c97dc15afa728fadc4d2b78c44efced 100644 --- a/annotate/sqlAnnotToTxt.py +++ b/annotate/sqlAnnotToTxt.py @@ -28,7 +28,16 @@ for currFamR in ccf.execute('SELECT * FROM family'): o = open("{dir}/{prefix}{id}.txt".format(dir=args.txtDir,prefix=args.prefix,id=currF),"w") lineage = tdb_taxid.lineage_name(int(currLCA), reverse=True) o.write("LENGTH\t{length}\nLCA\t{lca}\nNBSEQ\t{nbseq}\nKEYWORDS:\n".format(length=currSize,lca=("::".join(tdb_taxid.lineage_name(int(currLCA), reverse=True)) if lineage else ""),nbseq=currNbseq)) - for currFreq,currKw in ckw.execute('select freq,str from fam_kw JOIN keyword ON fam_kw.kwId = keyword.id WHERE fam_kw.famID=? order by freq desc',(currF,)): + + # keywords from reference database + kw_string = [] + for currFreq,currKw in ckw.execute('select freq,str from fam_kw_ref JOIN keyword ON fam_kw_ref.kwId = keyword.id WHERE fam_kw_ref.famID=? 
diff --git a/annotate/sqlAnnotToTxt.py b/annotate/sqlAnnotToTxt.py
index 59d4e5ad7d477ba68305bea453b142b884babf5d..f7cb72af3c97dc15afa728fadc4d2b78c44efced 100644
--- a/annotate/sqlAnnotToTxt.py
+++ b/annotate/sqlAnnotToTxt.py
@@ -28,7 +28,16 @@ for currFamR in ccf.execute('SELECT * FROM family'):
     o = open("{dir}/{prefix}{id}.txt".format(dir=args.txtDir,prefix=args.prefix,id=currF),"w")
     lineage = tdb_taxid.lineage_name(int(currLCA), reverse=True)
     o.write("LENGTH\t{length}\nLCA\t{lca}\nNBSEQ\t{nbseq}\nKEYWORDS:\n".format(length=currSize,lca=("::".join(tdb_taxid.lineage_name(int(currLCA), reverse=True)) if lineage else ""),nbseq=currNbseq))
-    for currFreq,currKw in ckw.execute('select freq,str from fam_kw JOIN keyword ON fam_kw.kwId = keyword.id WHERE fam_kw.famID=? order by freq desc',(currF,)):
+
+    # keywords from the reference database
+    kw_string = []
+    for currFreq,currKw in ckw.execute('select freq,str from fam_kw_ref JOIN keyword ON fam_kw_ref.kwId = keyword.id WHERE fam_kw_ref.famID=? order by freq desc',(currF,)):
+        kw_string.append(f"{currKw}\t{currFreq}")
+    o.write("\n".join(kw_string) if kw_string else "--")
+
+    # keywords from sequence names
+    o.write("\nKEYWORDS FROM SEQUENCES:\n")
+    for currFreq,currKw in ckw.execute('select freq,str from fam_kw_seqnames JOIN keyword ON fam_kw_seqnames.kwId = keyword.id WHERE fam_kw_seqnames.famID=? order by freq desc',(currF,)):
         o.write("{kw}\t{count}\n".format(kw=currKw.strip("[]()"),count=currFreq))
     o.write("SEQUENCES:\n")
     with open("aligned/FAM{famName}.fasta".format(famName = str(currF).zfill(6))) as f:
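Each per-family text file now carries two keyword sections: reference-DB keywords first (or "--" when a family had no reference hits), then keywords mined from sequence names. Schematically, with invented values:

```
LENGTH	642
LCA	Viruses::Riboviria
NBSEQ	18
KEYWORDS:
polymerase	12
transcriptase	7
KEYWORDS FROM SEQUENCES:
polyprotein	9
putative	4
SEQUENCES:
...
```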
"against_ref", + taxaDB = config["taxadb"], + schema = f"{workflow.basedir}/../annotate/annotations_schema.sql" shell: - "module load Python/3.6.0 ; python3 {workflow.basedir}/../annotate/makeAnnotationsIntoSQL.py {params.hmmDir} {params.alignedDir} {params.taxaDB} {params.schema} {output}" + "python3 {workflow.basedir}/../annotate/makeAnnotationsIntoSQL.py {params.hmmDir} {params.alignedDir} {params.annotResults} {params.taxaDB} {params.schema} {output}" rule Profiles_mergeHmm: input: input_all_Hmm output: "prot.hmm" - run: - for hmm in input: - shell("cat {hmm} >> {output}") + shell: + """ + find hmm -maxdepth 1 -type f -name '*.hmm' -print0 | + xargs -0 cat > {output} + """ rule Profiles_hmmbuild: input: @@ -77,7 +104,20 @@ rule Profiles_hmmbuild: hmm="hmm/{part}.hmm", log="hmm/{part}.log" run: - shell("module load hmmer/3.2.1 ; hmmbuild --informat afa -n {wildcards.part} -o {output.log} {output.hmm} {input}") + shell("hmmbuild --cpu {threads} --informat afa -n {wildcards.part} -o {output.log} {output.hmm} {input}") + +rule Profiles_family_against_ref: + input: + "families/{part}.fasta" + output: + tblout="against_ref/{part}.tblout", + log="against_ref/{part}.log" + params: + pfam="/local/databases/rel/pfam/current/hmmer/3.1/Pfam-A.hmm", + options="--cut_ga" + shell: + "hmmscan {params.options} --tblout {output.tblout} -o {output.log} {params.pfam} {input}" + rule Profiles_align: input: @@ -85,9 +125,9 @@ rule Profiles_align: output: "aligned/{part}.fasta" threads: - 1 + 12 shell: - "module load fasta mafft && mafft --thread {threads} --anysymbol --auto {input} > {output}" + "mafft --thread {threads} --anysymbol --auto {input} > {output}" rule Profiles_blastToClusters: @@ -99,19 +139,27 @@ rule Profiles_blastToClusters: params: prefix="FAM" shell: - "module load silix && silix {input.fasta} {input.blastall} -f {params.prefix} > {output}" + "silix {input.fasta} {input.blastall} -f {params.prefix} > {output}" -rule Profiles_splitProtFasta: +checkpoint Profiles_splitProtFasta: input: "prot_collapsed.fasta" output: - dynamic("prot.split/{part}.fasta") + directory("prot.split") params: chunkSize=1500, suffix=".fasta", shell: - "python3 {workflow.basedir}/../build/splitFasta.py {input} prot.split/ --suffix {params.suffix} --maxChunkSize {params.chunkSize}" - + """ + mkdir {output} + python3 {workflow.basedir}/../build/splitFasta.py {input} prot.split/ --suffix {params.suffix} --maxChunkSize {params.chunkSize} + """ + +def get_output_Profiles_splitProtFasta(w): + odir = checkpoints.Profiles_splitProtFasta.get(**w).output[0] + return glob_wildcards(os.path.join(odir, "{i}.fasta")).i + + rule Profiles_collapse: input: "{part}.fasta" @@ -120,15 +168,15 @@ rule Profiles_collapse: params: fractionCoverage = "", fractionId = 1, - memory = 4500 + memory = 10000 threads: 12 shell: - "module load blast/2.2.26 cd-hit/4.6.6 && cd-hit -i {input} -o {output} {params.fractionCoverage} -s {params.fractionId} -T {threads} -M {params.memory}" + "cd-hit -i {input} -o {output} {params.fractionCoverage} -s {params.fractionId} -T {threads} -M {params.memory}" rule Pofiles_mergeBlast: input: - dynamic("blast.split/{part}.blastout") + lambda w: expand("blast.split/{id}.blastout",id=get_output_Profiles_splitProtFasta(w)) output: "all.blastout" shell: @@ -144,7 +192,7 @@ rule Profiles_blastAll: threads: 12 shell: - "module load blast+/2.6.0 && blastp -query {input.fasta} -db {input.bank} -out {output} -outfmt 6 -num_threads {threads}" + "blastp -query {input.fasta} -db {input.bank} -out {output} -outfmt 6 -num_threads 
{threads}" @@ -154,7 +202,7 @@ rule Profiles_makeBlastDB: output: "{part}.fasta.pin" shell: - "module load blast+/2.6.0 && makeblastdb -in {input} -dbtype prot" + "makeblastdb -in {input} -dbtype prot" rule origNucl: input: @@ -162,13 +210,13 @@ rule origNucl: output: "nucl.fasta" shell: - "cp {input} {output}" + "gunzip -c {input} > {output}" -rule ConvProt_splitInNames: +checkpoint ConvProt_splitInNames: input: "nucl.fasta" output: - dynamic("nucl/part_{dataset}.names") + directory("nucl") params: chunkSize=10000 run: @@ -182,28 +230,34 @@ rule ConvProt_splitInNames: totalSeqs = len(seqNames) print("Read {} sequence names from nucl.fasta.".format(totalSeqs)) nbChunks = round(totalSeqs / params["chunkSize"]) + shell("mkdir {output}") for i in range(nbChunks): last = round((i+1)*(totalSeqs/nbChunks)) first = round(i*(totalSeqs/nbChunks)) - with open("nucl/part_{dataset}.names".format(dataset=i),"w") as o: + with open(f"{output}/{i}.names","w") as o: for l in seqNames[first:(totalSeqs if i == (nbChunks-1) else last)]: o.write(l+"\n") - + +def get_output_ConvProt_splitInNames(w): + odir = checkpoints.ConvProt_splitInNames.get(**w).output[0] + return glob_wildcards(os.path.join(odir, "{i}.names")).i + + rule ConvProt_goldenPart: input: - "nucl/part_{part}.names" + "nucl/{part}.names" output: - fasta="prot/part_{part}.fasta", - incl="prot/part_{part}.fasta_included.txt", - excl="prot/part_{part}.fasta_excluded.txt" + fasta="prot/{part}.fasta", + incl="prot/{part}.fasta_included.txt", + excl="prot/{part}.fasta_excluded.txt" run: - shell("module load golden/3.4.1 && python3 {workflow.basedir}/../build/nucl2ProtGolden.py {input} {output.fasta}") + shell("python3 {workflow.basedir}/../build/nucl2ProtGolden.py {input} {output.fasta}") rule ConvProt_mergeFasta: input: - fasta=dynamic("prot/part_{dataset}.fasta"), - incl=dynamic("prot/part_{dataset}.fasta_included.txt"), - excl=dynamic("prot/part_{dataset}.fasta_excluded.txt") + fasta=lambda w: expand("prot/{i}.fasta",i=get_output_ConvProt_splitInNames(w)), + incl=lambda w: expand("prot/{i}.fasta_included.txt",i=get_output_ConvProt_splitInNames(w)), + excl=lambda w: expand("prot/{i}.fasta_excluded.txt",i=get_output_ConvProt_splitInNames(w)), output: fasta="prot.fasta", incl="included.txt", diff --git a/slurm.json b/slurm.json index b7190bbbc7966069ca5f05dbb0b42487401f498d..cf6e05c990386658d2a8edc6355bc99e8a23d9cc 100644 --- a/slurm.json +++ b/slurm.json @@ -1,12 +1,31 @@ { "__default__" : { - "partition" : "common", - "qos" : "normal", - "mem" : "3G", - "extra" : "", - "name" : "{jobname}.{rule}.{wildcards.part}", - "out" : "logs/cluster/{rule}.{wildcards}.out", - "error" : "logs/cluster/{rule}.{wildcards}.err" + "partition" : "dedicated,common", + "qos" : "fast", + "mem" : "3G", + "extra" : "", + "name" : "{sampleName}.{rule}.{wildcards.part}", + "out" : "logs/cluster/{rule}.{wildcards}.out", + "error" : "logs/cluster/{rule}.{wildcards}.err" + }, + "Profiles_align" : + { + "name" : "{sampleName}.Profiles_align", + "qos" : "normal", + "partition" : "common", + "mem": "60000" + }, + "Profiles_collapse" : + { + "name" : "{sampleName}.Profiles_collapse", + "mem" : "10G", + "qos" : "hubbioit", + "partition" : "hubbioit" + }, + "Profiles_family_against_ref" : + { + "qos" : "normal", + "partition" : "common" }, }