batch_processor.py 9.02 KB
Newer Older
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
1
2
3
4
5
6
7
8
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from time import sleep
import urllib3
9
import glob
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
10
11
12
13
14

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

15
16
17
def fastp(accession,folder,sdb,extra_args=""):
    # Separate fastq files into paired- and single-end reads
    #based on https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
18
    fastq_files = glob.glob(folder+"/"+accession+"*.fastq")
19
20
21
22
23
24
    pe_fastq_files = []
    se_fastq_files = []
    for fname in fastq_files:
        if fname[-8:] == '_1.fastq':
            # If first of pair is found, add both
            fname2 = fname[:-8] + '_2.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
25
26
27
28
29
30
            if fname2 not in fastq_files:
                print('Unpaired _1 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired1','True')
                se_fastq_files.append(fname)
            else:
                pe_fastq_files.append((fname, fname2))
31
32
33
        elif fname[-8:] == '_2.fastq':
            # If second of pair is found, test for presence of first, but do nothing
            fname1 = fname[:-8] + '_1.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
34
35
36
37
            if fname1 not in fastq_files:
                print('Unpaired _2 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired2','True')
                se_fastq_files.append(fname)
38
39
40
41
42
43
44
45
46
47
        else:
            se_fastq_files.append(fname)

    os.system('rm -f ' + accession + '.fastq') # should be unnecessary
    for f1, f2 in pe_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f1, "-I", f2, "--stdout", extra_args, ">>", accession + ".fastq"]))
    for f in se_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f,            "--stdout", extra_args, ">>", accession + ".fastq"]))
 

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
48
49
50
51
52
def process_file(accession, region):
    #try:
    if True:
        urllib3.disable_warnings()
        s3 = boto3.client('s3')
53
        sdb = boto3.client('sdb', region_name=region)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
54
        
55
        print("region - " + region, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
56
57
        startTime = datetime.now()

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
58
59
        os.system(' '.join(["df", "-T"]))
        os.system(' '.join(["lsblk"]))
60
61
62
63
        os.system(' '.join(["echo","contents of /root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["cat","/root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["echo","EOF"]))
        
64
        # go to /tmp (that's where local storage / nvme is)
65
66
        # on second thought..
        # go to EBS instead
67
        os.chdir("/mnt/serratus-data")
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
68
        os.system(' '.join(["pwd"]))
69

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
70
        # check free space
71
        os.system(' '.join(["df", "-h","."]))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
72

73
        # some debug
74
        print("at start, dir listing of /mnt/serratus-data", flush=True)
Rayan  CHIKHI's avatar
tweaks    
Rayan CHIKHI committed
75
        os.system(' '.join(['ls','-alR','.']))
76

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
77
78
        # download reads from accession
        os.system('mkdir -p out/')
79
        prefetch_start = datetime.now()
80
81
        # 'is larger than maximum allowed: skipped' solved by:
        # https://wiki.rc.hms.harvard.edu/display/O2/Aspera+to+download+NCBI+SRA+data
82
        #os.system('prefetch --max-size 35G '+accession)
Rayan's avatar
Rayan committed
83
        os.system('prefetch --max-size 35G '+accession)
84
85
        prefetch_time = datetime.now() - prefetch_start 
        sdb_log(sdb,accession,'prefetch_time',int(prefetch_time.seconds))
86
        os.system('mkdir -p tmp/')
87
        pfqdump_start = datetime.now()
88
        os.system('/parallel-fastq-dump --skip-technical --split-e --outdir out/ --tmpdir tmp/ --threads 4 --sra-id '+accession)
89
90
91
        pfqdump_time = datetime.now() - pfqdump_start
        sdb_log(sdb,accession,'pfqdump_time',int(pfqdump_time.seconds))

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
92
93

        files = os.listdir(os.getcwd() + "/out/")
94
        print("after fastq-dump, dir listing of out/", files)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
95
96
97
        inputDataFn = accession+".inputdata.txt"
        g = open(inputDataFn,"w")
        for f in files:
98
            print("file: " + f + " size: " + str(os.stat("out/"+f).st_size), flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
99
100
101
102
103
104
105
            g.write(f + " " + str(os.stat("out/"+f).st_size)+"\n")
        g.close()

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp
106
107
108
        fastp_start = datetime.now()
        fastp(accession,"out/", sdb)

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
109
        if os.stat(accession+".fastq").st_size == 0:
110
            # fastp output is empty, most likely those reads have dummy quality values. retry.
111
            print("retrying fastp without quality filtering", flush=True)
112
113
            sdb_log(sdb,accession,'fastp_noqual','True')
            fastp(accession,"out/",sdb,"--disable_quality_filtering")
114
115
116
117
118
119
120
121
 
        if os.stat(accession+".fastq").st_size == 0:
            # fastp output is STILL empty. could be that read1 or read2 are too short. consider both unpaired
            print("retrying fastp unpaired mode", flush=True)
            os.system('mv out/' + accession + '_1.fastq out/' + accession + '_a.fastq')
            os.system('mv out/' + accession + '_2.fastq out/' + accession + '_b.fastq')
            sdb_log(sdb,accession,'fastp_unpaired','True')
            fastp(accession,"out/",sdb,"--disable_quality_filtering")
122
123
124
125
           
        if os.stat(accession+".fastq").st_size == 0:
            print("fastp produced empty output even without quality filtering", flush=True)
            sdb_log(sdb,accession,'fastp_empty','True')
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
126
            exit(1)
127
128

        print("fastp done, now uploading to S3")
129
130
        fastp_time = datetime.now() - fastp_start 
        sdb_log(sdb,accession,'fastp_time',int(fastp_time.seconds))
131

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
132
133
        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
134
        upload_start = datetime.now()
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
135
        s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
136
137
        upload_time = datetime.now() - upload_start
        sdb_log(sdb,accession,'upload_time',int(upload_time.seconds))
138
       
139
        # cleanup. #(important when using a local drive)
140
141
        os.system(' '.join(["rm","-f","out/"+accession+"*.fastq"]))
        os.system(' '.join(["rm","-f",accession+".fastq"]))
142
        os.system(' '.join(["rm","-f","public/sra/"+accession+".sra"]))
143

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
144
145
        endTime = datetime.now()
        diffTime = endTime - startTime
146
        sdb_log(sdb,accession,'batch_dl_time',int(diffTime.seconds))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
147
        sdb_log(sdb,accession,'batch_dl_date',str(datetime.datetime.now()))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
        logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds), LOGTYPE_INFO) 


def main():
    accession = ""
    region = "us-east-1"
   
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")

    if len(accession) == 0:
        exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession+  "  " + region, LOGTYPE_INFO)

    process_file(accession,region)


def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        
        if logType == "INFO" or logType == "ERROR":
173
            print(logMessageDetails, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
174
175
176
        elif logType == "DEBUG":
            try:
                if os.environ.get('DEBUG') == "LOGTYPE":
177
                   print(logMessageDetails, flush=True) 
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
178
179
180
181
182
183
184
185
186
187
188
189
190
            except KeyError:
                pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details -  " + additionalErrorDetails
    else:
        return "fileName: " + fileName + " " + logType + ": " + message

191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def sdb_log(
        sdb, item_name, name, value,
        region='us-east-1', domain_name='serratus-batch',
    ):
        """
        Insert a single record to simpledb domain.
        PARAMS:
        @item_name: unique string for this record.
        @attributes = [
            {'Name': 'duration', 'Value': str(duration), 'Replace': True},
            {'Name': 'date', 'Value': str(date), 'Replace': True},
        ]
        """
        try:
            status = sdb.put_attributes(
                DomainName=domain_name,
                ItemName=str(item_name),
                Attributes=[{'Name':str(name), 'Value':str(value), 'Replace': True}]
            )
        except Exception as e:
            print("SDB put_attribute error:",str(e),'domain_name',domain_name,'item_name',item_name)
            status = False
        try:
            if status['ResponseMetadata']['HTTPStatusCode'] == 200:
                return True
            else:
                print("SDB log error:",status['ResponseMetadata']['HTTPStatusCode'])
                return False
        except:
            print("SDB status structure error, status:",str(status))
            return False


Rayan  CHIKHI's avatar
Rayan CHIKHI committed
224
225
if __name__ == '__main__':
   main()