batch_processor.py 8.89 KB
Newer Older
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
1
2
3
4
5
6
7
8
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from time import sleep
import urllib3
9
import glob
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
10
11
12
13
14

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

15
16
17
def fastp(accession,folder,sdb,extra_args=""):
    # Separate fastq files into paired- and single-end reads
    #based on https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
18
    fastq_files = glob.glob(folder+"/"+accession+"*.fastq")
19
20
21
22
23
24
    pe_fastq_files = []
    se_fastq_files = []
    for fname in fastq_files:
        if fname[-8:] == '_1.fastq':
            # If first of pair is found, add both
            fname2 = fname[:-8] + '_2.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
25
26
27
28
29
30
            if fname2 not in fastq_files:
                print('Unpaired _1 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired1','True')
                se_fastq_files.append(fname)
            else:
                pe_fastq_files.append((fname, fname2))
31
32
33
        elif fname[-8:] == '_2.fastq':
            # If second of pair is found, test for presence of first, but do nothing
            fname1 = fname[:-8] + '_1.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
34
35
36
37
            if fname1 not in fastq_files:
                print('Unpaired _2 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired2','True')
                se_fastq_files.append(fname)
38
39
40
41
42
43
44
45
46
47
        else:
            se_fastq_files.append(fname)

    os.system('rm -f ' + accession + '.fastq') # should be unnecessary
    for f1, f2 in pe_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f1, "-I", f2, "--stdout", extra_args, ">>", accession + ".fastq"]))
    for f in se_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f,            "--stdout", extra_args, ">>", accession + ".fastq"]))
 

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
48
49
50
51
52
def process_file(accession, region):
    #try:
    if True:
        urllib3.disable_warnings()
        s3 = boto3.client('s3')
53
        sdb = boto3.client('sdb', region_name=region)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
54
        
55
        print("region - " + region, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
56
57
        startTime = datetime.now()

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
58
59
        os.system(' '.join(["df", "-T"]))
        os.system(' '.join(["lsblk"]))
60
61
62
63
        os.system(' '.join(["echo","contents of /root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["cat","/root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["echo","EOF"]))
        
64
        # go to /tmp (that's where local storage / nvme is)
65
66
        # on second thought..
        # go to EBS instead
67
        os.chdir("/mnt/serratus-data")
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
68
        os.system(' '.join(["pwd"]))
69

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
70
        # check free space
71
        os.system(' '.join(["df", "-h","."]))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
72

73
        # some debug
74
        print("at start, dir listing of /mnt/serratus-data", flush=True)
Rayan  CHIKHI's avatar
tweaks    
Rayan CHIKHI committed
75
        os.system(' '.join(['ls','-alR','.']))
76

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
77
78
        # download reads from accession
        os.system('mkdir -p out/')
79
        prefetch_start = datetime.now()
80
81
82
        # 'is larger than maximum allowed: skipped' solved by:
        # https://wiki.rc.hms.harvard.edu/display/O2/Aspera+to+download+NCBI+SRA+data
        os.system('prefetch --max-size 35G '+accession)
83
84
        prefetch_time = datetime.now() - prefetch_start 
        sdb_log(sdb,accession,'prefetch_time',int(prefetch_time.seconds))
85
        os.system('mkdir -p tmp/')
86
        pfqdump_start = datetime.now()
87
        os.system('/parallel-fastq-dump --skip-technical --split-3 --outdir out/ --tmpdir tmp/ --threads 4 --sra-id '+accession)
88
89
90
        pfqdump_time = datetime.now() - pfqdump_start
        sdb_log(sdb,accession,'pfqdump_time',int(pfqdump_time.seconds))

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
91
92

        files = os.listdir(os.getcwd() + "/out/")
93
        print("after fastq-dump, dir listing of out/", files)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
94
95
96
        inputDataFn = accession+".inputdata.txt"
        g = open(inputDataFn,"w")
        for f in files:
97
            print("file: " + f + " size: " + str(os.stat("out/"+f).st_size), flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
98
99
100
101
102
103
104
            g.write(f + " " + str(os.stat("out/"+f).st_size)+"\n")
        g.close()

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp
105
106
107
        fastp_start = datetime.now()
        fastp(accession,"out/", sdb)

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
108
        if os.stat(accession+".fastq").st_size == 0:
109
            # fastp output is empty, most likely those reads have dummy quality values. retry.
110
            print("retrying fastp without quality filtering", flush=True)
111
112
            sdb_log(sdb,accession,'fastp_noqual','True')
            fastp(accession,"out/",sdb,"--disable_quality_filtering")
113
114
115
116
117
118
119
120
 
        if os.stat(accession+".fastq").st_size == 0:
            # fastp output is STILL empty. could be that read1 or read2 are too short. consider both unpaired
            print("retrying fastp unpaired mode", flush=True)
            os.system('mv out/' + accession + '_1.fastq out/' + accession + '_a.fastq')
            os.system('mv out/' + accession + '_2.fastq out/' + accession + '_b.fastq')
            sdb_log(sdb,accession,'fastp_unpaired','True')
            fastp(accession,"out/",sdb,"--disable_quality_filtering")
121
122
123
124
           
        if os.stat(accession+".fastq").st_size == 0:
            print("fastp produced empty output even without quality filtering", flush=True)
            sdb_log(sdb,accession,'fastp_empty','True')
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
125
            exit(1)
126
127

        print("fastp done, now uploading to S3")
128
129
        fastp_time = datetime.now() - fastp_start 
        sdb_log(sdb,accession,'fastp_time',int(fastp_time.seconds))
130

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
131
132
        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
133
        upload_start = datetime.now()
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
134
        s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
135
136
        upload_time = datetime.now() - upload_start
        sdb_log(sdb,accession,'upload_time',int(upload_time.seconds))
137
       
138
        # cleanup. #(important when using a local drive)
139
140
        os.system(' '.join(["rm","-f","out/"+accession+"*.fastq"]))
        os.system(' '.join(["rm","-f",accession+".fastq"]))
141
        os.system(' '.join(["rm","-f","public/sra/"+accession+".sra"]))
142

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
143
144
        endTime = datetime.now()
        diffTime = endTime - startTime
145
        sdb_log(sdb,accession,'batch_dl_time',int(diffTime.seconds))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
        logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds), LOGTYPE_INFO) 


def main():
    accession = ""
    region = "us-east-1"
   
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")

    if len(accession) == 0:
        exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession+  "  " + region, LOGTYPE_INFO)

    process_file(accession,region)


def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        
        if logType == "INFO" or logType == "ERROR":
171
            print(logMessageDetails, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
172
173
174
        elif logType == "DEBUG":
            try:
                if os.environ.get('DEBUG') == "LOGTYPE":
175
                   print(logMessageDetails, flush=True) 
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
176
177
178
179
180
181
182
183
184
185
186
187
188
            except KeyError:
                pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details -  " + additionalErrorDetails
    else:
        return "fileName: " + fileName + " " + logType + ": " + message

189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def sdb_log(
        sdb, item_name, name, value,
        region='us-east-1', domain_name='serratus-batch',
    ):
        """
        Insert a single record to simpledb domain.
        PARAMS:
        @item_name: unique string for this record.
        @attributes = [
            {'Name': 'duration', 'Value': str(duration), 'Replace': True},
            {'Name': 'date', 'Value': str(date), 'Replace': True},
        ]
        """
        try:
            status = sdb.put_attributes(
                DomainName=domain_name,
                ItemName=str(item_name),
                Attributes=[{'Name':str(name), 'Value':str(value), 'Replace': True}]
            )
        except Exception as e:
            print("SDB put_attribute error:",str(e),'domain_name',domain_name,'item_name',item_name)
            status = False
        try:
            if status['ResponseMetadata']['HTTPStatusCode'] == 200:
                return True
            else:
                print("SDB log error:",status['ResponseMetadata']['HTTPStatusCode'])
                return False
        except:
            print("SDB status structure error, status:",str(status))
            return False


Rayan  CHIKHI's avatar
Rayan CHIKHI committed
222
223
if __name__ == '__main__':
   main()