batch_processor.py 8.2 KB
Newer Older
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
1
2
3
4
5
6
7
8
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from time import sleep
import urllib3
9
import glob
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
10
11
12
13
14

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

15
16
17
def fastp(accession,folder,sdb,extra_args=""):
    # Separate fastq files into paired- and single-end reads
    #based on https://github.com/hawkjo/sequencetools/blob/master/clean_fastqs_in_subdirs.py
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
18
    fastq_files = glob.glob(folder+"/"+accession+"*.fastq")
19
20
21
22
23
24
    pe_fastq_files = []
    se_fastq_files = []
    for fname in fastq_files:
        if fname[-8:] == '_1.fastq':
            # If first of pair is found, add both
            fname2 = fname[:-8] + '_2.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
25
26
27
28
29
30
            if fname2 not in fastq_files:
                print('Unpaired _1 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired1','True')
                se_fastq_files.append(fname)
            else:
                pe_fastq_files.append((fname, fname2))
31
32
33
        elif fname[-8:] == '_2.fastq':
            # If second of pair is found, test for presence of first, but do nothing
            fname1 = fname[:-8] + '_1.fastq'
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
34
35
36
37
            if fname1 not in fastq_files:
                print('Unpaired _2 file: ' + fname)
                sdb_log(sdb,accession,'read_unpaired2','True')
                se_fastq_files.append(fname)
38
39
40
41
42
43
44
45
46
47
        else:
            se_fastq_files.append(fname)

    os.system('rm -f ' + accession + '.fastq') # should be unnecessary
    for f1, f2 in pe_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f1, "-I", f2, "--stdout", extra_args, ">>", accession + ".fastq"]))
    for f in se_fastq_files:
        os.system(' '.join(["/fastp", "--thread", "4", "--trim_poly_x", "-i", f,            "--stdout", extra_args, ">>", accession + ".fastq"]))
 

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
48
49
50
51
52
def process_file(accession, region):
    #try:
    if True:
        urllib3.disable_warnings()
        s3 = boto3.client('s3')
53
        sdb = boto3.client('sdb', region_name=region)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
54
        
55
        print("region - " + region, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
56
57
        startTime = datetime.now()

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
58
59
        os.system(' '.join(["df", "-T"]))
        os.system(' '.join(["lsblk"]))
60
61
62
63
        os.system(' '.join(["echo","contents of /root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["cat","/root/.ncbi/user-settings.mkfg"]))
        os.system(' '.join(["echo","EOF"]))
        
64
        # go to /tmp (that's where local storage / nvme is)
65
66
        # on second thought..
        # go to EBS instead
67
        os.chdir("/mnt/serratus-data")
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
68
        os.system(' '.join(["pwd"]))
69

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
70
        # check free space
71
        os.system(' '.join(["df", "-h","."]))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
72

73
        # some debug
74
        print("at start, dir listing of /mnt/serratus-data", flush=True)
Rayan  CHIKHI's avatar
tweaks    
Rayan CHIKHI committed
75
        os.system(' '.join(['ls','-alR','.']))
76

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
77
78
        # download reads from accession
        os.system('mkdir -p out/')
79
        prefetch_start = datetime.now()
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
80
        os.system('prefetch '+accession)
81
82
        prefetch_time = datetime.now() - prefetch_start 
        sdb_log(sdb,accession,'prefetch_time',int(prefetch_time.seconds))
83
        os.system('mkdir -p tmp/')
84
        pfqdump_start = datetime.now()
85
        os.system('/parallel-fastq-dump --split-files --outdir out/ --tmpdir tmp/ --threads 4 --sra-id '+accession)
86
87
88
        pfqdump_time = datetime.now() - pfqdump_start
        sdb_log(sdb,accession,'pfqdump_time',int(pfqdump_time.seconds))

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
89
90

        files = os.listdir(os.getcwd() + "/out/")
91
        print("after fastq-dump, dir listing of out/", files)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
92
93
94
        inputDataFn = accession+".inputdata.txt"
        g = open(inputDataFn,"w")
        for f in files:
95
            print("file: " + f + " size: " + str(os.stat("out/"+f).st_size), flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
96
97
98
99
100
101
102
            g.write(f + " " + str(os.stat("out/"+f).st_size)+"\n")
        g.close()

        # potential todo: there is opportunity to use mkfifo and speed-up parallel-fastq-dump -> bbduk step
        # as per https://github.com/ababaian/serratus/blob/master/containers/serratus-dl/run_dl-sra.sh#L26

        # run fastp
103
104
105
        fastp_start = datetime.now()
        fastp(accession,"out/", sdb)

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
106
        if os.stat(accession+".fastq").st_size == 0:
107
108
109
110
111
112
113
114
            # fastp output is empty, most likely those reads have dummy quality values. retry.
            print("retrying fastp without quality filtering", flush=true)
            sdb_log(sdb,accession,'fastp_noqual','True')
            fastp(accession,"out/",sdb,"--disable_quality_filtering")
           
        if os.stat(accession+".fastq").st_size == 0:
            print("fastp produced empty output even without quality filtering", flush=True)
            sdb_log(sdb,accession,'fastp_empty','True')
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
115
            exit(1)
116
117

        print("fastp done, now uploading to S3")
118
119
        fastp_time = datetime.now() - fastp_start 
        sdb_log(sdb,accession,'fastp_time',int(fastp_time.seconds))
120

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
121
122
        # upload filtered reads to s3
        outputBucket = "serratus-rayan"
123
        upload_start = datetime.now()
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
124
        s3.upload_file(accession+".fastq", outputBucket, "reads/"+accession+".fastq")
125
126
        upload_time = datetime.now() - upload_start
        sdb_log(sdb,accession,'upload_time',int(upload_time.seconds))
127
       
128
        # cleanup. #(important when using a local drive)
129
130
        os.system(' '.join(["rm","-f","out/"+accession+"*.fastq"]))
        os.system(' '.join(["rm","-f",accession+".fastq"]))
131
        os.system(' '.join(["rm","-f","public/sra/"+accession+".sra"]))
132

Rayan  CHIKHI's avatar
Rayan CHIKHI committed
133
134
        endTime = datetime.now()
        diffTime = endTime - startTime
135
        sdb_log(sdb,accession,'batch_dl_time',int(diffTime.seconds))
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
        logMessage(accession, "Serratus-batch-dl processing time - " + str(diffTime.seconds), LOGTYPE_INFO) 


def main():
    accession = ""
    region = "us-east-1"
   
    if "Accession" in os.environ:
        accession = os.environ.get("Accession")
    if "Region" in os.environ:
        region = os.environ.get("Region")

    if len(accession) == 0:
        exit("This script needs an environment variable Accession set to something")

    logMessage(accession, 'parameters: ' + accession+  "  " + region, LOGTYPE_INFO)

    process_file(accession,region)


def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        
        if logType == "INFO" or logType == "ERROR":
161
            print(logMessageDetails, flush=True)
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
162
163
164
        elif logType == "DEBUG":
            try:
                if os.environ.get('DEBUG') == "LOGTYPE":
165
                   print(logMessageDetails, flush=True) 
Rayan  CHIKHI's avatar
Rayan CHIKHI committed
166
167
168
169
170
171
172
173
174
175
176
177
178
            except KeyError:
                pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)


def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details -  " + additionalErrorDetails
    else:
        return "fileName: " + fileName + " " + logType + ": " + message

179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def sdb_log(
        sdb, item_name, name, value,
        region='us-east-1', domain_name='serratus-batch',
    ):
        """
        Insert a single record to simpledb domain.
        PARAMS:
        @item_name: unique string for this record.
        @attributes = [
            {'Name': 'duration', 'Value': str(duration), 'Replace': True},
            {'Name': 'date', 'Value': str(date), 'Replace': True},
        ]
        """
        try:
            status = sdb.put_attributes(
                DomainName=domain_name,
                ItemName=str(item_name),
                Attributes=[{'Name':str(name), 'Value':str(value), 'Replace': True}]
            )
        except Exception as e:
            print("SDB put_attribute error:",str(e),'domain_name',domain_name,'item_name',item_name)
            status = False
        try:
            if status['ResponseMetadata']['HTTPStatusCode'] == 200:
                return True
            else:
                print("SDB log error:",status['ResponseMetadata']['HTTPStatusCode'])
                return False
        except:
            print("SDB status structure error, status:",str(status))
            return False


Rayan  CHIKHI's avatar
Rayan CHIKHI committed
212
213
if __name__ == '__main__':
   main()