# batch_processor.py
# Batch job: download the reads of one SRA accession, filter them with fastp
# and upload the filtered reads to S3.
# (Commit history attribution: Rayan CHIKHI.)
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
import sys
from operator import itemgetter, attrgetter
from time import sleep
import urllib3
import json

# Log severity labels passed as the ``logType`` argument of logMessage().
# logMessage() always prints INFO and ERROR messages; DEBUG messages are only
# printed when the DEBUG environment variable has the value it checks for.
LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

def _run_step(accession, command):
    """Run one pipeline *command* through the shell and report failures.

    A non-zero status is logged as an ERROR but not raised: the empty-output
    check in process_file() remains the single point that aborts the job.

    Returns the raw status value reported by ``os.system``.
    """
    status = os.system(command)
    if status != 0:
        logMessage(accession, "command failed (status " + str(status) + "): " + command, LOGTYPE_ERROR)
    return status


def process_file(accession, region):
    """Download the reads of one accession, filter them and upload them to S3.

    Working inside ``/mnt/serratus-data`` the function:

    1. runs ``prefetch`` and ``/parallel-fastq-dump`` to write FASTQ files
       into ``out/``;
    2. records every file of ``out/`` with its size in
       ``<accession>.inputdata.txt``;
    3. pipes all ``out/*.fastq`` files through ``/fastp --trim_poly_x`` into
       ``<accession>.fastq``;
    4. uploads that file to the ``serratus-rayan`` bucket as
       ``reads/<accession>.fastq``;
    5. removes the local FASTQ files, whether or not the previous steps
       succeeded.

    Args:
        accession: Accession identifier; used in the tool command lines and
            as the stem of every file name.
        region: Region name; it is only printed.

    Raises:
        SystemExit: With code 1 when fastp produced no output file or an
            empty one.
    """
    import shlex  # local import keeps this block self-contained

    urllib3.disable_warnings()
    s3 = boto3.client('s3')

    print("region - " + region)
    startTime = datetime.now()

    # The accession comes from the environment: quote it before it is
    # spliced into any shell command line.
    quotedAccession = shlex.quote(accession)
    filteredFn = accession + ".fastq"

    # Best-effort storage diagnostics; their exit status is irrelevant.
    for diagnostic in ("df -T", "cat /dfT.txt", "lsblk", "cat /lsblk.txt"):
        os.system(diagnostic)

    # go to /tmp (that's where local storage / nvme is)
    # go to /data instead (EBS)
    os.chdir("/mnt/serratus-data")
    os.system("pwd")

    # check free space
    os.system("df -h .")

    try:
        # download reads from accession
        os.makedirs("out", exist_ok=True)
        _run_step(accession, "prefetch " + quotedAccession)
        _run_step(accession, "/parallel-fastq-dump --split-files --outdir out/ --threads 4 --sra-id " + quotedAccession)

        files = os.listdir("out")
        print("after fastq-dump, dir listing", files)
        inputDataFn = accession + ".inputdata.txt"
        with open(inputDataFn, "w") as listing:
            for name in files:
                listing.write(name + " " + str(os.stat(os.path.join("out", name)).st_size) + "\n")

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp; the glob must stay unquoted so the shell expands it
        _run_step(accession, "cat out/*.fastq | /fastp --trim_poly_x --stdin -o " + shlex.quote(filteredFn))

        # A missing file is treated like an empty one instead of crashing
        # with FileNotFoundError.
        if not os.path.isfile(filteredFn) or os.path.getsize(filteredFn) == 0:
            print("fastp produced empty output")
            sys.exit(1)

        print("fastp done, now uploading to S3")

        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
        s3.upload_file(filteredFn, outputBucket, "reads/" + filteredFn)
    finally:
        # cleanup. important! otherwise files stay on local drive
        # (runs on failure and on sys.exit as well)
        os.system("rm -f out/" + quotedAccession + "*.fastq")
        os.system("rm -f " + shlex.quote(filteredFn))

    endTime = datetime.now()
    diffTime = endTime - startTime
    # total_seconds() keeps whole days, which timedelta.seconds would drop
    logMessage(accession, "Serratus-batch-dl processing time - " + str(int(diffTime.total_seconds())), LOGTYPE_INFO)


def main():
    """Read the job parameters from the environment and process the accession.

    Environment variables:
        Accession: Required. Surrounding whitespace is ignored.
        Region: Optional, defaults to ``us-east-1``.

    Raises:
        SystemExit: When ``Accession`` is unset, empty or only whitespace.
    """
    # Strip so that a blank value cannot slip through and produce broken
    # command lines and file names further down.
    accession = os.environ.get("Accession", "").strip()
    region = os.environ.get("Region", "us-east-1")

    if not accession:
        # sys.exit rather than the exit() helper, which only exists when the
        # site module has been loaded
        sys.exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession + "  " + region, LOGTYPE_INFO)

    process_file(accession, region)


def logMessage(fileName, message, logType):
    """Print a formatted log line for *fileName*.

    INFO and ERROR messages are always printed. DEBUG messages are printed
    only when the ``DEBUG`` environment variable equals ``"LOGTYPE"``. Any
    other *logType* prints nothing.

    If formatting or printing fails, a fallback line that includes the error
    text is printed instead.

    Args:
        fileName: Identifier the message belongs to.
        message: Text to log.
        logType: One of the LOGTYPE_* labels.
    """
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)

        if logType in (LOGTYPE_INFO, LOGTYPE_ERROR):
            print(logMessageDetails)
        elif logType == LOGTYPE_DEBUG:
            # NOTE(review): comparing DEBUG against the literal "LOGTYPE"
            # looks unusual - confirm this is the intended switch value.
            if os.environ.get('DEBUG') == "LOGTYPE":
                print(logMessageDetails)
    except Exception as ex:
        # Build the fallback with str.format so that the argument that broke
        # constructMessageFormat (e.g. a non-string) cannot break it again.
        print("fileName: {} {}: {} Additional Details -  Error occurred at Batch_processor.logMessage{}".format(
            fileName, logType, message, ex))


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    """Assemble the text of one log line.

    The result is ``fileName: <fileName> <logType>: <message>``; when
    *additionalErrorDetails* is not the empty string it is appended after
    `` Additional Details -  ``.
    """
    pieces = ["fileName: ", fileName, " ", logType, ": ", message]
    if additionalErrorDetails != "":
        pieces.extend([" Additional Details -  ", additionalErrorDetails])
    return "".join(pieces)

# Run the batch job only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
   main()