I'm new to AWS Step Functions and AWS Batch. I'm trying to integrate an AWS Batch job with Step Functions. The Batch job runs a simple Python script that outputs a string value (a simplified version of the real requirement). I need to feed the Python script's output to the next state of the step function. How should I accomplish this? The AWS Batch job output does not contain the result of the Python script; instead, it contains all the container-related information along with the input values. Example: the AWS Batch job runs a Python script that outputs "Hello World". I need "Hello World" to be available to the next state of the step function, so it can be passed to the Lambda associated with that state.
zqdjd7g91#
I was able to do this. Below is my state machine. I started from the sample project Manage a Batch Job (AWS Batch, Amazon SNS) and modified the two Lambdas to pass input/output through.
{
  "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
  "StartAt": "Submit Batch Job",
  "TimeoutSeconds": 3600,
  "States": {
    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "BatchJobNotification",
        "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
        "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
      },
      "Next": "Notify Success",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "Notify Failure"
        }
      ]
    },
    "Notify Success": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
      "Parameters": {
        "LogStreamName.$": "$.Container.LogStreamName"
      },
      "ResultPath": "$.lambdaOutput",
      "Next": "ConsumeLogs"
    },
    "ConsumeLogs": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
      "Parameters": {
        "randomstring.$": "$.lambdaOutput.logs"
      },
      "End": true
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "Batch job submitted through Step Functions failed",
        "TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
      },
      "End": true
    }
  }
}
The key to reading the logs is the Submit Batch Job output, which contains the LogStreamName. I pass it to a Lambda named function:readcloudwatchlogs, which reads the logs, and then pass the retrieved log text on to the next function, function:consumelogs. You can see the consumelogs function printing the logs in the attached screenshot.
The Submit Batch Job state produces output like this:

{
  "Attempts": [
    {
      "Container": {
        "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
        "ExitCode": 0,
        "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
        "NetworkInterfaces": [],
        "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
      },
      "StartedAt": 1611329367577,
      "StatusReason": "Essential container in task exited",
      "StoppedAt": 1611329367748
    }
  ],
  "Container": {
    "Command": ["echo", "Hello world"],
    "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
    "Environment": [
      {
        "Name": "MANAGED_BY_AWS",
        "Value": "STARTED_BY_STEP_FUNCTIONS"
      }
    ],
    "ExitCode": 0,
    "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
    "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
    "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
    ..
  },
  ..
  "Tags": {
    "resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
  }
}
function:readcloudwatchlogs:

import boto3

client = boto3.client('logs')

def lambda_handler(event, context):
    print(event)
    # Read the Batch job's stdout from the log stream whose name the
    # Submit Batch Job state passed in.
    response = client.get_log_events(
        logGroupName='/aws/batch/job',
        logStreamName=event.get('LogStreamName')
    )
    # Return the first log line, e.g. "Hello World".
    log = {'logs': response['events'][0]['message']}
    return log
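Note that the handler above returns only the first log event. If the Batch script prints more than one line, get_log_events has to be paged with nextForwardToken until the token stops changing (CloudWatch Logs has no boto3 paginator for this call). A hedged sketch of that loop, not part of the original answer; read_all_events and its defaults are illustrative, and the merging logic is kept in a pure helper:

```python
def merge_messages(pages):
    """Concatenate the 'message' fields from a sequence of
    get_log_events response dicts, in order, one line per event."""
    messages = []
    for page in pages:
        for event in page.get('events', []):
            messages.append(event['message'])
    return '\n'.join(messages)

def read_all_events(log_stream_name, log_group='/aws/batch/job'):
    # Page through get_log_events: the stream is exhausted when
    # nextForwardToken stops changing between calls. boto3 is imported
    # here so the pure helper above is usable without it installed.
    import boto3
    client = boto3.client('logs')
    pages, token = [], None
    while True:
        kwargs = {
            'logGroupName': log_group,
            'logStreamName': log_stream_name,
            'startFromHead': True,
        }
        if token:
            kwargs['nextToken'] = token
        page = client.get_log_events(**kwargs)
        pages.append(page)
        if page['nextForwardToken'] == token:
            return merge_messages(pages)
        token = page['nextForwardToken']
```

Returning merge_messages(...) from the Lambda in place of response['events'][0]['message'] would then hand the whole job output to the next state.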
function:consumelogs:

import json

print('Loading function')

def lambda_handler(event, context):
    # Receives {"randomstring": "<log text>"} from the ConsumeLogs state
    # and simply prints it.
    print(event)
pengsaosao2#
You can pass your Step Functions execution ID ($$.Execution.Id) to the Batch process, and the Batch process can then write its response to DynamoDB using the execution ID as the primary key (or another field). You then need a follow-up state that reads directly from DynamoDB and captures the process response. I've been looking for a way to do this without the follow-up state, but so far no dice.
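A minimal sketch of that flow, assuming a hypothetical table named batch-results keyed on execution_id (the table name, attribute names, and helper functions are all illustrative, not from the answer). The item-shaping step is kept pure so it can be checked separately from the AWS calls:

```python
import json

def build_result_item(execution_id, result):
    # Shape the DynamoDB item: execution_id is the $$.Execution.Id the
    # state machine passed into the Batch job, result is the script output.
    return {
        'execution_id': {'S': execution_id},
        'result': {'S': json.dumps(result)},
    }

def write_result(execution_id, result):
    # Called at the end of the Batch job's script (boto3 is imported
    # here so the pure helper above works without it installed).
    import boto3
    client = boto3.client('dynamodb')
    client.put_item(TableName='batch-results',
                    Item=build_result_item(execution_id, result))

def read_result(execution_id):
    # The follow-up Lambda state fetches the row the Batch job wrote.
    import boto3
    client = boto3.client('dynamodb')
    item = client.get_item(TableName='batch-results',
                           Key={'execution_id': {'S': execution_id}})['Item']
    return json.loads(item['result']['S'])
```

The execution ID is a natural key here because it is unique per run and is visible both to the state machine (via the context object) and to the Batch job (if passed as a container override or job parameter).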
pbgvytdp3#
While you can't use waitForTaskToken with submitJob, you can still use the callback pattern by passing the task token in Parameters and referencing it in the command override with Ref::TaskToken:
...
"Submit Batch Job": {
  "Type": "Task",
  "Resource": "arn:aws:states:::batch:submitJob.sync",
  "Parameters": {
    "Parameters": {
      "TaskToken.$": "$$.Task.Token"
    },
    "ContainerOverrides": {
      "Command": ["python3", "my_script.py", "Ref::TaskToken"]
    }
  }
}
...
Then, when the script finishes processing, simply call StepFunctions.SendTaskSuccess or StepFunctions.SendTaskFailure:
import json
import sys

import boto3

client = boto3.client('stepfunctions')

def main():
    args = sys.argv[1:]
    # The output passed back to Step Functions must be a valid JSON string.
    client.send_task_success(taskToken=args[0],
                             output=json.dumps('Hello World'))
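For completeness, the script would normally wrap its work in a try/except and report errors with send_task_failure, so a failed job doesn't leave the execution waiting until its timeout. A sketch under the same assumptions (the token arrives as argv[1]; run_job and the 'JobFailed' error code are illustrative, not from the answer):

```python
import json
import sys

def run_job(work):
    """Execute the job body and describe the callback to make:
    ('success', <json output>) or ('failure', <error>, <cause>)."""
    try:
        result = work()
    except Exception as exc:
        return ('failure', 'JobFailed', str(exc))
    # SendTaskSuccess requires the output to be a valid JSON string.
    return ('success', json.dumps(result))

def report(task_token, work):
    # Map the outcome onto the Step Functions callback API (boto3 is
    # imported here so run_job stays testable without it installed).
    import boto3
    client = boto3.client('stepfunctions')
    outcome = run_job(work)
    if outcome[0] == 'success':
        client.send_task_success(taskToken=task_token, output=outcome[1])
    else:
        client.send_task_failure(taskToken=task_token,
                                 error=outcome[1], cause=outcome[2])

if __name__ == '__main__' and len(sys.argv) > 1:
    report(sys.argv[1], lambda: 'Hello World')
```

With this in place, the Catch on the Submit Batch Job state (or a SendTaskFailure callback) gives the state machine a single failure path to route to Notify Failure.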