Commit d7963a98 authored by Rayan Chikhi

everything except unitigs

parent 970ef160
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2020 Rayan Chikhi
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
......
# Orchestrating an Application Process with AWS Batch using AWS CloudFormation
# Taken from: "Orchestrating an Application Process with AWS Batch using AWS CloudFormation"
The sample provided spins up an application orchestration using AWS services such as Amazon Simple Storage Service (S3), AWS Lambda and Amazon DynamoDB. Amazon Elastic Container Registry (ECR) is used as the Docker container registry. Once the CloudFormation stack is spun up, the downloaded code can be checked into your AWS CodeCommit repository (built as part of the stack), which triggers a build that deploys the image to Amazon ECR. AWS Batch is triggered by the Lambda when a sample CSV file is dropped into the S3 bucket.
But I removed all the CodeCommit parts (replaced by manual deployment to ECR via `deploy-docker.sh`).
**As part of this blog we will do the following.**
1. Run the CloudFormation template (command provided) to create the necessary infrastructure
2. Set up the Docker image for the job
- Build a Docker image
- Tag the build and push the image to the repository
3. Drop the CSV into the S3 bucket (copy-paste the contents and save them as a sample file, "Sample.csv")
4. Notice that the job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row to DynamoDB.
![Alt text](Orchestrating%20an%20application%20process%20with%20AWS%20Batch.png?raw=true "Title")
Amazon Elastic Container Registry (ECR) is used as the Docker container registry. AWS Batch is triggered by the Lambda when a dataset file is dropped into the S3 bucket.
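For reference, the Lambda wired to the bucket notification essentially forwards the S3 object location to AWS Batch. A minimal sketch of that handler, assuming illustrative job/queue/definition names (the real values come from the CloudFormation template):

```python
import boto3

def lambda_handler(event, context):
    # Extract bucket and key from the S3 event notification
    record = event['Records'][0]
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    # Submit the Batch job, passing the file location through the
    # environment variables batch_processor.py reads (InputBucket/FileName).
    batch = boto3.client('batch')
    response = batch.submit_job(
        jobName='batch-processing-job',                   # illustrative name
        jobQueue='batch-processing-job-queue',            # assumption: defined in template
        jobDefinition='batch-processing-job-definition',  # assumption: defined in template
        containerOverrides={
            'environment': [
                {'name': 'InputBucket', 'value': bucket},
                {'name': 'FileName', 'value': key},
            ]
        })
    print("Job ID is {}.".format(response['jobId']))
    return response
```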
### Design Considerations
......@@ -27,70 +15,32 @@ The sample provided spins up an application orchestration using AWS Services lik
### Steps
1. Download this repository - we will refer to this as SOURCE_REPOSITORY
1. Execute the below commands to push the Docker image to Amazon ECR and spin up the infrastructure CloudFormation stack
```
$ git clone https://github.com/aws-samples/aws-batch-processing-job-repo
$ ./deploy-docker.sh   # edit the AWS account ID inside before running
$ ./spinup.sh
```
2. Alternatively, execute the below command to spin up the infrastructure CloudFormation stack manually. This stack spins up all the necessary AWS infrastructure needed for this exercise
```
$ cd aws-batch-processing-job-repo
$ aws cloudformation create-stack --stack-name batch-processing-job --template-body file://template/template.yaml --capabilities CAPABILITY_NAMED_IAM
```
3. You can run the application in two different ways
* #### CI/CD implementation
##### These steps copy the contents from the source git repo and trigger a deployment into your repository
* The above command would have created a git repository in your personal account. Make sure to replace your region below accordingly
* $ git clone https://git-codecommit.us-east-1.amazonaws.com/v1/repos/batch-processing-job-repo
* $ cd batch-processing-job-repo
* Copy all the contents from SOURCE_REPOSITORY (from step 1) and paste them inside this folder
* $ git add .
* $ git commit -m "commit from source"
* $ git push
* #### Build and run from local desktop
##### Containerize the provided Python file and push it to Amazon ECR. A Dockerfile is provided as part of this exercise. In this step you push the code (provided as part of this exercise) to the repository that was built as part of the CloudFormation stack
* Run the below commands to dockerize the Python file
i. Make sure to replace your account number and region accordingly
ii. Make sure the Docker daemon is running on your local computer
```
$ cd SOURCE_REPOSITORY   # refer to step 1
$ cd src
# get the login command; copy the output and paste/run it on the command line
$ aws ecr get-login --region us-east-1 --no-include-email
# build the Docker image locally, tag it, and push it to the repository
$ docker build -t batch_processor .
$ docker tag batch_processor <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
$ docker push <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
```
### Testing
Make sure to complete the above steps. You can review the image in AWS Console > ECR, in the "batch-processing-job-repository" repository.
1. An S3 bucket - aws-unitigs-<YOUR_ACCOUNT_NUMBER> - is created as part of the stack.
2. Drop a dataset file into the S3 bucket. This triggers the Lambda, which in turn submits the AWS Batch job (a programmatic upload sketch follows this list).
3. In AWS Console > Batch, notice that the job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row to DynamoDB.
4. In AWS Console > DynamoDB, look for the "batch-processing-job" table. Note that the sample products provided in the CSV have been added by the batch job.
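To trigger the pipeline from code rather than the console, a minimal boto3 sketch (the bucket name follows the stack's `aws-unitigs-<account>` pattern; `reads.fastq.gz` is a hypothetical dataset, and the `.gz` suffix matches the notification filter in the template):

```python
import boto3

# Uploading an object fires the S3 notification (filtered on the .gz
# suffix in the template), which invokes the Lambda and submits the job.
account_id = boto3.client('sts').get_caller_identity()['Account']
bucket = 'aws-unitigs-{}'.format(account_id)

s3 = boto3.client('s3')
s3.upload_file('reads.fastq.gz', bucket, 'reads.fastq.gz')  # hypothetical dataset file
```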
### Code Cleanup
In short:
```
./cleanup.sh
```
which does the following (except step 2, the ECR cleanup, which it does not automate; see the sketch below):
1. AWS Console > S3 bucket - aws-unitigs-<YOUR_ACCOUNT_NUMBER> - delete the contents of the bucket
2. AWS Console > ECR - batch-processing-job-repository - delete the image(s) that are pushed to the repository
3. Run the below command to delete the stack.
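Since `cleanup.sh` does not cover step 2, the ECR images can be removed with a short boto3 sketch before deleting the stack (repository name per this README; adjust the region to wherever the stack runs):

```python
import boto3

# Delete every image in the repository so CloudFormation can delete
# the (non-empty) ECR repository resource along with the stack.
ecr = boto3.client('ecr', region_name='us-east-1')
repo = 'batch-processing-job-repository'

image_ids = ecr.list_images(repositoryName=repo)['imageIds']
if image_ids:
    ecr.batch_delete_image(repositoryName=repo, imageIds=image_ids)
```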
......
version: 0.2
phases:
  install:
    runtime-versions:
      python: 3.7
  pre_build:
    commands:
      - aws --version
      # CloudFormation will set these application environment variables for CodeBuild
      #   REPOSITORY_URI=<youraccountnumber>.dkr.ecr.<region>.amazonaws.com/todo-repository
      #   AWS_DEFAULT_REGION=<region>, e.g. us-east-1
      - echo 'region - ' $AWS_DEFAULT_REGION
      - echo 'repository - ' $REPOSITORY_URI
      - cd src/
      - echo Logging in to Amazon ECR
      - $(aws ecr get-login --region $AWS_DEFAULT_REGION --no-include-email)
  build:
    commands:
      - echo Build started on `date`
      - echo Building the Docker image...
      - docker build -t $REPOSITORY_URI .
      - docker tag $REPOSITORY_URI $REPOSITORY_URI
  post_build:
    commands:
      - echo Build completed on `date`
      - echo Push the latest Docker image...
      - docker push $REPOSITORY_URI
ProductId,ProductName,ProductDescription
P00001,Phone,Traditional Voice Phone
P00002,Internet,Traditional Copper Internet
P00003,Television,Free To Air
P00004,Phone,Digital Voice
P00005,Internet,Fiber Internet
P00006,Television,Streaming TV Channel
FROM python
# https://pythonspeed.com/articles/alpine-docker-python/
COPY batch_processor.py /
......@@ -10,4 +11,4 @@ RUN pip install --upgrade pip && \
RUN pwd
RUN ls
CMD ["python", "batch_processor.py"]
\ No newline at end of file
CMD ["python", "batch_processor.py"]
......@@ -16,154 +16,67 @@ LOGTYPE_ERROR = 'ERROR'
LOGTYPE_INFO = 'INFO'
LOGTYPE_DEBUG = 'DEBUG'
# table - batch_processing_job
def batch_processing_job_update(productId, productName, productDescription, fileName, region):
    try:
        now = datetime.now()
        date_time = now.strftime("%m-%d-%Y %H:%M:%S.%f")[:-3]
        dynamodb = boto3.resource('dynamodb', region_name=region)
        table = dynamodb.Table(DB_TABLE)
        # write one product row, stamping creation/modification time
        table.put_item(
            Item={
                'ProductId': productId,
                'Filename': fileName,
                'CreatedTime': date_time,
                'LastModified': date_time,
                'ProductName': productName,
                'ProductDescription': productDescription
            }
        )
    except Exception as ex:
        logMessage(fileName, "Error Updating DynamoDB:" + str(ex), LOGTYPE_ERROR)
def read_file(fileName, inputBucket, inputFile, s3):
    input_products = []
    logMessage(fileName, 'Reading file - ' + inputBucket + "/" + inputFile, LOGTYPE_INFO)
    productId = ""
    productName = ""
    productDescription = ""
    try:
        s3_object = s3.Object(inputBucket, inputFile).get()[u'Body'].read().decode('utf-8')
        input_lines = s3_object.splitlines()
        productIndex = 0
        # The tab delimiter keeps each comma-separated line in a single field
        # keyed by the header row, which is then split on commas manually.
        for row in csv.DictReader(input_lines, delimiter='\t'):
            try:
                print('row - ' + str(row))
                eachRow = row['ProductId,ProductName,ProductDescription'].split(',')
                productId = eachRow[0]
                productName = eachRow[1]
                productDescription = eachRow[2]
            except Exception as ex:
                logMessage(fileName, "Error retrieving Product " + str(ex), LOGTYPE_DEBUG)
            productIndex = productIndex + 1
            input_products.append({'productIndex': productIndex, 'productId': productId, 'productName': productName, 'productDescription': productDescription})
        input_products = sorted(input_products, key=itemgetter('productIndex'))
    except Exception as ex:
        logMessage(fileName, "Error parsing excel " + str(ex), LOGTYPE_ERROR)
    return input_products
def batch_process(input_products, fileName, region):
    try:
        for source_products in input_products:
            productIndex = source_products['productIndex']
            productId = str(source_products['productId'])
            productName = str(source_products['productName'])
            productDescription = str(source_products['productDescription'])
            print(str(productIndex) + " " + productId + " " + productName + " " + productDescription + " " + fileName)
            batch_processing_job_update(productId, productName, productDescription, fileName, region)
            logMessage(fileName, "Product updated for " + productId + " and " + productName + " with " + productDescription, LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Error batch processing files" + str(ex), LOGTYPE_ERROR)
def process_file(inputBucket, fileName, region):
    try:
        urllib3.disable_warnings()
        s3 = boto3.client('s3')
        print("region - " + region)
        startTime = datetime.now()
        # download reads from s3
        local_file = "/tmp/" + fileName
        s3.download_file(inputBucket, fileName, local_file)
        print("downloaded file to", local_file)
        # upload unitigs to s3
        newFileName = fileName + ".processed"
        s3.upload_file(local_file, inputBucket, newFileName)
        endTime = datetime.now()
        diffTime = endTime - startTime
        logMessage(fileName, "File processing time - " + str(diffTime.seconds), LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Error processing files:" + str(ex), LOGTYPE_ERROR)
def main():
    startTime = datetime.now()
    inputBucket = ""
    fileName = ""
    region = "us-east-1"
    # environment variables take precedence (set by the Batch job definition)
    if "InputBucket" in os.environ:
        inputBucket = os.environ.get("InputBucket")
    if "FileName" in os.environ:
        fileName = os.environ.get("FileName")
    if "Region" in os.environ:
        region = os.environ.get("Region")
    logMessage(fileName, 'received ' + inputBucket + " " + fileName + " from environment", LOGTYPE_INFO)
    try:
        # fall back to command-line arguments when env vars are absent
        if inputBucket == "" or fileName == "":
            parser = argparse.ArgumentParser()
            parser.add_argument("--bucketName", "-b", type=str, required=True)
            parser.add_argument("--fileName", "-f", type=str, required=True)
            parser.add_argument("--region", "-r", type=str, required=True)
            args = parser.parse_args()
            inputBucket = args.bucketName
            fileName = args.fileName
            region = args.region
            logMessage(fileName, 'received ' + inputBucket + " " + fileName + " " + region + " from params", LOGTYPE_INFO)
    except Exception as ex:
        logMessage(fileName, "Unexpected error during arg parsing (due to lack of environment variables):" + str(ex), LOGTYPE_ERROR)
    logMessage(fileName, 'parameters: ' + inputBucket + " " + fileName + " " + region, LOGTYPE_INFO)
    process_file(inputBucket, fileName, region)
    endTime = datetime.now()
    diffTime = endTime - startTime
    logMessage(fileName, "Total processing time - " + str(diffTime.seconds), LOGTYPE_INFO)
def logMessage(fileName, message, logType):
    try:
        logMessageDetails = constructMessageFormat(fileName, message, "", logType)
        if logType == "INFO" or logType == "ERROR":
            print(logMessageDetails)
......@@ -174,7 +87,7 @@ def logMessage(productId, message, logType):
        except KeyError:
            pass
    except Exception as ex:
        logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType)
        print(logMessageDetails)
......@@ -184,12 +97,5 @@ def constructMessageFormat(productId, message, additionalErrorDetails, logType):
    else:
        return "ProductId: " + productId + " " + logType + ": " + message
def test():
    print("just a test message")

# uncomment below for local/development testing
# main()

print("Python file invoked")

if __name__ == '__main__':
    main()
......@@ -65,8 +65,6 @@ Resources:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AWSBatchFullAccess
        - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
      Path: /
......@@ -101,7 +99,6 @@ Resources:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess
  BatchProcessingJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
......@@ -114,7 +111,7 @@ Resources:
            - - Ref: AWS::AccountId
              - .dkr.ecr.
              - Ref: AWS::Region
              - ".amazonaws.com/aws-batch-s3-unitigs-job:latest"
        Vcpus: 2
        Memory: 2000
        Command:
......@@ -139,7 +136,7 @@ Resources:
        Type: EC2
        MinvCpus: 0
        DesiredvCpus: 0
        MaxvCpus: 2
        InstanceTypes:
          #- a1.medium
          - optimal
......@@ -156,11 +153,18 @@ Resources:
    DependsOn: BatchProcessBucketPermission
    Properties:
      BucketName:
        !Sub 'aws-unitigs-${AWS::AccountId}'
      NotificationConfiguration:
        LambdaConfigurations:
          - Event: 's3:ObjectCreated:*'
            Function: !GetAtt BatchProcessingLambdaInvokeFunction.Arn
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: '.gz'
  BatchProcessBucketPermission:
    Type: AWS::Lambda::Permission
    Properties:
......@@ -168,7 +172,7 @@ Resources:
      FunctionName: !Ref BatchProcessingLambdaInvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref "AWS::AccountId"
      SourceArn: !Sub "arn:aws:s3:::aws-unitigs-${AWS::AccountId}"
  BatchProcessingLambdaInvokeFunction:
    Type: AWS::Lambda::Function
    Properties:
......@@ -224,133 +228,6 @@ Resources:
            print("Job ID is {}.".format(response['jobId']))
            return response
  #Code Commit
  CodeCommitRepository:
    Type: AWS::CodeCommit::Repository
    Properties:
      RepositoryName: batch-processing-job-repo
      RepositoryDescription: Repository to maintain code related to the Batch Processing Jobs.
  #Code Build
  CodeBuildProject:
    Type: AWS::CodeBuild::Project
    Properties:
      Name: batch-processing-job-build
      Description: Todo Api application codebuild project.
      ServiceRole: !GetAtt CodeBuildRole.Arn
      Artifacts:
        Type: no_artifacts
      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_SMALL
        Image: aws/codebuild/amazonlinux2-x86_64-standard:1.0
        PrivilegedMode: true
        EnvironmentVariables:
          - Name: REPOSITORY_URI
            Type: PLAINTEXT
            Value:
              Fn::Join:
                - ''
                - - Ref: AWS::AccountId
                  - .dkr.ecr.
                  - Ref: AWS::Region
                  - ".amazonaws.com/batch-processing-job-repository:latest"
          - Name: AWS_DEFAULT_REGION
            Type: PLAINTEXT
            Value:
              Ref: AWS::Region
      Source:
        BuildSpec: config/buildspec.yml
        Location:
          Fn::Join:
            - ''
            - - 'https://git-codecommit.'
              - Ref: AWS::Region
              - '.amazonaws.com/v1/repos/'
              - batch-processing-job-repo
        Type: CODECOMMIT
      SourceVersion: refs/heads/master
      TimeoutInMinutes: 10
  CodeBuildRole:
    Type: AWS::IAM::Role
    Properties:
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess
        - arn:aws:iam::aws:policy/AWSCodeCommitFullAccess
      AssumeRolePolicyDocument:
        Statement:
          - Action: ['sts:AssumeRole']
            Effect: Allow
            Principal:
              Service: [codebuild.amazonaws.com]
        Version: '2012-10-17'
      Path: /
      Policies:
        - PolicyName: CodeBuildAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - 'logs:*'
                  - 'ec2:CreateNetworkInterface'
                  - 'ec2:DescribeNetworkInterfaces'
                  - 'ec2:DeleteNetworkInterface'
                  - 'ec2:DescribeSubnets'
                  - 'ec2:DescribeSecurityGroups'
                  - 'ec2:DescribeDhcpOptions'
                  - 'ec2:DescribeVpcs'
                  - 'ec2:CreateNetworkInterfacePermission'
                Effect: Allow
                Resource: '*'
  # CloudWatchEvents CodeBuild Role
  CloudWatchEventsCodeBuildRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: batch-processing-job-cw-events-codebuild-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: aws-events-code-build
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'codebuild:StartBuild'
                Resource: !GetAtt CodeBuildProject.Arn
  # CloudWatch Event Rule for codecommit build trigger
  CloudWatchEventCodeBuildEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "This event rule triggers the build on code commit event"
      EventPattern:
        source:
          - "aws.codecommit"
        detail-type:
          - "CodeCommit Repository State Change"
        detail:
          event:
            - "referenceCreated"
            - "referenceUpdated"
          referenceType:
            - "branch"
          referenceName:
            - "master"
      State: "ENABLED"
      Targets:
        - Arn: {'Fn::GetAtt': [CodeBuildProject, Arn]}
          Id: cloudwatch-codebuild-eventrules
          RoleArn: !GetAtt CloudWatchEventsCodeBuildRole.Arn
  BatchProcessRepository:
    Type: AWS::ECR::Repository
    Properties:
......@@ -372,47 +249,7 @@ Resources:
              - "ecr:InitiateLayerUpload"
              - "ecr:UploadLayerPart"
              - "ecr:CompleteLayerUpload"
  BatchProcessingDynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: batch-processing-job
      AttributeDefinitions:
        - AttributeName: "ProductId"
          AttributeType: "S"
        - AttributeName: "ProductName"
          AttributeType: "S"
        - AttributeName: "ProductDescription"
          AttributeType: "S"
        - AttributeName: "CreatedTime"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "ProductId"
          KeyType: "HASH"
        - AttributeName: "ProductName"
          KeyType: "RANGE"
      GlobalSecondaryIndexes:
        - IndexName: "GSI"
          KeySchema:
            - AttributeName: "CreatedTime"
              KeyType: "HASH"
          Projection:
            ProjectionType: "KEYS_ONLY"
          ProvisionedThroughput:
            ReadCapacityUnits: 5
            WriteCapacityUnits: 5
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
Outputs:
  ComputeEnvironmentArn:
    Value:
......