diff --git a/LICENSE b/LICENSE
index fcc7fa6828c40b2189052d7576f35ec52462fbc6..4a0e5b790a64b7c02505e683e1f1a03f29a886bf 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+Copyright 2020 Rayan Chikhi
 
 Permission is hereby granted, free of charge, to any person obtaining a copy of
 this software and associated documentation files (the "Software"), to deal in
diff --git a/Orchestrating an application process with AWS Batch.png b/Orchestrating an application process with AWS Batch.png
deleted file mode 100644
index 743b2d7c4c2aa945c23fd4ab2bf9a3026929a053..0000000000000000000000000000000000000000
Binary files a/Orchestrating an application process with AWS Batch.png and /dev/null differ
diff --git a/README.md b/README.md
index f482c9b1e7e61399ce4d9d6a0675bf927090401f..660321d1e7b78bcb8ea29c4e48793042f4094f61 100644
--- a/README.md
+++ b/README.md
@@ -1,21 +1,9 @@
-# Orchestrating an Application Process with AWS Batch using AWS CloudFormation
+# Taken from: "Orchestrating an Application Process with AWS Batch using AWS CloudFormation"
 
-The sample provided spins up an application orchestration using AWS Services like AWS Simple Storage Service (S3), AWS Lambda and AWS DynamoDB. Amazon Elastic Container Registry (ECR) is used as the Docker container registry. Once the CloudFormation stack is spun up, the downloaded code can be checked in into your AWS CodeCommit repository (built as part of the stack) which would trigger the build to deploy the image to Amazon ECR. AWS Batch will be triggered by the lambda when a sample CSV file is dropped into the S3 bucket.
+But I removed all of the CodeCommit/CodeBuild CI/CD pieces (replaced by manual deployment to ECR via `deploy-docker.sh`).
 
-**As part of this blog we will do the following.**
-
-1. Run the CloudFormation template (command provided) to create the necessary infrastructure
-
-2. Set up the Docker image for the job
-   - Build a Docker image
-   - Tag the build and push the image to the repository
-
-3. Drop the CSV into the S3 bucket (Copy paste the contents and create them as a sample file ("Sample.csv")
-
-4. Notice the Job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row into DynamoDB.
-
-
+Amazon Elastic Container Registry (ECR) is used as the Docker container registry. An AWS Batch job is triggered by a Lambda function when a dataset file is dropped into the S3 bucket.
 
 ### Design Considerations
 
@@ -27,70 +15,32 @@ The sample provided spins up an application orchestration using AWS Services lik
 
 ### Steps
 
-1. Download this repository - We will refer this as SOURCE_REPOSITORY
+1. Execute the commands below to spin up the CloudFormation infrastructure stack. The stack creates all the AWS resources needed for this exercise.
 
 ```
-   $ git clone https://github.com/aws-samples/aws-batch-processing-job-repo
+./deploy-docker.sh [edit the AWS account ID in it first]
+./spinup.sh
 ```
 
-2. Execute the below commands to spin up the infrastructure cloudformation stack. This stack spins up all the necessary AWS infrastructure needed for this exercise
-
-```
-$ cd aws-batch-processing-job-repo
-
-$ aws cloudformation create-stack --stack-name batch-processing-job --template-body file://template/template.yaml --capabilities CAPABILITY_NAMED_IAM
-```
-
-3. You can run the application in two different ways
-
-   * #### CI/CD implementation.
-
-      ##### This steps allows you to copy the contents from source git repo and trigger deployment into your repository
-      * The above command would have created a git repository in your personal account. Make sure to replace your region below accordingly
-      * $ git clone https://git-codecommit.us-east-1.amazonaws.com/v1/repos/batch-processing-job-repo
-      * cd batch-processing-job-repo
-      * copy all the contents from SOURCE_REPOSITORY (from step 1) and paste inside this folder
-      * $ git add .
-      * $ git commit -m "commit from source"
-      * $ git push
-
-   * #### Build and run from local desktop.
-
-      ##### Containarize the provided python file and push it to the Amazon ECR. Dockerfile provided as part of this exercise. In this steps you push the code (provided as part of this exercise) to the repository that was built as part of CloudFormation stack
-
-      * RUN the below commands to dockerize the python file
-
-         i. Make sure to replace your account number, region accordingly
-
-         ii. Make sure to have Docker daemon running in your local computer
-
-      ```
-      $ cd SOURCE_REPOSITORY (Refer step 1)
-      $ cd src
-
-      # get the login creds and copy the below output and paste/run on the command line
-      $ $ aws ecr get-login --region us-east-1 --no-include-email
-
-      # Build the docker image locally, tag and push it to the repository
-      $ $ docker build -t batch_processor .
-      $ docker tag batch_processor <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
-      $ docker push <YOUR_ACCOUNT_NUMBER>.dkr.ecr.us-east-1.amazonaws.com/batch-processing-job-repository
-
-      ```
-
 ### Testing
 
 Make sure to complete the above step. You can review the image in AWS Console > ECR - "batch-processing-job-repository" repository
 
-1. AWS S3 bucket - batch-processing-job-<YOUR_ACCOUNT_NUMBER> is created as part of the stack.
-2. Drop the provided Sample.CSV into the S3 bucket. This will trigger the Lambda to trigger the AWS Batch
+1. AWS S3 bucket - aws-unitigs-<YOUR_ACCOUNT_NUMBER> is created as part of the stack.
+2. Drop a dataset into it. This triggers the Lambda function, which submits the AWS Batch job.
 3. In AWS Console > Batch, Notice the Job runs and performs the operation based on the pushed container image. The job parses the CSV file and adds each row into DynamoDB.
-4. In AWS Console > DynamoDB, look for "batch-processing-job" table. Note sample products provided as part of the CSV is added by the batch
 
 ### Code Cleanup
 
-1. AWS Console > S3 bucket - batch-processing-job-<YOUR_ACCOUNT_NUMBER> - Delete the contents of the file
+In short:
+
+```
+./cleanup.sh
+```
+
+Which does the following (except step 2: the ECR images are not deleted):
+
+1. AWS Console > S3 bucket - aws-unitigs-<YOUR_ACCOUNT_NUMBER> - Delete the contents of the bucket
 2. AWS Console > ECR - batch-processing-job-repository - delete the image(s) that are pushed to the repository
 3. run the below command to delete the stack.
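For readers who prefer to script the cleanup rather than click through the console, here is a minimal boto3 sketch of the same three steps. The stack name `batch-processing-job` is an assumption carried over from the original blog post, and the bucket name comes from `template.yaml`; check `spinup.sh` and `cleanup.sh` for the names actually used.

```python
# Minimal boto3 sketch of the cleanup steps above.
# Assumptions: the stack is named "batch-processing-job" (as in the original
# blog post) and the bucket is aws-unitigs-<account id> (from template.yaml).
import boto3

account_id = boto3.client("sts").get_caller_identity()["Account"]
bucket_name = "aws-unitigs-{}".format(account_id)

# 1. Empty the S3 bucket (CloudFormation cannot delete a non-empty bucket).
boto3.resource("s3").Bucket(bucket_name).objects.all().delete()

# 2. ECR images are left in place, mirroring cleanup.sh; remove them from the
#    console (or with ecr.batch_delete_image) if you want a full teardown.

# 3. Delete the CloudFormation stack and wait for the deletion to finish.
cfn = boto3.client("cloudformation")
cfn.delete_stack(StackName="batch-processing-job")
cfn.get_waiter("stack_delete_complete").wait(StackName="batch-processing-job")
```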
diff --git a/config/buildspec.yml b/config/buildspec.yml deleted file mode 100644 index 0ac59a30177dc6d34eb742fc28a9eb67b2c5ef62..0000000000000000000000000000000000000000 --- a/config/buildspec.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: 0.2 - -phases: - install: - runtime-versions: - python: 3.7 - pre_build: - commands: - - aws --version - # Cloudformation will set this application environment variables for CodeBuild - # REPOSITORY_URI=<youraccountnumber>.dkr.ecr.<region>.amazonaws.com/todo-repository - # AWS_DEFAULT_REGION=region ex: us-east-1 - - echo 'region - ' - $AWS_DEFAULT_REGION - - echo 'repository - ' $REPOSITORY_URI - - cd src/ - - echo Logging in to Amazon ECR - - $(aws ecr get-login --region $AWS_DEFAULT_REGION --no-include-email) - build: - commands: - - echo Build started on `date` - - echo Building the Docker image... - - docker build -t $REPOSITORY_URI . - - docker tag $REPOSITORY_URI $REPOSITORY_URI - post_build: - commands: - - echo Build completed on `date` - - echo Push the latest Docker Image... - - docker push $REPOSITORY_URI \ No newline at end of file diff --git a/sample/sample.csv b/sample/sample.csv deleted file mode 100644 index bb3df5c9ebcf578271905321d41eb5ba6db9a036..0000000000000000000000000000000000000000 --- a/sample/sample.csv +++ /dev/null @@ -1,7 +0,0 @@ -ProductId,ProductName,ProductDescription -P00001,Phone,Traditional Voice Phone -P00002,Internet,Traditional Copper Internet -P00003,Television,Free To Air -P00004,Phone,Digital Voice -P00005,Internet,Fiber Internet -P00006,Television,Streaming TV Channel \ No newline at end of file diff --git a/src/Dockerfile b/src/Dockerfile index 36b12c788be42ba961035aff2277e01a0b13b83c..c7baded935705e19feadbd2e3839d02b7e7298f1 100644 --- a/src/Dockerfile +++ b/src/Dockerfile @@ -1,4 +1,5 @@ FROM python +# https://pythonspeed.com/articles/alpine-docker-python/ COPY batch_processor.py / @@ -10,4 +11,4 @@ RUN pip install --upgrade pip && \ RUN pwd RUN ls -CMD ["python", "batch_processor.py"] \ No newline at end of file +CMD ["python", "batch_processor.py"] diff --git a/src/batch_processor.py b/src/batch_processor.py index f91eff170237abf4d27a13e91f7e525811d4deaf..0fd76f61ce460105851286ba405a1abe2db2d050 100644 --- a/src/batch_processor.py +++ b/src/batch_processor.py @@ -16,154 +16,67 @@ LOGTYPE_ERROR = 'ERROR' LOGTYPE_INFO = 'INFO' LOGTYPE_DEBUG = 'DEBUG' -# table - batch_processing_job -def batch_processing_job_update(productId, productName, productDescription, fileName, region): - try: - now = datetime.now() - date_time = now.strftime("%m-%d-%Y %H:%M:%S.%f")[:-3] - - dynamodb = boto3.resource('dynamodb', region_name = region) - table = dynamodb.Table(DB_TABLE) - - table.put_item( - Item={ - 'ProductId': productId, - 'Filename': fileName, - 'CreatedTime': date_time, - 'LastModified': date_time, - 'ProductName': productName, - 'ProductDescription': productDescription - } - ) - - except Exception as ex: - logMessage(fileName, "Error Updating DynamoDB:" + str(ex), LOGTYPE_ERROR) - -def read_file(fileName, inputBucket, inputFile, s3): - input_products = [] - logMessage(fileName, 'Reading file - ' + inputBucket + "/" + inputFile, LOGTYPE_INFO) - productId = "" - productName = "" - productDescription = "" - try: - s3_object = s3.Object(inputBucket, inputFile).get()[u'Body'].read().decode('utf-8') - input_lines = s3_object.splitlines() - - productIndex = 0 - for row in csv.DictReader(input_lines, delimiter='\t'): - try: - print('row - ' + str(row)) - eachRow = row['ProductId,ProductName,ProductDescription'].split(',') - 
productId = eachRow[0] - productName = eachRow[1] - productDescription = eachRow[2] - - except Exception as ex: - logMessage(fileName, "Error retrieving Product " + str(ex), LOGTYPE_DEBUG) - - productIndex = productIndex + 1 - input_products.append({'productIndex': productIndex, 'productId': productId, 'productName': productName, 'productDescription': productDescription}) - input_products = sorted(input_products, key=itemgetter('productIndex')) - - except Exception as ex: - logMessage(fileName, "Error parsing excel " + str(ex), LOGTYPE_ERROR) - - return input_products - -def batch_process(input_products, fileName, region): - - try: - for source_products in input_products: - productIndex = source_products['productIndex'] - productId = str(source_products['productId']) - productName = str(source_products['productName']) - productDescription = str(source_products['productDescription']) - - print(str(productIndex) +" " + productId + " " + productName + " " + productDescription + " " + fileName) - batch_processing_job_update(productId, productName, productDescription, fileName, region) - logMessage(fileName, "Product updated for " + productId + " and " + productName + " with "+ productDescription, LOGTYPE_INFO) - - except Exception as ex: - logMessage(fileName, "Error batch processing files" + str(ex), LOGTYPE_ERROR) - - -def process_files(inputBucket, fileName, region): +def process_file(inputBucket, fileName, region): try: urllib3.disable_warnings() - s3 = boto3.resource('s3', verify=False) + s3 = boto3.client('s3') prefix = fileName print("region - " + region) - bucket = s3.Bucket(name=inputBucket) - FilesNotFound = True startTime = datetime.now() + # download reads from s3 + local_file = "/tmp/" + fileName + s3.download_file(inputBucket, fileName, local_file) + print("downloaded file to",local_file) - for files in bucket.objects.filter(Prefix=prefix): - logMessage(fileName, 'files.key-' + files.key, LOGTYPE_DEBUG) - isCSVFile = files.key.endswith(".csv") - - if isCSVFile: - FilesNotFound = False - input_products = read_file(fileName, inputBucket, files.key, s3) - - if len(input_products) > 0: - batch_process(input_products, fileName, region) - else: - logMessage(fileName, "No products could be found in bucket {}/{}".format(inputBucket, prefix), LOGTYPE_INFO) + # upload unitigs to s3 + newFileName = fileName+".processed" + s3.upload_file(local_file, inputBucket, newFileName) - if FilesNotFound: - logMessage(fileName, "No file in {0}/{1}".format(bucket, prefix), LOGTYPE_INFO) - endTime = datetime.now() diffTime = endTime - startTime - logMessage(fileName, "Total processing time - " + str(diffTime.seconds), LOGTYPE_INFO) + logMessage(fileName, "File processing time - " + str(diffTime.seconds), LOGTYPE_INFO) except Exception as ex: logMessage(fileName, "Error processing files:" + str(ex), LOGTYPE_ERROR) def main(): - startTime = datetime.now() inputBucket = "" fileName = "" - region = "us-west-2" - - try: - inputBucket = os.environ.get("InputBucket") + region = "us-east-1" + + #try: + if "InputBucket" in os.environ: + inputBucket = os.environ.get("InputBucket") + if "FileName" in os.environ: fileName = os.environ.get("FileName") + if "Region" in os.environ: region = os.environ.get("Region") - - logMessage(fileName, 'received ' + inputBucket + " " + fileName + " from environment", LOGTYPE_INFO) - except: - error = '' try: - if inputBucket == "" and fileName == "": + if inputBucket == "" or fileName == "": parser = argparse.ArgumentParser() - parser.add_argument("--bucketName", "-js", type=str, 
required=True) - parser.add_argument("--fileName", "-js", type=str, required=True) - parser.add_argument("--region", "-js", type=str, required=True) + parser.add_argument("--bucketName", "-b", type=str, required=True) + parser.add_argument("--fileName", "-f", type=str, required=True) + parser.add_argument("--region", "-r", type=str, required=True) args = parser.parse_args() - inputJson = args.bucketName + inputBucket = args.bucketName fileName = args.fileName region = args.region - - logMessage(fileName, 'received ' + inputBucket + " " + fileName + " " + region + " from params", LOGTYPE_INFO) except Exception as ex: - logMessage(fileName, "Unexpected error:" + str(ex), LOGTYPE_ERROR) + logMessage(fileName, "Unexpected error during arg parsing (due to lack of environment variables):" + str(ex), LOGTYPE_ERROR) - process_files(inputBucket, fileName, region) + logMessage(fileName, 'parameters: ' + inputBucket + " " + fileName + " " + region, LOGTYPE_INFO) - endTime = datetime.now() - diffTime = endTime - startTime + process_file(inputBucket, fileName, region) - logMessage(fileName, "Total processing time - " + str(diffTime.seconds), LOGTYPE_INFO) -def logMessage(productId, message, logType): +def logMessage(fileName, message, logType): try: - logMessageDetails = constructMessageFormat(productId, message, "", logType) + logMessageDetails = constructMessageFormat(fileName, message, "", logType) if logType == "INFO" or logType == "ERROR": print(logMessageDetails) @@ -174,7 +87,7 @@ def logMessage(productId, message, logType): except KeyError: pass except Exception as ex: - logMessageDetails = constructMessageFormat(productId, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType) + logMessageDetails = constructMessageFormat(fileName, message, "Error occurred at Batch_processor.logMessage" + str(ex), logType) print(logMessageDetails) @@ -184,12 +97,5 @@ def constructMessageFormat(productId, message, additionalErrorDetails, logType): else: return "ProductId: " + productId + " " + logType + ": " + message -def test(): - print("just a test message") - -# COMMENT below for local/development testing -# main() - -print("Python file invoked") if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/template/template.yaml b/template/template.yaml index 8e3b15598b26f4c15d31e700269038bc7a6fab2c..03e51c30dd56b1a88911a5af2d93e46adda18c87 100644 --- a/template/template.yaml +++ b/template/template.yaml @@ -65,8 +65,6 @@ Resources: ManagedPolicyArns: - arn:aws:iam::aws:policy/AWSLambdaExecute - arn:aws:iam::aws:policy/AmazonS3FullAccess - - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess - - arn:aws:iam::aws:policy/AmazonKinesisFullAccess - arn:aws:iam::aws:policy/AWSBatchFullAccess - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole Path: / @@ -101,7 +99,6 @@ Resources: ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role - arn:aws:iam::aws:policy/AmazonS3FullAccess - - arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess BatchProcessingJobDefinition: Type: AWS::Batch::JobDefinition Properties: @@ -114,7 +111,7 @@ Resources: - - Ref: AWS::AccountId - .dkr.ecr. 
- Ref: AWS::Region - - ".amazonaws.com/batch-processing-job-repository:latest" + - ".amazonaws.com/aws-batch-s3-unitigs-job:latest" Vcpus: 2 Memory: 2000 Command: @@ -139,7 +136,7 @@ Resources: Type: EC2 MinvCpus: 0 DesiredvCpus: 0 - MaxvCpus: 32 + MaxvCpus: 2 InstanceTypes: #- a1.medium - optimal @@ -156,11 +153,18 @@ Resources: DependsOn: BatchProcessBucketPermission Properties: BucketName: - !Sub 'batch-processing-job-${AWS::AccountId}' + !Sub 'aws-unitigs-${AWS::AccountId}' NotificationConfiguration: LambdaConfigurations: - Event: 's3:ObjectCreated:*' Function: !GetAtt BatchProcessingLambdaInvokeFunction.Arn + Filter: + S3Key: + Rules: + - Name: suffix + Value: '.gz' + + BatchProcessBucketPermission: Type: AWS::Lambda::Permission Properties: @@ -168,7 +172,7 @@ Resources: FunctionName: !Ref BatchProcessingLambdaInvokeFunction Principal: s3.amazonaws.com SourceAccount: !Ref "AWS::AccountId" - SourceArn: !Sub "arn:aws:s3:::batch-processing-job-${AWS::AccountId}" + SourceArn: !Sub "arn:aws:s3:::aws-unitigs-${AWS::AccountId}" BatchProcessingLambdaInvokeFunction: Type: AWS::Lambda::Function Properties: @@ -224,133 +228,6 @@ Resources: print("Job ID is {}.".format(response['jobId'])) return response - #Code Commit - CodeCommitRepository: - Type: AWS::CodeCommit::Repository - Properties: - RepositoryName: batch-processing-job-repo - RepositoryDescription: Respository to maintain code related to the Batch Processing Jobs. - #Code Build - CodeBuildProject: - Type: AWS::CodeBuild::Project - Properties: - Name: batch-processing-job-build - Description: Todo Api application codebuild project. - ServiceRole: !GetAtt CodeBuildRole.Arn - Artifacts: - Type: no_artifacts - Environment: - Type: LINUX_CONTAINER - ComputeType: BUILD_GENERAL1_SMALL - Image: aws/codebuild/amazonlinux2-x86_64-standard:1.0 - PrivilegedMode: true - EnvironmentVariables: - - Name: REPOSITORY_URI - Type: PLAINTEXT - Value: - Fn::Join: - - '' - - - Ref: AWS::AccountId - - .dkr.ecr. - - Ref: AWS::Region - - ".amazonaws.com/batch-processing-job-repository:latest" - - - Name: AWS_DEFAULT_REGION - Type: PLAINTEXT - Value: - Ref: AWS::Region - Source: - BuildSpec: config/buildspec.yml - Location: - Fn::Join: - - '' - - - 'https://git-codecommit.' 
- - Ref: AWS::Region - - '.amazonaws.com/v1/repos/' - - batch-processing-job-repo - Type: CODECOMMIT - SourceVersion: refs/heads/master - TimeoutInMinutes: 10 - CodeBuildRole: - Type: AWS::IAM::Role - Properties: - ManagedPolicyArns: - - arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess - - arn:aws:iam::aws:policy/AWSCodeCommitFullAccess - AssumeRolePolicyDocument: - Statement: - - Action: ['sts:AssumeRole'] - Effect: Allow - Principal: - Service: [codebuild.amazonaws.com] - Version: '2012-10-17' - Path: / - Policies: - - PolicyName: CodeBuildAccess - PolicyDocument: - Version: '2012-10-17' - Statement: - - Action: - - 'logs:*' - - 'ec2:CreateNetworkInterface' - - 'ec2:DescribeNetworkInterfaces' - - 'ec2:DeleteNetworkInterface' - - 'ec2:DescribeSubnets' - - 'ec2:DescribeSecurityGroups' - - 'ec2:DescribeDhcpOptions' - - 'ec2:DescribeVpcs' - - 'ec2:CreateNetworkInterfacePermission' - Effect: Allow - Resource: '*' - - # CloudWatchEvents Code build Rold - CloudWatchEventsCodeBuildRole: - Type: AWS::IAM::Role - Properties: - RoleName: batch-processing-job-cw-events-codebuild-role - AssumeRolePolicyDocument: - Version: 2012-10-17 - Statement: - - - Effect: Allow - Principal: - Service: - - events.amazonaws.com - Action: sts:AssumeRole - Policies: - - PolicyName: aws-events-code-build - PolicyDocument: - Version: 2012-10-17 - Statement: - - Effect: Allow - Action: - - 'codebuild:StartBuild' - Resource: !GetAtt CodeBuildProject.Arn - # CloudWatch Event Rule for codecommit build trigger - CloudWatchEventCodeBuildEventRule: - Type: AWS::Events::Rule - Properties: - Description: "This event rule triggers the build on code commit event" - EventPattern: - source: - - "aws.codecommit" - detail-type: - - "CodeCommit Repository State Change" - detail: - event: - - "referenceCreated" - - "referenceUpdated" - referenceType: - - "branch" - referenceName: - - "master" - State: "ENABLED" - Targets: - - - Arn: {'Fn::GetAtt': [CodeBuildProject, Arn]} - Id: cloudwatch-codebuild-eventrules - RoleArn: !GetAtt CloudWatchEventsCodeBuildRole.Arn - BatchProcessRepository: Type: AWS::ECR::Repository Properties: @@ -372,47 +249,7 @@ Resources: - "ecr:InitiateLayerUpload" - "ecr:UploadLayerPart" - "ecr:CompleteLayerUpload" - - BatchProcessingDynamoDBTable: - Type: AWS::DynamoDB::Table - Properties: - TableName: batch-processing-job - AttributeDefinitions: - - - AttributeName: "ProductId" - AttributeType: "S" - - - AttributeName: "ProductName" - AttributeType: "S" - - - AttributeName: "ProductDescription" - AttributeType: "S" - AttributeName: "CreatedTime" - AttributeType: "S" - KeySchema: - - - AttributeName: "ProductId" - KeyType: "HASH" - - - AttributeName: "ProductName" - KeyType: "RANGE" - GlobalSecondaryIndexes: - - - IndexName: "GSI" - KeySchema: - - - AttributeName: "CreatedTime" - KeyType: "HASH" - Projection: - ProjectionType: "KEYS_ONLY" - ProvisionedThroughput: - ReadCapacityUnits: 5 - WriteCapacityUnits: 5 - ProvisionedThroughput: - ReadCapacityUnits: 5 - WriteCapacityUnits: 5 - Outputs: ComputeEnvironmentArn: Value:
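For reference, the wiring the template sets up is: an object ending in `.gz` lands in the `aws-unitigs-<account id>` bucket, the Lambda submits an AWS Batch job, and the job's container runs `batch_processor.py`, which reads `InputBucket`, `FileName`, and `Region` from its environment. A hedged sketch of that submit call is below; the job queue and job definition names are placeholders (the Lambda body is truncated in this diff), and only the environment variable names are taken from the source.

```python
# Sketch of the S3 -> Lambda -> Batch hand-off defined in template.yaml.
# The jobQueue/jobDefinition names below are placeholders (assumptions);
# the environment variable names are the ones batch_processor.py reads.
import re
import boto3

def submit_processing_job(bucket, key, region="us-east-1"):
    batch = boto3.client("batch", region_name=region)
    # Batch job names only allow letters, digits, '-' and '_' (max 128 chars).
    job_name = re.sub(r"[^A-Za-z0-9_-]", "-", key)[:128]
    response = batch.submit_job(
        jobName=job_name,
        jobQueue="BatchProcessingJobQueue",      # placeholder: use the queue created by the stack
        jobDefinition="BatchJobDefinition",      # placeholder: use the job definition created by the stack
        containerOverrides={
            "environment": [
                {"name": "InputBucket", "value": bucket},
                {"name": "FileName", "value": key},
                {"name": "Region", "value": region},
            ]
        },
    )
    print("Job ID is {}.".format(response["jobId"]))
    return response
```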