batch_processor.py 3.49 KB
Newer Older
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
import sys
from operator import itemgetter, attrgetter
from time import sleep
import urllib3
import json

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

def process_file(accession, region):
    #try:
    if True:
        urllib3.disable_warnings()
        s3 = boto3.client('s3')
        
        print("region - " + region)
        startTime = datetime.now()

        # go to /tmp (important, that's where local storage / nvme is)
        os.chdir("/tmp")
        os.system(' '.join(["pwd"]))
        
        # check free space
        os.system(' '.join(["df", "-h", "."]))

        # download reads from accession
        os.system('mkdir -p out/')
        os.system('prefetch '+accession)
        os.system('../parallel-fastq-dump --split-files --outdir out/ --threads 4 --sra-id '+accession)

        files = os.listdir(os.getcwd() + "/out/")
        print("after fastq-dump, dir listing", files)
        inputDataFn = accession+".inputdata.txt"
        g = open(inputDataFn,"w")
        for f in files:
            g.write(f + " " + str(os.stat("out/"+f).st_size)+"\n")
        g.close()

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp
        os.system(' '.join(["cat","out/*.fastq","|","../fastp", "--trim_poly_x", "--stdin", "-o", accession + ".fastq"]))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
51
52
53
54
            
        if os.stat(accession+".fastq").st_size == 0:
            print("fastp produced empty output")
            exit(1)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
        
        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
        s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
        
        endTime = datetime.now()
        diffTime = endTime - startTime
        logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds), LOGTYPE_INFO) 


def main():
    accession = ""
    region = "us-east-1"
   
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")

    if len(accession) == 0:
        exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession+  "  " + region, LOGTYPE_INFO)

    process_file(accession,region)


def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails)
        elif logType == "DEBUG":
            try:
                if os.environ.get('DEBUG') == "LOGTYPE":
                   print(logMessageDetails) 
            except KeyError:
                pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details -  " + additionalErrorDetails
    else:
        return "fileName: " + fileName + " " + logType + ": " + message

if __name__ == '__main__':
   main()