Commit 7fcbc8d5 authored by Rayan CHIKHI

added a fastp+minia pipeline for aws batch

parent 82967f8d
-# Construction of unitigs using BCALM on AWS Batch
+# Construction of [unitigs/contigs] using [BCALM/Minia] on AWS Batch
### Source
@@ -15,11 +15,13 @@ Provided CloudFormation template has all the services (VPC, Batch *managed*, IAM
Execute the commands below to spin up the infrastructure CloudFormation stack.
```
-./deploy-docker.sh
+./deploy-docker.sh [-assembly]
+./create_tools_bucket.sh
./spinup.sh
```
+Use the `-assembly` flag to deploy the `fastp+Minia` (k=31) Dockerfile instead of BCALM.
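Concretely, the flag is appended to the source directory name by `deploy-docker.sh` (shown below), so it selects which Dockerfile gets built:
```
./deploy-docker.sh             # builds from src/ (BCALM, unitigs)
./deploy-docker.sh -assembly   # builds from src-assembly/ (fastp + Minia, contigs)
```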
### Testing
1. An AWS S3 bucket, `aws-unitigs-<YOUR_ACCOUNT_NUMBER>`, is created as part of the stack.
deploy-docker.sh
#!/bin/bash
-cd src
+# use with parameter "-assembly" to deploy the fastp+Minia Dockerfile instead of BCALM
+cd src$1
ACCOUNT=$(aws sts get-caller-identity --query Account --output text) # AWS ACCOUNT ID
DOCKER_CONTAINER=aws-batch-s3-unitigs-job
REPO=${ACCOUNT}.dkr.ecr.us-east-1.amazonaws.com/${DOCKER_CONTAINER}
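The remainder of the script is collapsed in this view. A typical continuation that publishes the image to the ECR repository would look like the following sketch (the standard ECR push flow, not necessarily the author's exact commands):
```
# sketch only: authenticate to ECR, then build, tag, and push
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin ${REPO}
docker build -t ${DOCKER_CONTAINER} .
docker tag ${DOCKER_CONTAINER}:latest ${REPO}:latest
docker push ${REPO}:latest
```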
src-assembly/Dockerfile
FROM python
# https://pythonspeed.com/articles/alpine-docker-python/
WORKDIR /
COPY batch_processor.py .
RUN pip install --upgrade pip && \
    pip install boto3 awscli
# local AWS credentials, passed in at build time for the aws s3 cp below
ARG AWS_DEFAULT_REGION
#ENV AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION
ARG AWS_ACCESS_KEY_ID
#ENV AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
#ENV AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
# fastp install
RUN curl -L -o fastp http://opengene.org/fastp/fastp
RUN chmod +x fastp
# Minia install from binaries
RUN curl -L -o minia-v3.2.3-bin-Linux.tar.gz https://github.com/GATB/minia/releases/download/v3.2.3/minia-v3.2.3-bin-Linux.tar.gz
RUN tar xf minia-v3.2.3-bin-Linux.tar.gz && rm minia-v3.2.3-bin-Linux.tar.gz
RUN mv minia-v3.2.3-bin-Linux/bin/minia ./
# MFCompress install (fetched from the tools bucket created by create_tools_bucket.sh)
RUN aws s3 cp s3://aws-unitigs-tools/MFCompress-linux64-1.01.tgz .
RUN tar xf MFCompress-linux64-1.01.tgz && rm MFCompress-linux64-1.01.tgz
RUN mv MFCompress-linux64-1.01/MFCompressC ./
# debug: print working directory, free space, and image contents at build time
RUN pwd
RUN df -h .
RUN ls
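Once built, the image can be smoke-tested locally before running it on Batch. `batch_processor.py` (next file) reads its parameters from the `InputBucket`, `FileName`, and `Region` environment variables, so a local run might look like the following sketch (bucket, file, and image tag are placeholders, and AWS credentials must be available inside the container):
```
docker run --rm \
  -e InputBucket=aws-unitigs-123456789012 \
  -e FileName=reads.fastq \
  -e Region=us-east-1 \
  -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \
  aws-batch-s3-unitigs-job python batch_processor.py
```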
src-assembly/batch_processor.py

import argparse
import os
from datetime import datetime

import boto3
import urllib3

LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

def process_file(inputBucket, fileName, region, delete_original=False):
    urllib3.disable_warnings()
    s3 = boto3.client('s3')

    print("region - " + region)
    startTime = datetime.now()

    # go to /tmp (important, that's where local storage / nvme is)
    os.chdir("/tmp")
    # check free space
    os.system(' '.join(["df", "-h", "."]))

    # download reads from s3
    local_file = str(fileName)
    s3.download_file(inputBucket, fileName, local_file)
    print("downloaded file to", local_file)

    # run fastp (adapter/quality filtering with poly-X tail trimming)
    os.system(' '.join(["../fastp", "--trim_poly_x", "-i", local_file, "-o", "filtered.fastq"]))

    # remove original reads to free up space
    os.system(' '.join(["rm", local_file]))

    # run minia (k=31); -out is given explicitly so that the contigs file name
    # is deterministic and matches contigs_filename below
    out_prefix = '.'.join(local_file.split('.')[:-1])
    os.system(' '.join(["../minia", "-kmer-size", "31", "-in", "filtered.fastq", "-out", out_prefix]))
    contigs_filename = out_prefix + ".contigs.fa"

    # compress contigs with MFCompress
    os.system(' '.join(["../MFCompressC", contigs_filename]))
    compressed_contigs_filename = contigs_filename + ".mfc"

    # upload compressed contigs to s3 (same bucket as the input)
    outputBucket = inputBucket
    s3.upload_file(compressed_contigs_filename, outputBucket, compressed_contigs_filename)

    # optionally delete the original reads from s3
    if delete_original:
        logMessage(fileName, "Deleting original file", LOGTYPE_INFO)
        s3.delete_object(Bucket=inputBucket, Key=fileName)

    endTime = datetime.now()
    diffTime = endTime - startTime
    logMessage(fileName, "File processing time - " + str(diffTime.seconds), LOGTYPE_INFO)

def main():
    inputBucket = ""
    fileName = ""
    region = "us-east-1"
    delete_original = False

    # parameters normally arrive via environment variables set on the Batch job
    if "InputBucket" in os.environ:
        inputBucket = os.environ.get("InputBucket")
    if "FileName" in os.environ:
        fileName = os.environ.get("FileName")
    if "Region" in os.environ:
        region = os.environ.get("Region")
    if "DeleteOriginal" in os.environ:
        delete_original = True

    # fall back to command-line arguments when the environment variables are absent
    try:
        if inputBucket == "" or fileName == "":
            parser = argparse.ArgumentParser()
            parser.add_argument("--bucketName", "-b", type=str, required=True)
            parser.add_argument("--fileName", "-f", type=str, required=True)
            parser.add_argument("--region", "-r", type=str, required=True)
            parser.add_argument("--delete-original", "-d", dest='delete_original', action='store_true')
            args = parser.parse_args()
            inputBucket = args.bucketName
            fileName = args.fileName
            region = args.region
            delete_original = args.delete_original
    except Exception as ex:
        logMessage(fileName, "Unexpected error during argument parsing (environment variables not set): " + str(ex), LOGTYPE_ERROR)

    logMessage(fileName, 'parameters: ' + inputBucket + " " + fileName + " " + region, LOGTYPE_INFO)
    process_file(inputBucket, fileName, region, delete_original)

def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails)
        elif logType == "DEBUG":
            if os.environ.get('DEBUG') == "LOGTYPE":
                print(logMessageDetails)
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at batch_processor.logMessage: " + str(ex), logType)
        print(logMessageDetails)

def constructMessageFormat(fileName, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "fileName: " + fileName + " " + logType + ": " + message + " Additional Details - " + additionalErrorDetails
    return "fileName: " + fileName + " " + logType + ": " + message

if __name__ == '__main__':
    main()
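For a quick test outside Batch, the same script can be driven by its command-line flags instead of environment variables (bucket and file names below are placeholders; AWS credentials must be configured):
```
python batch_processor.py -b aws-unitigs-123456789012 -f reads.fastq -r us-east-1
```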