# batch_processor.py (4.32 KB)
# Committed by Rayan CHIKHI
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
import sys
from operator import itemgetter, attrgetter
from time import sleep
import urllib3
import json

# Log-level labels used as the logType argument of logMessage().
LOGTYPE_ERROR, LOGTYPE_INFO, LOGTYPE_DEBUG = 'ERROR', 'INFO', 'DEBUG'

def process_file(accession, region):
    """Download one SRA accession, filter it with fastp and upload the result to S3.

    Work happens in /mnt/serratus-data (EBS). Steps:
      1. print disk-layout and /root/.ncbi/user-settings.mkfg diagnostics,
      2. run `prefetch` and `/parallel-fastq-dump --split-files` into out/,
      3. write "<accession>.inputdata.txt" listing each file in out/ with its size,
      4. pipe all out/*.fastq through /fastp into "<accession>.fastq",
      5. upload it to s3://serratus-rayan/reads/<accession>.fastq,
      6. delete the local intermediates.

    :param accession: accession id read from the environment by main(); it is
        interpolated into shell commands, so it is shell-quoted first.
    :param region: only printed here. NOTE(review): the S3 client is created
        without region_name, so this value does not affect the upload -- confirm.
    Exits the process with status 1 when fastp produces no output or an empty file.
    """
    import shlex  # local import: only this block needs it

    urllib3.disable_warnings()
    s3 = boto3.client('s3')

    print("region - " + region)
    startTime = datetime.now()

    # accession is externally supplied and ends up in shell strings below.
    quotedAccession = shlex.quote(accession)

    _print_environment_diagnostics()

    # Local storage (/tmp, nvme) was considered; EBS is used instead.
    os.chdir("/mnt/serratus-data")
    os.system("pwd")
    os.system("df -h .")  # check free space

    _print_public_sra_listing()

    files = _download_reads(quotedAccession)
    _write_input_manifest(accession + ".inputdata.txt", files)

    # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
    # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

    # run fastp on all dumped reads (the glob must stay unquoted so the shell expands it)
    outputFn = accession + ".fastq"
    quotedOutputFn = shlex.quote(outputFn)
    os.system("cat out/*.fastq | /fastp --trim_poly_x --stdin -o " + quotedOutputFn)

    # A missing file is treated like an empty one instead of crashing in os.stat.
    if not os.path.exists(outputFn) or os.stat(outputFn).st_size == 0:
        print("fastp produced empty output")
        sys.exit(1)

    print("fastp done, now uploading to S3")

    # upload filtered reads to s3
    outputBucket = "serratus-rayan"
    s3.upload_file(outputFn, outputBucket, "reads/" + outputFn)

    # cleanup (important when using a local drive)
    os.system("rm -f out/" + quotedAccession + "*.fastq")
    os.system("rm -f " + quotedOutputFn)
    os.system("rm -f public/sra/" + shlex.quote(accession + ".sra"))

    elapsed = datetime.now() - startTime
    # total_seconds(), not .seconds: .seconds drops whole days for runs longer than 24h.
    logMessage(accession, "Serratus-batch-dl processing time - " + str(int(elapsed.total_seconds())), LOGTYPE_INFO)


def _print_environment_diagnostics():
    """Print filesystem/block-device layout and the contents of /root/.ncbi/user-settings.mkfg."""
    os.system("df -T")
    os.system("lsblk")
    os.system("echo contents of /root/.ncbi/user-settings.mkfg")
    os.system("cat /root/.ncbi/user-settings.mkfg")
    os.system("echo EOF")


def _print_public_sra_listing():
    """Debug aid: print the entries of public/sra/ under the current directory, if it exists."""
    try:
        public_sra_files = os.listdir(os.path.join(os.getcwd(), "public", "sra"))
    except FileNotFoundError:
        # A debug listing must not abort the job.
        print("at start, public/sra/ does not exist")
        return
    print("at start, dir listing of public/sra/", public_sra_files)


def _download_reads(quotedAccession):
    """Run prefetch + parallel-fastq-dump into out/ and return the file names found in out/.

    :param quotedAccession: accession already escaped with shlex.quote.
    """
    os.makedirs("out", exist_ok=True)
    os.system("prefetch " + quotedAccession)
    os.makedirs("tmp", exist_ok=True)
    os.system("/parallel-fastq-dump --split-files --outdir out/ --tmpdir tmp/ --threads 4 --sra-id " + quotedAccession)

    files = os.listdir(os.path.join(os.getcwd(), "out"))
    print("after fastq-dump, dir listing of out/", files)
    return files


def _write_input_manifest(manifestFn, files):
    """Write one "<name> <size in bytes>" line for each name in files (looked up under out/)."""
    with open(manifestFn, "w") as manifest:
        for name in files:
            manifest.write(name + " " + str(os.stat(os.path.join("out", name)).st_size) + "\n")


def main():
    """Script entry point: read job parameters from the environment and process one accession.

    Environment variables:
      Accession -- required; exits with an error message when unset or empty.
      Region    -- optional; defaults to "us-east-1".
    """
    accession = os.environ.get("Accession", "")
    region = os.environ.get("Region", "us-east-1")

    if not accession:
        # sys.exit, not the site-module exit() which is meant for the interactive shell.
        sys.exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession + "  " + region, LOGTYPE_INFO)

    process_file(accession, region)


def logMessage(fileName, message, logType):
    """Print a formatted log line to stdout.

    INFO and ERROR messages are always printed. DEBUG messages are printed only
    when the DEBUG environment variable equals the literal "LOGTYPE"
    (NOTE(review): unusual trigger value -- confirm it is intended). Any other
    logType is silently dropped.

    :param fileName: identifier shown in the record (callers pass the accession).
    :param message: text to log.
    :param logType: one of LOGTYPE_INFO / LOGTYPE_ERROR / LOGTYPE_DEBUG.
    """
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)

        if logType in (LOGTYPE_INFO, LOGTYPE_ERROR):
            print(logMessageDetails)
        elif logType == LOGTYPE_DEBUG and os.environ.get('DEBUG') == "LOGTYPE":
            print(logMessageDetails)
    except Exception as ex:
        # Re-calling with the same (possibly non-str) arguments would just raise again,
        # so coerce everything to str to guarantee the fallback record is emitted.
        logMessageDetails = constructMessageFormat(
            str(fileName),
            str(message),
            "Error occurred at Batch_processor.logMessage: " + str(ex),
            str(logType),
        )
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    """Return a one-line log record "fileName: <fileName> <logType>: <message>".

    A non-empty additionalErrorDetails is appended as
    " Additional Details -  <details>"; None and "" both mean "no details".
    Values are formatted with str.format, so non-string arguments (None, ints,
    exceptions) are rendered instead of raising TypeError.
    """
    record = "fileName: {} {}: {}".format(fileName, logType, message)
    if additionalErrorDetails is None or additionalErrorDetails == "":
        return record
    return record + " Additional Details -  {}".format(additionalErrorDetails)

if __name__ == "__main__":
    # Run the job only when executed as a script, not on import.
    main()