# Orchestrating an Application Process with AWS Batch using AWS CloudFormation
The provided sample spins up an application orchestration using AWS services such as Amazon Simple Storage Service (S3), AWS Lambda, and Amazon DynamoDB. Amazon Elastic Container Registry (ECR) is used as the Docker container registry. Once the CloudFormation stack is up, the downloaded code can be checked into the AWS CodeCommit repository (built as part of the stack), which triggers a build that deploys the image to Amazon ECR. AWS Batch is triggered by the Lambda function when a sample CSV file is dropped into the S3 bucket.
**As part of this blog, we will do the following:**
1. Run the CloudFormation template (command provided) to create the necessary infrastructure
2. Set up the Docker image for the job
- Build a Docker image
- Tag the build and push the image to the repository
3. Drop the CSV into the S3 bucket (copy the provided contents into a sample file named "Sample.csv" and upload it)
4. Notice the job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row into DynamoDB (a sample lookup is shown after the diagram below).
![Alt text](Orchestrating%20an%20application%20process%20with%20AWS%20Batch.png?raw=true "Title")
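Once step 4 has run, each CSV row can be looked up in the `batch-processing-job` table. A minimal sketch (the attribute names come from the batch job code; the key value is illustrative, taken from Sample.csv):
```
$ aws dynamodb get-item --table-name batch-processing-job --key '{"ProductId": {"S": "P00001"}}'
```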
### Design Considerations
1. The provided CloudFormation template contains all the services (see the diagram above) needed for this exercise in one single template. In a production scenario, you would ideally split them into separate templates (nested stacks) for easier maintenance.
2. The Lambda function uses the Batch job's JobDefinition and JobQueue (name:version) as parameters. Once the CloudFormation stack is complete, these can be passed in as input parameters and set as environment variables for the Lambda function. Otherwise, when you deploy a subsequent version of the job, you may need to manually change the definition name:version that is submitted (a CLI check for the active revision is sketched after this list).
3. The example below lets you build, tag, and push the Docker image to the repository (created as part of the stack). Optionally, this can be done with AWS CodeBuild, building from the CodeCommit repository and pushing the image to Amazon ECR.
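For item 2, a minimal sketch for checking which revisions of the job definition are active (names assume the defaults created by this stack):
```
$ aws batch describe-job-definitions --job-definition-name BatchJobDefinition --status ACTIVE \
    --query 'jobDefinitions[].[jobDefinitionName,revision]'
```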
### Steps
1. Git clone/fork this repository - we will refer to this as SOURCE_REPOSITORY
```
$ git clone https://<url>//batch-processing-job-repo
```
2. Execute the below command to spin up the infrastructure CloudFormation stack. This stack creates all the AWS infrastructure needed for this exercise (a status-check sketch follows the command).
```
$ cd batch-processing-job-repo
$ aws cloudformation create-stack --stack-name batch-processing-job --template-body file://template/template.yaml --capabilities CAPABILITY_NAMED_IAM
```
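Stack creation takes a few minutes. A hedged way to wait for completion and confirm the status from the CLI:
```
$ aws cloudformation wait stack-create-complete --stack-name batch-processing-job
$ aws cloudformation describe-stacks --stack-name batch-processing-job --query 'Stacks[0].StackStatus'
```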
3. You can run the application in two different ways:
   a. CI/CD implementation.
      - These steps copy the contents from the source git repository into your CodeCommit repository and trigger the deployment
      - The create-stack command above created a git repository in your personal account (its clone URL can also be retrieved from the CLI; see the sketch after the code block below)
      - $ git clone <URL>
      - $ cd batch-processing-job-repo
      - Copy all the contents from SOURCE_REPOSITORY (from step 1) and paste them inside this folder
      - $ git add .
      - $ git commit -m "commit from source"
      - $ git push
   b. Build and run from your local desktop. Containerize the provided Python file and push it to Amazon ECR. A Dockerfile is provided as part of this exercise.
   Run the below commands to dockerize the Python file (a newer ECR login command is also sketched after the code block below):
      - Make sure to replace your account number and region accordingly
      - Make sure to have the Docker daemon running on your local computer
```
$ cd SOURCE_REPOSITORY   # refer to step 1
$ cd src
# Get the login credentials, then copy the output and paste/run it on the command line
$ aws ecr get-login --region us-east-1 --no-include-email
# Build the Docker image locally, tag it, and push it to the repository
$ docker build -t batch_processor .
$ docker tag batch_processor <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
$ docker push <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
```
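Two optional helpers for step 3, shown as sketches rather than required commands: for option (a), the clone URL of the CodeCommit repository created by the stack can be retrieved from the CLI; for option (b), newer AWS CLI versions (v2) replace `aws ecr get-login` with `get-login-password`:
```
# Option a: retrieve the HTTPS clone URL of the repository created by the stack
$ aws codecommit get-repository --repository-name batch-processing-job-repo --query 'repositoryMetadata.cloneUrlHttp' --output text

# Option b: log in to Amazon ECR on AWS CLI v2 (get-login is no longer available there)
$ aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com
```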
### Testing
Make sure to complete the above steps. You can review the pushed image in AWS Console > ECR > "batch-processing-job-repository" repository.
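The same check can be done from the CLI (assuming the default repository name created by this stack):
```
$ aws ecr describe-images --repository-name batch-processing-job-repository
```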
1. The S3 bucket batch-processing-job-<YOUR_ACCOUNT_NUMBER> is created as part of the stack.
2. Drop the provided Sample.csv into the S3 bucket. This triggers the Lambda function, which in turn submits the AWS Batch job (a CLI alternative is sketched after this list).
3. In AWS Console > Batch, notice the job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row into DynamoDB.
4. In AWS Console > DynamoDB, look for the "batch-processing-job" table. Note that the sample products provided in the CSV have been added by the batch job.
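The test can also be driven from the CLI; a minimal sketch, assuming the default bucket, queue, and table names created by this stack:
```
$ aws s3 cp Sample.csv s3://batch-processing-job-<YOUR_ACCOUNT_NUMBER>/
$ aws batch list-jobs --job-queue BatchProcessingJobQueue --job-status SUCCEEDED
$ aws dynamodb scan --table-name batch-processing-job --max-items 10
```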
### Code Cleanup
1. AWS Console > S3 > batch-processing-job-<YOUR_ACCOUNT_NUMBER> - delete the contents of the bucket
2. AWS Console > ECR > batch-processing-job-repository - delete the image(s) pushed to the repository
3. Run the below command to delete the stack (CLI equivalents for steps 1 and 2 are sketched after it):
```
$ aws cloudformation delete-stack --stack-name batch-processing-job
```
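Steps 1 and 2 can also be done from the CLI; a hedged sketch, assuming the default bucket and repository names and a single `latest` image tag:
```
$ aws s3 rm s3://batch-processing-job-<YOUR_ACCOUNT_NUMBER>/ --recursive
$ aws ecr batch-delete-image --repository-name batch-processing-job-repository --image-ids imageTag=latest
```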
## License
This library is licensed under the MIT-0 License. See the LICENSE file.
version: 0.2
phases:
  install:
    runtime-versions:
      python: 3.7
  pre_build:
    commands:
      - aws --version
      # CloudFormation sets these application environment variables for CodeBuild
      # REPOSITORY_URI=<youraccountnumber>.dkr.ecr.<region>.amazonaws.com/batch-processing-job-repository
      # AWS_DEFAULT_REGION=region ex: us-east-1
      - echo 'region - ' $AWS_DEFAULT_REGION
      - echo 'repository - ' $REPOSITORY_URI
      - cd src/
      - echo Logging in to Amazon ECR
      - $(aws ecr get-login --region $AWS_DEFAULT_REGION --no-include-email)
  build:
    commands:
      - echo Build started on `date`
      - echo Building the Docker image...
      - docker build -t $REPOSITORY_URI .
      - docker tag $REPOSITORY_URI $REPOSITORY_URI
  post_build:
    commands:
      - echo Build completed on `date`
      - echo Push the latest Docker image...
      - docker push $REPOSITORY_URI
ProductId,ProductName,ProductDescription
P00001,Phone,Traditional Voice Phone
P00002,Internet,Traditional Copper Internet
P00003,Television,Free To Air
P00004,Phone,Digital Voice
P00005,Internet,Fiber Internet
P00006,Television,Streaming TV Channel
# Containerizes the batch job script; boto3 is the AWS SDK used by batch_processor.py
FROM python
COPY batch_processor.py /
RUN pip install --upgrade pip && \
    pip install boto3 && \
    pip install boto
# Debug: print the working directory and its contents during the build
RUN pwd
RUN ls
CMD ["python", "batch_processor.py"]
import boto3
from boto3.dynamodb.conditions import Key, Attr
import csv, sys, time, argparse
from datetime import datetime
import json
import os
from operator import itemgetter, attrgetter
from time import sleep
import urllib3

DB_TABLE = 'batch-processing-job'
LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'

# DynamoDB table - batch-processing-job
def batch_processing_job_update(productId, productName, productDescription, fileName, region):
    try:
        now = datetime.now()
        date_time = now.strftime("%m-%d-%Y %H:%M:%S.%f")[:-3]
        dynamodb = boto3.resource('dynamodb', region_name=region)
        table = dynamodb.Table(DB_TABLE)
        table.put_item(
            Item={
                'ProductId': productId,
                'Filename': fileName,
                'CreatedTime': date_time,
                'LastModified': date_time,
                'ProductName': productName,
                'ProductDescription': productDescription
            }
        )
    except Exception as ex:
        logMessage(fileName, "Error Updating DynamoDB:" + str(ex), LOGTYPE_ERROR)
def read_file(fileName, inputBucket, inputFile, s3):
    input_products = []
    logMessage(fileName, 'Reading file - ' + inputBucket + "/" + inputFile, LOGTYPE_INFO)
    productId = ""
    productName = ""
    productDescription = ""
    try:
        s3_object = s3.Object(inputBucket, inputFile).get()[u'Body'].read().decode('utf-8')
        input_lines = s3_object.splitlines()
        productIndex = 0
        # The tab delimiter keeps each comma-separated line in a single field,
        # keyed by the full header row, so each row is split manually below.
        for row in csv.DictReader(input_lines, delimiter='\t'):
            try:
                print('row - ' + str(row))
                eachRow = row['ProductId,ProductName,ProductDescription'].split(',')
                productId = eachRow[0]
                productName = eachRow[1]
                productDescription = eachRow[2]
            except Exception as ex:
                logMessage(fileName, "Error retrieving Product " + str(ex), LOGTYPE_DEBUG)
            productIndex = productIndex + 1
            input_products.append({'productIndex': productIndex, 'productId': productId, 'productName': productName, 'productDescription': productDescription})
        input_products = sorted(input_products, key=itemgetter('productIndex'))
    except Exception as ex:
        logMessage(fileName, "Error parsing CSV " + str(ex), LOGTYPE_ERROR)
    return input_products
def batch_process(input_products, fileName, region):
    try:
        for source_products in input_products:
            productIndex = source_products['productIndex']
            productId = str(source_products['productId'])
            productName = str(source_products['productName'])
            productDescription = str(source_products['productDescription'])
            print(str(productIndex) + " " + productId + " " + productName + " " + productDescription + " " + fileName)
            batch_processing_job_update(productId, productName, productDescription, fileName, region)
            logMessage(fileName, "Product updated for " + productId + " and " + productName + " with " + productDescription, LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Error batch processing files" + str(ex), LOGTYPE_ERROR)
def process_files(inputBucket, fileName, region):
    try:
        urllib3.disable_warnings()
        s3 = boto3.resource('s3', verify=False)
        prefix = fileName
        print("region - " + region)
        bucket = s3.Bucket(name=inputBucket)
        FilesNotFound = True
        startTime = datetime.now()
        for files in bucket.objects.filter(Prefix=prefix):
            logMessage(fileName, 'files.key-' + files.key, LOGTYPE_DEBUG)
            isCSVFile = files.key.endswith(".csv")
            if isCSVFile:
                FilesNotFound = False
                input_products = read_file(fileName, inputBucket, files.key, s3)
                if len(input_products) > 0:
                    batch_process(input_products, fileName, region)
                else:
                    logMessage(fileName, "No products could be found in bucket {}/{}".format(inputBucket, prefix), LOGTYPE_INFO)
        if FilesNotFound:
            logMessage(fileName, "No file in {0}/{1}".format(bucket, prefix), LOGTYPE_INFO)
        endTime = datetime.now()
        diffTime = endTime - startTime
        logMessage(fileName, "Total processing time - " + str(diffTime.seconds), LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Error processing files:" + str(ex), LOGTYPE_ERROR)
def main():
    startTime = datetime.now()
    inputBucket = ""
    fileName = ""
    region = "us-west-2"
    try:
        inputBucket = os.environ.get("InputBucket")
        fileName = os.environ.get("FileName")
        region = os.environ.get("Region")
        logMessage(fileName, 'received ' + inputBucket + " " + fileName + " from environment", LOGTYPE_INFO)
    except:
        error = ''
    try:
        # Fall back to command-line arguments when the environment variables are not set
        if not inputBucket and not fileName:
            parser = argparse.ArgumentParser()
            parser.add_argument("--bucketName", type=str, required=True)
            parser.add_argument("--fileName", type=str, required=True)
            parser.add_argument("--region", type=str, required=True)
            args = parser.parse_args()
            inputBucket = args.bucketName
            fileName = args.fileName
            region = args.region
            logMessage(fileName, 'received ' + inputBucket + " " + fileName + " " + region + " from params", LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Unexpected error:" + str(ex), LOGTYPE_ERROR)
    process_files(inputBucket, fileName, region)
    endTime = datetime.now()
    diffTime = endTime - startTime
    logMessage(fileName, "Total processing time - " + str(diffTime.seconds), LOGTYPE_INFO)
def logMessage(productId, message, logType):
    try:
        logMessageDetails = constructMessageFormat(productId, message, "", logType)
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails)
        elif logType == "DEBUG":
            try:
                if os.environ.get('DEBUG') == "LOGTYPE":
                    print(logMessageDetails)
            except KeyError:
                pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(productId, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)

def constructMessageFormat(productId, message, additionalErrorDetails, logType):
    if additionalErrorDetails != "":
        return "ProductId: " + productId + " " + logType + ": " + message + " Additional Details - " + additionalErrorDetails
    else:
        return "ProductId: " + productId + " " + logType + ": " + message

def test():
    print("just a test message")

# Uncomment below for local/development testing
# main()

print("Python file invoked")
if __name__ == '__main__':
    main()
---
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Orchestrating an Application Process with AWS Batch using CloudFormation'
Resources:
  VPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.0.0.0/16
  InternetGateway:
    Type: AWS::EC2::InternetGateway
  RouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId:
        Ref: VPC
  VPCGatewayAttachment:
    Type: AWS::EC2::VPCGatewayAttachment
    Properties:
      VpcId:
        Ref: VPC
      InternetGatewayId:
        Ref: InternetGateway
  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: EC2 Security Group for instances launched in the VPC by Batch
      VpcId:
        Ref: VPC
  Subnet:
    Type: AWS::EC2::Subnet
    Properties:
      CidrBlock: 10.0.0.0/24
      VpcId:
        Ref: VPC
      MapPublicIpOnLaunch: 'True'
  Route:
    Type: AWS::EC2::Route
    Properties:
      RouteTableId:
        Ref: RouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId:
        Ref: InternetGateway
  SubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      RouteTableId:
        Ref: RouteTable
      SubnetId:
        Ref: Subnet
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: lambda-role
      AssumeRolePolicyDocument:
        Statement:
        - Action:
          - sts:AssumeRole
          Effect: Allow
          Principal:
            Service:
            - lambda.amazonaws.com
        Version: 2012-10-17
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/AWSLambdaExecute
      - arn:aws:iam::aws:policy/AmazonS3FullAccess
      - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
      - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
      - arn:aws:iam::aws:policy/AWSBatchFullAccess
      - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
      Path: /
  BatchServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service: batch.amazonaws.com
          Action: sts:AssumeRole
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
  IamInstanceProfile:
    Type: AWS::IAM::InstanceProfile
    Properties:
      Roles:
      - Ref: EcsInstanceRole
  EcsInstanceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2008-10-17'
        Statement:
        - Sid: ''
          Effect: Allow
          Principal:
            Service: ec2.amazonaws.com
          Action: sts:AssumeRole
      ManagedPolicyArns:
      - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
      - arn:aws:iam::aws:policy/AmazonS3FullAccess
      - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
  BatchProcessingJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      Type: container
      JobDefinitionName: BatchJobDefinition
      ContainerProperties:
        Image:
          Fn::Join:
          - ''
          - - Ref: AWS::AccountId
            - .dkr.ecr.
            - Ref: AWS::Region
            - ".amazonaws.com/batch-processing-job-repository:latest"
        Vcpus: 2
        Memory: 2000
        Command:
        - python
        - batch_processor.py
      RetryStrategy:
        Attempts: 1
  BatchProcessingJobQueue:
    Type: AWS::Batch::JobQueue
    Properties:
      JobQueueName: BatchProcessingJobQueue
      Priority: 1
      ComputeEnvironmentOrder:
      - Order: 1
        ComputeEnvironment:
          Ref: ComputeEnvironment
  ComputeEnvironment:
    Type: AWS::Batch::ComputeEnvironment
    Properties:
      Type: MANAGED
      ComputeResources:
        Type: EC2
        MinvCpus: 0
        DesiredvCpus: 0
        MaxvCpus: 32
        InstanceTypes:
        #- a1.medium
        - optimal
        Subnets:
        - Ref: Subnet
        SecurityGroupIds:
        - Ref: SecurityGroup
        InstanceRole:
          Ref: IamInstanceProfile
      ServiceRole:
        Ref: BatchServiceRole
  BatchProcessS3Bucket:
    Type: AWS::S3::Bucket
    DependsOn: BatchProcessBucketPermission
    Properties:
      BucketName:
        !Sub 'batch-processing-job-${AWS::AccountId}'
      NotificationConfiguration:
        LambdaConfigurations:
        - Event: 's3:ObjectCreated:*'
          Function: !GetAtt BatchProcessingLambdaInvokeFunction.Arn
  BatchProcessBucketPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: 'lambda:InvokeFunction'
      FunctionName: !Ref BatchProcessingLambdaInvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref "AWS::AccountId"
      SourceArn: !Sub "arn:aws:s3:::batch-processing-job-${AWS::AccountId}"
  BatchProcessingLambdaInvokeFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: BatchProcessingLambdaInvokeFunction
      Description: Python function handler triggered by S3 events that submits the AWS Batch job
      Handler: index.lambda_handler
      Runtime: python3.6
      MemorySize: 128
      Timeout: 30
      Role:
        Fn::GetAtt:
        - LambdaExecutionRole
        - Arn
      Code:
        ZipFile: |
          import json
          import boto3

          def lambda_handler(event, context):
              inputFileName = ""
              bucketName = ""
              for record in event['Records']:
                  bucketName = record['s3']['bucket']['name']
                  inputFileName = record['s3']['object']['key']
              response = {
                  'statusCode': 200,
                  'body': json.dumps('Input Received - ' + json.dumps(event))
              }
              batch = boto3.client('batch')
              region = batch.meta.region_name
              batchCommand = "--bucketName " + bucketName + " --fileName " + inputFileName + " --region " + region
              out = "inputFileName - " + bucketName + "/" + inputFileName + " Region " + region
              out = out + " " + batchCommand
              print(out)
              response = batch.submit_job(jobName='BatchProcessingJobQueue',
                  jobQueue='BatchProcessingJobQueue',
                  jobDefinition='BatchJobDefinition',
                  containerOverrides={
                      "command": [ "python", "batch_processor.py", batchCommand ],
                      "environment": [
                          {"name": "InputBucket", "value": bucketName},
                          {"name": "FileName", "value": inputFileName},
                          {"name": "Region", "value": region}
                      ]
                  })
              print("Job ID is {}.".format(response['jobId']))
              return response
  #Code Commit
  CodeCommitRepository:
    Type: AWS::CodeCommit::Repository
    Properties:
      RepositoryName: batch-processing-job-repo
      RepositoryDescription: Repository to maintain code related to the Batch Processing Jobs.
  #Code Build
  CodeBuildProject:
    Type: AWS::CodeBuild::Project
    Properties:
      Name: batch-processing-job-build
      Description: Batch processing job application CodeBuild project.
      ServiceRole: !GetAtt CodeBuildRole.Arn
      Artifacts:
        Type: no_artifacts
      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_SMALL
        Image: aws/codebuild/amazonlinux2-x86_64-standard:1.0
        PrivilegedMode: true
        EnvironmentVariables:
        - Name: REPOSITORY_URI
          Type: PLAINTEXT
          Value:
            Fn::Join:
            - ''
            - - Ref: AWS::AccountId
              - .dkr.ecr.
              - Ref: AWS::Region
              - ".amazonaws.com/batch-processing-job-repository:latest"
        - Name: AWS_DEFAULT_REGION
          Type: PLAINTEXT
          Value:
            Ref: AWS::Region
      Source:
        BuildSpec: config/buildspec.yml
        Location:
          Fn::Join:
          - ''
          - - 'https://git-codecommit.'
            - Ref: AWS::Region
            - '.amazonaws.com/v1/repos/'
            - batch-processing-job-repo
        Type: CODECOMMIT
      SourceVersion: refs/heads/master
      TimeoutInMinutes: 10
CodeBuildRole