"""batch_processor.py -- batch job entry point.

Reads the ``Accession`` (required) and ``Region`` (optional) environment
variables, downloads the reads of that SRA accession, filters them with
fastp and uploads the filtered FASTQ file to S3.
"""
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
import sys
from operator import itemgetter, attrgetter
from time import sleep
import urllib3
import json

# Severity labels passed as the ``logType`` argument of logMessage().
LOGTYPE_ERROR = "ERROR"
LOGTYPE_INFO = "INFO"
LOGTYPE_DEBUG = "DEBUG"

def _run_shell(command):
    """Run *command* through the shell and report a non-zero status.

    Failures are reported but never raised, so the pipeline keeps the
    best-effort behaviour of a bare ``os.system`` call.

    Returns:
        The raw status value returned by ``os.system``.
    """
    status = os.system(command)
    if status != 0:
        print("command returned non-zero status " + str(status) + ": " + command)
    return status


def process_file(accession, region):
    """Download, filter and upload the reads of one SRA accession.

    All work happens inside ``/data``:

    1. ``prefetch`` and ``parallel-fastq-dump`` write FASTQ files to ``out/``.
    2. Name and size of every file in ``out/`` are written to
       ``<accession>.inputdata.txt``.
    3. ``out/*.fastq`` is concatenated and piped through
       ``fastp --trim_poly_x`` into ``<accession>.fastq``.
    4. That file is uploaded to bucket ``serratus-rayan`` under
       ``reads/<accession>.fastq``.
    5. The local FASTQ files are removed, whether or not the previous
       steps succeeded.

    Args:
        accession: SRA accession identifier (str). It is shell-quoted
            before being placed in any command line.
        region: Region name (str). It is only printed here.
            NOTE(review): it is not passed to the S3 client -- confirm
            that this is intended.

    Raises:
        SystemExit: with status 1 when fastp produced an empty file.
    """
    import shlex  # local import: the only name this block adds to the file

    urllib3.disable_warnings()
    s3 = boto3.client('s3')

    print("region - " + region)
    startTime = datetime.now()

    # go to /tmp (that's where local storage / nvme is)
    # go to /data instead (EBS)
    os.chdir("/data")
    _run_shell("pwd")

    # check free space
    _run_shell("df -h .")

    # The accession comes from the environment; quote it so that it can
    # never be interpreted as shell syntax. Plain accessions are unchanged.
    quotedAccession = shlex.quote(accession)
    filteredFn = accession + ".fastq"

    try:
        # download reads from accession
        _run_shell('mkdir -p out/')
        _run_shell('prefetch ' + quotedAccession)
        _run_shell('../parallel-fastq-dump --split-files --outdir out/ --threads 4 --sra-id ' + quotedAccession)

        files = os.listdir("out")
        print("after fastq-dump, dir listing", files)
        inputDataFn = accession + ".inputdata.txt"
        # "with" closes the file even if a stat() call fails midway.
        with open(inputDataFn, "w") as g:
            for f in files:
                g.write(f + " " + str(os.stat("out/" + f).st_size) + "\n")

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp (the glob and the pipe need the shell)
        _run_shell("cat out/*.fastq | ../fastp --trim_poly_x --stdin -o " + shlex.quote(filteredFn))

        if os.stat(filteredFn).st_size == 0:
            print("fastp produced empty output")
            sys.exit(1)

        print("fastp done, now uploading to S3")

        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
        s3.upload_file(filteredFn, outputBucket, "reads/" + filteredFn)
    finally:
        # cleanup. important! otherwise files stay on local drive
        # (in "finally" so that a failed run does not leave them behind)
        _run_shell("rm -f out/" + quotedAccession + "*.fastq")
        _run_shell("rm -f " + shlex.quote(filteredFn))

    endTime = datetime.now()
    diffTime = endTime - startTime
    # total_seconds() keeps whole days that timedelta.seconds would drop.
    logMessage(accession, "Serratus-batch-dl processing time - " + str(int(diffTime.total_seconds())), LOGTYPE_INFO)


def main():
    """Read the job parameters from the environment and process them.

    Environment variables:
        Accession: required, must be non-empty.
        Region: optional, defaults to ``us-east-1``.

    Raises:
        SystemExit: with an explanatory message when ``Accession`` is
            missing or empty.
    """
    accession = os.environ.get("Accession", "")
    region = os.environ.get("Region", "us-east-1")

    if not accession:
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive interpreter.
        sys.exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession + "  " + region, LOGTYPE_INFO)

    process_file(accession, region)


def logMessage(fileName, message, logType):
    """Print a formatted log line for *fileName*.

    INFO and ERROR messages are always printed. DEBUG messages are printed
    only when the ``DEBUG`` environment variable equals ``"LOGTYPE"``.
    Any other ``logType`` prints nothing.

    Args:
        fileName: Identifier placed at the start of the line (str).
        message: Text of the log entry (str).
        logType: One of the ``LOGTYPE_*`` labels (str).
    """
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)

        if logType in (LOGTYPE_INFO, LOGTYPE_ERROR):
            print(logMessageDetails)
        elif logType == LOGTYPE_DEBUG:
            # NOTE(review): DEBUG is compared with the literal "LOGTYPE";
            # kept as in the original -- confirm this is the intended switch.
            # os.environ.get never raises KeyError, so no handler is needed.
            if os.environ.get('DEBUG') == "LOGTYPE":
                print(logMessageDetails)
    except Exception as ex:
        # Coerce to str: retrying with the same non-str arguments that made
        # the first call fail would only raise again, uncaught.
        logMessageDetails = constructMessageFormat(
            str(fileName),
            str(message),
            "Error occurred at Batch_processor.logMessage" + str(ex),
            str(logType),
        )
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    """Build one log line from its parts.

    The result is ``fileName: <fileName> <logType>: <message>``; when
    *additionalErrorDetails* is not the empty string,
    `` Additional Details -  <additionalErrorDetails>`` is appended.
    All four arguments must be strings.
    """
    pieces = ["fileName: ", fileName, " ", logType, ": ", message]
    if additionalErrorDetails != "":
        pieces.extend([" Additional Details -  ", additionalErrorDetails])
    return "".join(pieces)

# Start the job only when this file is executed as a script.
if __name__ == "__main__":
    main()