import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from time import sleep
import urllib3
import glob

# Log-level tags consumed by logMessage().
LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'


def fastp(accession, folder, sdb, extra_args=""):
    """Trim/filter all FASTQ files of *accession* in *folder* with fastp.

    Files are first separated into paired-end and single-end groups
    (based on https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py),
    then each group is run through the /fastp binary; all trimmed reads are
    appended to <accession>.fastq in the current working directory.

    Parameters:
        accession:  SRA accession; used to glob inputs and to name the output.
        folder:     directory containing the downloaded .fastq files.
        sdb:        boto3 SimpleDB client, used to record unpaired-read anomalies.
        extra_args: extra command-line flags passed verbatim to fastp.
    """
    fastq_files = glob.glob(folder + "/" + accession + "*.fastq")
    pe_fastq_files = []
    se_fastq_files = []
    for fname in fastq_files:
        if fname[-8:] == '_1.fastq':
            # First of a pair: queue both mates if _2 exists, otherwise
            # fall back to treating this file as single-end.
            fname2 = fname[:-8] + '_2.fastq'
            if fname2 not in fastq_files:
                print('Unpaired _1 file: ' + fname)
                sdb_log(sdb, accession, 'read_unpaired1', 'True')
                se_fastq_files.append(fname)
            else:
                pe_fastq_files.append((fname, fname2))
        elif fname[-8:] == '_2.fastq':
            # Second of a pair: the _1 branch already queued the pair, so only
            # the missing-mate case needs handling here.
            fname1 = fname[:-8] + '_1.fastq'
            if fname1 not in fastq_files:
                print('Unpaired _2 file: ' + fname)
                sdb_log(sdb, accession, 'read_unpaired2', 'True')
                se_fastq_files.append(fname)
        else:
            # Not named like a mate file: single-end.
            se_fastq_files.append(fname)

    # NOTE(review): commands below are built by string concatenation and run
    # through a shell (needed for the '>>' redirection); accession originates
    # from an environment variable, so it is trusted container input.
    os.system('rm -f ' + accession + '.fastq')  # should be unnecessary
    for f1, f2 in pe_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x",
                            "-i", f1, "-I", f2, "--stdout", extra_args,
                            ">>", accession + ".fastq"]))
    for f in se_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x",
                            "-i", f, "--stdout", extra_args,
                            ">>", accession + ".fastq"]))


def process_file(accession, region):
    """Download an SRA accession, clean its reads, and upload them to S3.

    Pipeline: prefetch the .sra -> parallel-fastq-dump to FASTQ -> fastp
    trimming (with fallbacks for degenerate quality values and unpaired
    reads) -> upload <accession>.fastq to S3. Per-stage wall-clock timings
    are recorded in SimpleDB via sdb_log().

    Parameters:
        accession: SRA accession to process.
        region:    AWS region used for the SimpleDB client.
    """
    urllib3.disable_warnings()
    s3 = boto3.client('s3')
    sdb = boto3.client('sdb', region_name=region)
    print("region - " + region, flush=True)
    startTime = datetime.now()

    # Environment debugging: mounted volumes and NCBI configuration.
    os.system(' '.join(["df", "-T"]))
    os.system(' '.join(["lsblk"]))
    os.system(' '.join(["echo", "contents of /root/.ncbi/user-settings.mkfg"]))
    os.system(' '.join(["cat", "/root/.ncbi/user-settings.mkfg"]))
    os.system(' '.join(["echo", "EOF"]))

    # Work on the EBS volume rather than /tmp (local nvme storage).
    os.chdir("/mnt/serratus-data")
    os.system(' '.join(["pwd"]))
    os.system(' '.join(["df", "-h", "."]))  # check free space
    print("at start, dir listing of /mnt/serratus-data", flush=True)
    os.system(' '.join(['ls', '-alR', '.']))

    # Download reads from the accession.
    os.system('mkdir -p out/')
    prefetch_start = datetime.now()
    # 'is larger than maximum allowed: skipped' solved by:
    # https://wiki.rc.hms.harvard.edu/display/O2/Aspera+to+download+NCBI+SRA+data
    os.system('prefetch --max-size 35G ' + accession)
    prefetch_time = datetime.now() - prefetch_start
    # total_seconds() (not .seconds) so durations beyond a day aren't truncated
    sdb_log(sdb, accession, 'prefetch_time', int(prefetch_time.total_seconds()))

    os.system('mkdir -p tmp/')
    pfqdump_start = datetime.now()
    os.system('/parallel-fastq-dump --skip-technical --split-e --outdir out/ --tmpdir tmp/ --threads 4 --sra-id ' + accession)
    pfqdump_time = datetime.now() - pfqdump_start
    sdb_log(sdb, accession, 'pfqdump_time', int(pfqdump_time.total_seconds()))

    # Record the name and size of every dumped FASTQ file.
    files = os.listdir(os.getcwd() + "/out/")
    print("after fastq-dump, dir listing of out/", files)
    inputDataFn = accession + ".inputdata.txt"
    with open(inputDataFn, "w") as g:
        for f in files:
            print("file: " + f + " size: " + str(os.stat("out/" + f).st_size), flush=True)
            g.write(f + " " + str(os.stat("out/" + f).st_size) + "\n")

    # potential todo: there is opportunity to use mkfifo and speed-up
    # parallel-fastq-dump -> bbduk step, as per
    # https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

    # Run fastp, with progressively more permissive fallbacks.
    fastp_start = datetime.now()
    fastp(accession, "out/", sdb)
    if os.stat(accession + ".fastq").st_size == 0:
        # fastp output is empty, most likely those reads have dummy quality
        # values. Retry without quality filtering.
        print("retrying fastp without quality filtering", flush=True)
        sdb_log(sdb, accession, 'fastp_noqual', 'True')
        fastp(accession, "out/", sdb, "--disable_quality_filtering")
    if os.stat(accession + ".fastq").st_size == 0:
        # STILL empty: read1 or read2 may be too short. Rename the mates so
        # both are picked up as unpaired single-end files.
        print("retrying fastp unpaired mode", flush=True)
        os.system('mv out/' + accession + '_1.fastq out/' + accession + '_a.fastq')
        os.system('mv out/' + accession + '_2.fastq out/' + accession + '_b.fastq')
        sdb_log(sdb, accession, 'fastp_unpaired', 'True')
        fastp(accession, "out/", sdb, "--disable_quality_filtering")
    if os.stat(accession + ".fastq").st_size == 0:
        print("fastp produced empty output even without quality filtering", flush=True)
        sdb_log(sdb, accession, 'fastp_empty', 'True')
        sys.exit(1)  # sys.exit, not the site.py exit() helper
    print("fastp done, now uploading to S3")
    fastp_time = datetime.now() - fastp_start
    sdb_log(sdb, accession, 'fastp_time', int(fastp_time.total_seconds()))

    # Upload filtered reads to S3.
    outputBucket = "serratus-rayan"
    upload_start = datetime.now()
    s3.upload_file(accession + ".fastq", outputBucket, "reads/" + accession + ".fastq")
    upload_time = datetime.now() - upload_start
    sdb_log(sdb, accession, 'upload_time', int(upload_time.total_seconds()))

    # Cleanup (important when using a local drive).
    os.system(' '.join(["rm", "-f", "out/" + accession + "*.fastq"]))
    os.system(' '.join(["rm", "-f", accession + ".fastq"]))
    os.system(' '.join(["rm", "-f", "public/sra/" + accession + ".sra"]))

    endTime = datetime.now()
    diffTime = endTime - startTime
    sdb_log(sdb, accession, 'batch_dl_time', int(diffTime.total_seconds()))
    sdb_log(sdb, accession, 'batch_dl_date', str(datetime.now()))
    logMessage(accession,
               "Serratus-batch-dl processing time - " + str(int(diffTime.total_seconds())),
               LOGTYPE_INFO)


def main():
    """Entry point: read Accession/Region from the environment and process."""
    accession = ""
    region = "us-east-1"
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")
    if len(accession) == 0:
        sys.exit("This script needs an environment variable Accession set to something")
    logMessage(accession, 'parameters: ' + accession + " " + region, LOGTYPE_INFO)
    process_file(accession, region)


def logMessage(fileName, message, logType):
    """Print a formatted log line; DEBUG lines only appear when DEBUG=LOGTYPE."""
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails, flush=True)
        elif logType == "DEBUG":
            # os.environ.get never raises KeyError; the comparison simply
            # fails when DEBUG is unset.
            if os.environ.get('DEBUG') == "LOGTYPE":
                print(logMessageDetails, flush=True)
    except Exception as ex:
        logMessageDetails = constructMessageFormat(
            fileName, message,
            "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    """Build the 'fileName: <name> <LEVEL>: <message>' log string."""
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details - " + additionalErrorDetails
    return "fileName: " + fileName + " " + logType + ": " + message


def sdb_log(
    sdb,
    item_name,
    name,
    value,
    region='us-east-1',
    domain_name='serratus-batch',
):
    """Insert a single name/value record into a SimpleDB domain.

    Parameters:
        sdb:         boto3 SimpleDB client.
        item_name:   unique string identifying this record.
        name, value: attribute pair to store (stringified, Replace=True).
        region:      unused; kept for backward compatibility with callers.
        domain_name: SimpleDB domain written to.

    Returns True when SimpleDB acknowledged with HTTP 200, False otherwise.
    Never raises: all failures are printed and reported as False.
    """
    try:
        status = sdb.put_attributes(
            DomainName=domain_name,
            ItemName=str(item_name),
            Attributes=[{'Name': str(name), 'Value': str(value), 'Replace': True}],
        )
    except Exception as e:
        print("SDB put_attribute error:", str(e), 'domain_name', domain_name, 'item_name', item_name)
        return False
    try:
        if status['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        print("SDB log error:", status['ResponseMetadata']['HTTPStatusCode'])
        return False
    except Exception:
        # Response did not have the expected shape.
        print("SDB status structure error, status:", str(status))
        return False


if __name__ == '__main__':
    main()