Commit 76f90e2a authored by Rayan  CHIKHI's avatar Rayan CHIKHI
Browse files

significant overhaul of download script, more robust fastp, added sdb

parent d4c615f7
......@@ -4,23 +4,48 @@ import csv, sys, time, argparse
from datetime import datetime
import json
import os
import sys
from operator import itemgetter, attrgetter
from time import sleep
import urllib3
import json
import glob
# Severity labels passed as the ``logType`` argument of logMessage().
# logMessage() prints INFO and ERROR messages unconditionally; DEBUG messages
# are printed only when the DEBUG environment variable is set accordingly.
LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'
def _split_fastq_files(fastq_files):
    """Partition FASTQ paths into paired-end tuples and single-end paths.

    A file ending in ``_1.fastq`` must have a sibling ending in ``_2.fastq``
    (and vice versa); every other file is treated as single-end.

    Returns:
        ``(paired, single)`` where ``paired`` is a list of ``(mate1, mate2)``
        tuples and ``single`` is a list of paths.

    Raises:
        AssertionError: if one mate of a pair is missing.
    """
    # Based on
    # https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py
    present = set(fastq_files)  # O(1) mate lookups instead of list scans
    paired = []
    single = []
    for fname in fastq_files:
        if fname.endswith('_1.fastq'):
            # First of a pair: record both mates together.
            mate = fname[:-len('_1.fastq')] + '_2.fastq'
            if mate not in present:
                # Explicit raise (not ``assert``) so the check survives -O.
                raise AssertionError('Unpaired _1 file: ' + fname)
            paired.append((fname, mate))
        elif fname.endswith('_2.fastq'):
            # Second of a pair: only verify its mate exists; the pair itself
            # is recorded when the _1 file is visited.
            mate = fname[:-len('_2.fastq')] + '_1.fastq'
            if mate not in present:
                raise AssertionError('Unpaired _2 file: ' + fname)
        else:
            single.append(fname)
    return paired, single


def fastp(accession, folder, sdb, extra_args=""):
    """Run fastp over every ``*.fastq`` in *folder*, into ``<accession>.fastq``.

    Paired-end files (``*_1.fastq`` / ``*_2.fastq``) are passed to fastp
    together; all other files are processed as single-end. The output of every
    fastp run is appended to one file, ``accession + ".fastq"``.

    Args:
        accession: prefix of the output file (``accession + ".fastq"``).
        folder: directory prefix that is concatenated directly with
            ``"*.fastq"``, so it must end with a path separator (e.g. ``"out/"``).
        sdb: not used by this function; kept so existing call sites keep working.
        extra_args: extra command-line text inserted verbatim into each fastp
            invocation (e.g. ``"--disable_quality_filtering"``).

    Raises:
        AssertionError: if a ``_1``/``_2`` file has no mate in *folder*.
    """
    # Sorted so the concatenated output is reproducible; glob order is arbitrary.
    fastq_files = sorted(glob.glob(folder + "*.fastq"))
    pe_fastq_files, se_fastq_files = _split_fastq_files(fastq_files)

    output = accession + ".fastq"
    # The fastp runs below append (">>"), so drop any stale output first.
    try:
        os.remove(output)
    except FileNotFoundError:
        pass

    base_cmd = ["/fastp", "--thread", "4", "--trim_poly_x"]
    tail = ["--stdout", extra_args, ">>", output]
    # Exit statuses are deliberately not checked here: the caller detects
    # failure by looking at the size of the output file.
    for f1, f2 in pe_fastq_files:
        os.system(' '.join(base_cmd + ["-i", f1, "-I", f2] + tail))
    for f in se_fastq_files:
        os.system(' '.join(base_cmd + ["-i", f] + tail))
def process_file(accession, region):
#try:
if True:
urllib3.disable_warnings()
s3 = boto3.client('s3')
sdb = boto3.client('sdb', region_name=region)
print("region - " + region)
print("region - " + region, flush=True)
startTime = datetime.now()
os.system(' '.join(["df", "-T"]))
......@@ -39,21 +64,28 @@ def process_file(accession, region):
os.system(' '.join(["df", "-h","."]))
# some debug
print("at start, dir listing of /mnt/serratus-data")
print("at start, dir listing of /mnt/serratus-data", flush=True)
os.system(' '.join(['ls','-alR','.']))
# download reads from accession
os.system('mkdir -p out/')
prefetch_start = datetime.now()
os.system('prefetch '+accession)
prefetch_time = datetime.now() - prefetch_start
sdb_log(sdb,accession,'prefetch_time',int(prefetch_time.seconds))
os.system('mkdir -p tmp/')
pfqdump_start = datetime.now()
os.system('/parallel-fastq-dump --split-files --outdir out/ --tmpdir tmp/ --threads 4 --sra-id '+accession)
pfqdump_time = datetime.now() - pfqdump_start
sdb_log(sdb,accession,'pfqdump_time',int(pfqdump_time.seconds))
files = os.listdir(os.getcwd() + "/out/")
print("after fastq-dump, dir listing of out/", files)
inputDataFn = accession+".inputdata.txt"
g = open(inputDataFn,"w")
for f in files:
print("file: " + f + " size: " + str(os.stat("out/"+f).st_size))
print("file: " + f + " size: " + str(os.stat("out/"+f).st_size), flush=True)
g.write(f + " " + str(os.stat("out/"+f).st_size)+"\n")
g.close()
......@@ -61,17 +93,30 @@ def process_file(accession, region):
# as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26
# run fastp
os.system(' '.join(["cat","out/*.fastq","|","/fastp", "--thread", "4", "--trim_poly_x", "--stdin", "-o", accession + ".fastq"]))
fastp_start = datetime.now()
fastp(accession,"out/", sdb)
if os.stat(accession+".fastq").st_size == 0:
    print("fastp produced empty output", flush=True)
    # fastp output is empty, most likely those reads have dummy quality
    # values. Retry once with quality filtering disabled.
    print("retrying fastp without quality filtering", flush=True)
    sdb_log(sdb,accession,'fastp_noqual','True')
    fastp(accession,"out/",sdb,"--disable_quality_filtering")
    if os.stat(accession+".fastq").st_size == 0:
        # Still nothing: record the failure and abort with a non-zero status.
        print("fastp produced empty output even without quality filtering", flush=True)
        sdb_log(sdb,accession,'fastp_empty','True')
        # sys.exit() instead of exit(): the latter is a helper injected by the
        # site module and is not guaranteed to exist in every interpreter run.
        sys.exit(1)
print("fastp done, now uploading to S3")
fastp_time = datetime.now() - fastp_start
sdb_log(sdb,accession,'fastp_time',int(fastp_time.seconds))
# upload filtered reads to s3
outputBucket = "serratus-rayan"
upload_start = datetime.now()
s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
upload_time = datetime.now() - upload_start
sdb_log(sdb,accession,'upload_time',int(upload_time.seconds))
# cleanup. #(important when using a local drive)
os.system(' '.join(["rm","-f","out/"+accession+"*.fastq"]))
......@@ -80,6 +125,7 @@ def process_file(accession, region):
endTime = datetime.now()
diffTime = endTime - startTime
sdb_log(sdb,accession,'batch_dl_time',int(diffTime.seconds))
logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds), LOGTYPE_INFO)
......@@ -105,11 +151,11 @@ def logMessage(fileName, message, logType):
logMessageDetails = constructMessageFormat(fileName, message, "", logType)
if logType == "INFO" or logType == "ERROR":
print(logMessageDetails)
print(logMessageDetails, flush=True)
elif logType == "DEBUG":
try:
if os.environ.get('DEBUG') == "LOGTYPE":
print(logMessageDetails)
print(logMessageDetails, flush=True)
except KeyError:
pass
except Exception as ex:
......@@ -123,5 +169,38 @@ def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
else:
return "fileName: " + fileName + " " + logType + ": " + message
def sdb_log(
        sdb, item_name, name, value,
        region='us-east-1', domain_name='serratus-batch',
):
    """
    Write a single attribute of a single item to a SimpleDB domain.

    This is best-effort logging: failures are printed and reported through the
    return value, never raised.

    PARAMS:
    @sdb: client object exposing ``put_attributes`` (created by the caller).
    @item_name: unique string identifying the record (converted with str()).
    @name: attribute name (converted with str()).
    @value: attribute value (converted with str()); replaces any existing
            value of the same attribute.
    @region: not used by this function; kept for interface compatibility.
    @domain_name: SimpleDB domain that receives the record.

    RETURNS:
    True if the service answered with HTTP status 200, False otherwise.
    """
    try:
        status = sdb.put_attributes(
            DomainName=domain_name,
            ItemName=str(item_name),
            Attributes=[{'Name': str(name), 'Value': str(value), 'Replace': True}]
        )
    except Exception as e:
        # Logging must never take the pipeline down, so any client error is
        # reported and swallowed here.
        print("SDB put_attribute error:", str(e), 'domain_name', domain_name,
              'item_name', item_name, flush=True)
        return False

    try:
        http_status = status['ResponseMetadata']['HTTPStatusCode']
    except (KeyError, TypeError):
        # Response does not have the expected nested-dict layout.
        print("SDB status structure error, status:", str(status), flush=True)
        return False

    if http_status == 200:
        return True
    print("SDB log error:", http_status, flush=True)
    return False
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
main()
---
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Orchestrating an Application Process with AWS Batch using CloudFormation'
Description: 'Serratus Batch job for downloading reads, trimming, uploading to S3'
Resources:
VPC:
Type: AWS::EC2::VPC
......@@ -79,6 +79,7 @@ Resources:
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess # for sdb
SpotIamFleetRole: # taken from https://github.com/aodn/aws-wps/blob/master/wps-cloudformation-template.yaml
Type: AWS::IAM::Role
......
aws cloudformation update-stack --stack-name serratus-batch-assembly --template-body file://template/template.yaml --capabilities CAPABILITY_NAMED_IAM
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment