import boto3
import os
import sys
import urllib3
from datetime import datetime

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'


def process_file(accession, region):
    urllib3.disable_warnings()
    s3 = boto3.client('s3')
    print("region - " + region)
    startTime = datetime.now()

    # debug: show filesystems, block devices, and the sra-tools configuration
    os.system("df -T")
    os.system("lsblk")
    os.system("echo contents of /root/.ncbi/user-settings.mkfg")
    os.system("cat /root/.ncbi/user-settings.mkfg")
    os.system("echo EOF")

    # go to EBS rather than /tmp (that's where local storage / nvme is)
    os.chdir("/mnt/serratus-data")
    os.system("pwd")

    # check free space
    os.system("df -h .")

    # some debug: list .sra files possibly left over from previous runs
    public_sra_files = os.listdir(os.getcwd() + "/public/sra/")
    print("at start, dir listing of public/sra/", public_sra_files)

    # download reads from accession
    os.system("mkdir -p out/")
    os.system("prefetch " + accession)
    os.system("mkdir -p tmp/")
    os.system("/parallel-fastq-dump --split-files --outdir out/ --tmpdir tmp/"
              " --threads 4 --sra-id " + accession)

    files = os.listdir(os.getcwd() + "/out/")
    print("after fastq-dump, dir listing of out/", files)

    # record the name and size of each extracted FASTQ file
    inputDataFn = accession + ".inputdata.txt"
    with open(inputDataFn, "w") as g:
        for f in files:
            g.write(f + " " + str(os.stat("out/" + f).st_size) + "\n")

    # potential todo: there is opportunity to use mkfifo and speed up the
    # parallel-fastq-dump -> bbduk step, as per
    # https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26
    # (see the streaming sketch after this function)

    # run fastp
    os.system("cat out/*.fastq | /fastp --trim_poly_x --stdin -o " + accession + ".fastq")
    if os.stat(accession + ".fastq").st_size == 0:
        print("fastp produced empty output")
        sys.exit(1)
    print("fastp done, now uploading to S3")

    # upload filtered reads to S3
    outputBucket = "serratus-rayan"
    s3.upload_file(accession + ".fastq", outputBucket, "reads/" + accession + ".fastq")
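
# ---------------------------------------------------------------------------
# Sketch for the "potential todo" above: stream the dump straight into fastp
# instead of materializing out/*.fastq on disk first. This is a hedged
# illustration only, not called by main(). It assumes plain fastq-dump with
# its -Z/--stdout flag (parallel-fastq-dump writes files, not a stream), and
# that concatenating all reads onto one stream is acceptable, which matches
# the "cat out/*.fastq | /fastp" step above.
def process_file_streamed_sketch(accession):
    import subprocess
    dump = subprocess.Popen(["fastq-dump", "-Z", accession],
                            stdout=subprocess.PIPE)
    fastp = subprocess.Popen(["/fastp", "--trim_poly_x", "--stdin",
                              "-o", accession + ".fastq"],
                             stdin=dump.stdout)
    dump.stdout.close()  # so fastp sees EOF once fastq-dump exits
    if fastp.wait() != 0 or dump.wait() != 0:
        sys.exit("streamed fastq-dump | fastp pipeline failed")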
    # cleanup (important when using a local drive)
    os.system("rm -f out/" + accession + "*.fastq")
    os.system("rm -f " + accession + ".fastq")
    os.system("rm -f public/sra/" + accession + ".sra")

    endTime = datetime.now()
    diffTime = endTime - startTime
    logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds),
               LOGTYPE_INFO)


def main():
    accession = ""
    region = "us-east-1"
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")
    if len(accession) == 0:
        sys.exit("This script needs an environment variable Accession set to something")
    logMessage(accession, 'parameters: ' + accession + " " + region, LOGTYPE_INFO)
    process_file(accession, region)


def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails)
        elif logType == "DEBUG":
            # DEBUG messages are printed only when the DEBUG env var asks for them
            # (os.environ.get returns None when unset, so no KeyError handling needed)
            if os.environ.get('DEBUG') == "LOGTYPE":
                print(logMessageDetails)
    except Exception as ex:
        logMessageDetails = constructMessageFormat(
            fileName, message,
            "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return ("fileName: " + fileName + " " + logType + ": " + message +
                " Additional Details - " + additionalErrorDetails)
    return "fileName: " + fileName + " " + logType + ": " + message


if __name__ == '__main__':
    main()
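
# ---------------------------------------------------------------------------
# Appendix sketch, not called anywhere above: every shell step in
# process_file goes through os.system, whose non-zero exit statuses are
# silently dropped, so a failed prefetch only surfaces later as an empty
# fastp output. A minimal checked wrapper (assuming subprocess semantics;
# the helper name is hypothetical) could replace those calls:
def run_checked(command, accession):
    import subprocess
    try:
        # shell=True keeps the globbing/pipe behavior the os.system calls rely on
        subprocess.run(command, shell=True, check=True)
    except subprocess.CalledProcessError as ex:
        logMessage(accession, "command failed: " + command + " - " + str(ex),
                   LOGTYPE_ERROR)
        raise

# usage (hypothetical): run_checked("prefetch " + accession, accession)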