batch_processor.py 7.91 KB
Newer Older
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
1
2
3
4
5
6
7
8
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from time import sleep
import urllib3
9
import glob
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
10
11
12
13
14

# Log levels understood by logMessage(); passed as its logType argument.
LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def fastp(accession,folder,sdb,extra_args=""):
    """Run /fastp over every FASTQ file in *folder*, appending to ``<accession>.fastq``.

    Files ending in ``_1.fastq`` / ``_2.fastq`` are treated as mates of a
    paired-end run and processed together; every other ``*.fastq`` file is
    processed as single-end. Any pre-existing ``<accession>.fastq`` is removed
    first because each fastp invocation appends to it.

    Pairing logic based on
    https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py

    Args:
        accession: Prefix of the output file (``accession + ".fastq"``).
        folder: Prefix prepended to ``*.fastq`` for globbing; callers pass a
            directory path with a trailing slash (e.g. ``"out/"``).
        sdb: Unused here; kept so existing callers keep working.
        extra_args: Extra command-line flags passed verbatim to fastp.

    Raises:
        ValueError: If a ``_1``/``_2`` file has no matching mate.
    """
    # Function-scope import: shlex is not among the module-level imports.
    import shlex

    # Sorted so the order of reads in the concatenated output is deterministic.
    fastq_files = sorted(glob.glob(folder + "*.fastq"))
    known_files = set(fastq_files)
    pe_fastq_files = []
    se_fastq_files = []
    for fname in fastq_files:
        if fname.endswith('_1.fastq'):
            # First of pair found: record both mates.
            mate = fname[:-len('_1.fastq')] + '_2.fastq'
            if mate not in known_files:
                raise ValueError('Unpaired _1 file: ' + fname)
            pe_fastq_files.append((fname, mate))
        elif fname.endswith('_2.fastq'):
            # Second of pair: only verify the first exists; it was/will be
            # added when the _1 file is visited.
            mate = fname[:-len('_2.fastq')] + '_1.fastq'
            if mate not in known_files:
                raise ValueError('Unpaired _2 file: ' + fname)
        else:
            se_fastq_files.append(fname)

    output = accession + ".fastq"
    try:
        os.remove(output)  # should be unnecessary
    except FileNotFoundError:
        pass

    common = ["/fastp", "--thread", "4", "--trim_poly_x"]
    # extra_args is deliberately left unquoted: it may carry several flags.
    tail = ["--stdout", extra_args, ">>", shlex.quote(output)]

    def run_fastp(input_args):
        # Best-effort like the original: a failure is reported, not raised,
        # so the caller can still inspect the (possibly empty) output.
        cmd = ' '.join(common + input_args + tail)
        status = os.system(cmd)
        if status != 0:
            print("fastp exited with non-zero status " + str(status) + ": " + cmd, flush=True)

    for f1, f2 in pe_fastq_files:
        run_fastp(["-i", shlex.quote(f1), "-I", shlex.quote(f2)])
    for f in se_fastq_files:
        run_fastp(["-i", shlex.quote(f)])
 

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
41
42
43
44
45
def _elapsed_seconds(start):
    """Return the whole number of seconds elapsed since *start* (a naive datetime)."""
    # total_seconds() includes whole days; timedelta.seconds alone would wrap at 24h.
    return int((datetime.now() - start).total_seconds())


def process_file(accession, region):
    """Download one SRA accession, filter it with fastp and upload the reads to S3.

    Steps, in order: print some host diagnostics, change into
    /mnt/serratus-data, ``prefetch`` the accession, dump it to FASTQ with
    /parallel-fastq-dump, run fastp (retrying with quality filtering disabled
    if the output is empty), upload ``<accession>.fastq`` to the
    ``serratus-rayan`` bucket under ``reads/``, and remove the local files.
    The duration of each step is recorded via sdb_log().

    Args:
        accession: SRA accession identifier, also used as the output file prefix.
        region: AWS region name used for the SimpleDB client.

    Raises:
        SystemExit: With status 1 if fastp output is empty even without
            quality filtering.
    """
    urllib3.disable_warnings()
    s3 = boto3.client('s3')
    sdb = boto3.client('sdb', region_name=region)

    print("region - " + region, flush=True)
    startTime = datetime.now()

    # Host diagnostics: filesystems, block devices and the NCBI toolkit config.
    os.system("df -T")
    os.system("lsblk")
    os.system("echo contents of /root/.ncbi/user-settings.mkfg")
    os.system("cat /root/.ncbi/user-settings.mkfg")
    os.system("echo EOF")

    # Work on the EBS volume rather than /tmp (local storage / nvme).
    os.chdir("/mnt/serratus-data")
    os.system("pwd")

    # check free space
    os.system("df -h .")

    # some debug
    print("at start, dir listing of /mnt/serratus-data", flush=True)
    os.system("ls -alR .")

    # download reads from accession
    os.system('mkdir -p out/')
    prefetch_start = datetime.now()
    os.system('prefetch '+accession)
    sdb_log(sdb,accession,'prefetch_time',_elapsed_seconds(prefetch_start))

    os.system('mkdir -p tmp/')
    pfqdump_start = datetime.now()
    os.system('/parallel-fastq-dump --split-files --outdir out/ --tmpdir tmp/ --threads 4 --sra-id '+accession)
    sdb_log(sdb,accession,'pfqdump_time',_elapsed_seconds(pfqdump_start))

    files = os.listdir(os.getcwd() + "/out/")
    print("after fastq-dump, dir listing of out/", files)

    # Record the name and size of every dumped file in <accession>.inputdata.txt.
    inputDataFn = accession+".inputdata.txt"
    with open(inputDataFn,"w") as g:
        for f in files:
            size = str(os.stat("out/"+f).st_size)
            print("file: " + f + " size: " + size, flush=True)
            g.write(f + " " + size + "\n")

    # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
    # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

    # run fastp
    fastp_start = datetime.now()
    fastp(accession,"out/", sdb)

    if os.stat(accession+".fastq").st_size == 0:
        # fastp output is empty, most likely those reads have dummy quality values. retry.
        print("retrying fastp without quality filtering", flush=True)
        sdb_log(sdb,accession,'fastp_noqual','True')
        fastp(accession,"out/",sdb,"--disable_quality_filtering")

    if os.stat(accession+".fastq").st_size == 0:
        print("fastp produced empty output even without quality filtering", flush=True)
        sdb_log(sdb,accession,'fastp_empty','True')
        sys.exit(1)

    print("fastp done, now uploading to S3", flush=True)
    sdb_log(sdb,accession,'fastp_time',_elapsed_seconds(fastp_start))

    # upload filtered reads to s3
    outputBucket = "serratus-rayan"
    upload_start = datetime.now()
    s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
    sdb_log(sdb,accession,'upload_time',_elapsed_seconds(upload_start))

    # cleanup. #(important when using a local drive)
    os.system(' '.join(["rm","-f","out/"+accession+"*.fastq"]))
    os.system(' '.join(["rm","-f",accession+".fastq"]))
    os.system(' '.join(["rm","-f","public/sra/"+accession+".sra"]))

    total_seconds = _elapsed_seconds(startTime)
    sdb_log(sdb,accession,'batch_dl_time',total_seconds)
    logMessage(accession, "Serratus-batch-dl processing time - " + str(total_seconds), LOGTYPE_INFO)


def main():
    """Entry point: read settings from the environment and process one accession.

    ``Accession`` (required) names the accession to process; ``Region``
    (optional) selects the AWS region and defaults to ``us-east-1``. Exits
    with an explanatory message when ``Accession`` is missing or empty.
    """
    env = os.environ
    accession = env.get("Accession", "")
    region = env.get("Region", "us-east-1")

    if not accession:
        exit("This script needs an environment variable Accession set to something")

    summary = 'parameters: ' + accession + "  " + region
    logMessage(accession, summary, LOGTYPE_INFO)

    process_file(accession, region)


def logMessage(fileName, message, logType):
    """Print a formatted log line for *fileName*, depending on *logType*.

    "INFO" and "ERROR" messages are always printed. "DEBUG" messages are
    printed only when the ``DEBUG`` environment variable equals "LOGTYPE".
    Any other log type prints nothing. If formatting or printing fails, a
    second line carrying the exception text is printed instead.

    NOTE(review): comparing the DEBUG variable to the literal "LOGTYPE" looks
    unusual; confirm this is the intended switch.
    """
    try:
        line = constructMessageFormat(fileName, message, "", logType)

        if logType in ("INFO", "ERROR"):
            should_print = True
        elif logType == "DEBUG":
            should_print = os.environ.get('DEBUG') == "LOGTYPE"
        else:
            should_print = False

        if should_print:
            print(line, flush=True)
    except Exception as ex:
        failure_note = "Error occurred at Batch_processor.logMessage" + str(ex)
        print(constructMessageFormat(fileName, message, failure_note, logType))


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    """Build a single log line of the form ``fileName: <name> <type>: <message>``.

    When *additionalErrorDetails* is not the empty string it is appended
    after an " Additional Details -  " separator.
    """
    prefix = "fileName: " + fileName + " " + logType + ": " + message
    if additionalErrorDetails == "":
        return prefix
    return prefix + " Additional Details -  " + additionalErrorDetails

172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def sdb_log(
        sdb, item_name, name, value,
        region='us-east-1', domain_name='serratus-batch',
    ):
    """Write a single attribute of one item to a SimpleDB domain.

    Calls ``sdb.put_attributes`` with one attribute ``{name: value}`` (both
    converted to ``str``, ``Replace`` set to True) on the item *item_name*.
    Failures are printed and reported through the return value; nothing is
    raised.

    Args:
        sdb: Client object exposing ``put_attributes`` (callers in this file
            pass ``boto3.client('sdb', ...)``).
        item_name: Unique string identifying the record.
        name: Attribute name.
        value: Attribute value.
        region: Unused; kept for backward compatibility.
        domain_name: SimpleDB domain to write to.

    Returns:
        True if the response reports HTTP status 200, False otherwise.
    """
    try:
        status = sdb.put_attributes(
            DomainName=domain_name,
            ItemName=str(item_name),
            Attributes=[{'Name': str(name), 'Value': str(value), 'Replace': True}]
        )
    except Exception as e:
        # Logging is best-effort: report the failure and stop here instead of
        # going on to inspect a response that does not exist.
        print("SDB put_attribute error:",str(e),'domain_name',domain_name,'item_name',item_name)
        return False

    try:
        http_code = status['ResponseMetadata']['HTTPStatusCode']
    except (KeyError, TypeError, IndexError):
        print("SDB status structure error, status:",str(status))
        return False

    if http_code == 200:
        return True
    print("SDB log error:",http_code)
    return False


Rayan  CHIKHI's avatar
Rayan CHIKHI committed
205
206
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
   main()