Getting the result of an AWS Batch job into a Step Functions state machine (Python)

mi7gmzs6 · posted 2022-11-28 in Python

I'm new to AWS Step Functions and AWS Batch, and I'm trying to integrate an AWS Batch job into a Step Functions state machine. The Batch job runs a simple Python script that outputs a string value (a simplified version of the real requirement). I need to feed the script's output into the next state of the state machine. How should I do this? The Batch job's output doesn't contain the script's result; instead it contains all the container-related information along with the input values.
Example: the Batch job runs a Python script that outputs "Hello World". I need "Hello World" to be available to the next state of the state machine, so it can be passed to the Lambda associated with that state.
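For concreteness, the Batch job's script might look like the following (a hypothetical stand-in; the real script isn't shown in the question). The key point is that it only writes to stdout, which AWS Batch captures in CloudWatch Logs rather than in the job's API output:

```python
# my_batch_script.py -- hypothetical stand-in for the Batch job's entry point.
# Whatever it prints goes to CloudWatch Logs (log group /aws/batch/job),
# NOT into the submitJob/describeJobs result that Step Functions sees.
def main():
    result = "Hello World"
    print(result)
    return result

if __name__ == "__main__":
    main()
```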

zqdjd7g9 #1

I was able to do this. Below is my state machine. I started from the Manage a Batch Job (AWS Batch, Amazon SNS) sample project and modified the two Lambdas to pass input/output through.

{
  "Comment": "An example of the Amazon States Language for notification on an AWS Batch job completion",
  "StartAt": "Submit Batch Job",
  "TimeoutSeconds": 3600,
  "States": {
    "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobName": "BatchJobNotification",
        "JobQueue": "arn:aws:batch:us-east-1:1234567890:job-queue/BatchJobQueue-737ed10e7ca3bfd",
        "JobDefinition": "arn:aws:batch:us-east-1:1234567890:job-definition/BatchJobDefinition-89c42b1f452ac67:1"
      },
      "Next": "Notify Success",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Notify Failure"
        }
      ]
    },
    "Notify Success": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:readcloudwatchlogs",
      "Parameters": {
        "LogStreamName.$": "$.Container.LogStreamName"
      },
      "ResultPath": "$.lambdaOutput",
      "Next": "ConsumeLogs"
    },
    "ConsumeLogs": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:1234567890:function:consumelogs",
      "Parameters": {
        "randomstring.$": "$.lambdaOutput.logs"
      },
      "End": true
    },
    "Notify Failure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "Message": "Batch job submitted through Step Functions failed",
        "TopicArn": "arn:aws:sns:us-east-1:1234567890:StepFunctionsSample-BatchJobManagement17968f39-e227-47ab-9a75-08a7dcc10c4c-SNSTopic-1GR29R8TUHQY8"
      },
      "End": true
    }
  }
}

The key to reading the logs is the Submit Batch Job output, which contains the LogStreamName. I pass it to a Lambda named function:readcloudwatchlogs, which reads the logs, and then finally pass the logs it read to the next function, named function:consumelogs. You can see the consumelogs function printing the logs in the attached screenshot.

{
  "Attempts": [
    {
      "Container": {
        "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
        "ExitCode": 0,
        "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
        "NetworkInterfaces": [],
        "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b"
      },
      "StartedAt": 1611329367577,
      "StatusReason": "Essential container in task exited",
      "StoppedAt": 1611329367748
    }
  ],
  "Container": {
    "Command": [
      "echo",
      "Hello world"
    ],
    "ContainerInstanceArn": "arn:aws:ecs:us-east-1:1234567890:container-instance/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/6d11fdbfc9eb4f40b0d6b85c396bb243",
    "Environment": [
      {
        "Name": "MANAGED_BY_AWS",
        "Value": "STARTED_BY_STEP_FUNCTIONS"
      }
    ],
    "ExitCode": 0,
    "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest",
    "LogStreamName": "BatchJobDefinition-89c42b1f452ac67/default/2ad955bf59a8418893f53182f0d87b4b",
    "TaskArn": "arn:aws:ecs:us-east-1:1234567890:task/BatchComputeEnvironment-4a1593ce223b3cf_Batch_7557555f-5606-31a9-86b9-83321eb3e413/2ad955bf59a8418893f53182f0d87b4b",
..
  },
..
  "Tags": {
    "resourceArn": "arn:aws:batch:us-east-1:1234567890:job/d36ba07a-54f9-4acf-a4b8-3e5413ea5ffc"
  }
}
  • Read-logs Lambda code:
import boto3

client = boto3.client('logs')

def lambda_handler(event, context):
    print(event)
    # Batch jobs log to the /aws/batch/job log group; the stream name
    # comes from the Submit Batch Job state's output.
    response = client.get_log_events(
        logGroupName='/aws/batch/job',
        logStreamName=event.get('LogStreamName')
    )
    # Return only the first event's message (the script's single output line).
    log = {'logs': response['events'][0]['message']}
    return log
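The Lambda above returns only the first log event, which works for a script that prints a single line. A variant that joins every message in the stream may be useful when the script prints more than one line; this is a sketch (the client is injected to keep it testable, and it assumes the same /aws/batch/job log group):

```python
def read_all_log_messages(logs_client, log_stream_name):
    # Fetch the stream from the beginning and join every event's message,
    # not just the first one.
    response = logs_client.get_log_events(
        logGroupName='/aws/batch/job',
        logStreamName=log_stream_name,
        startFromHead=True,
    )
    return '\n'.join(e['message'] for e in response['events'])
```

Note that `get_log_events` is paginated; for very long streams you would also need to follow `nextForwardToken`.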
  • Consume-logs Lambda code:
import json

print('Loading function')

def lambda_handler(event, context):
    print(event)

pengsaosao #2

You can pass your Step Functions execution ID ($$.Execution.Id) to the Batch process; the Batch process can then write its response to DynamoDB, keyed by the execution ID (plus other fields as needed). You then need a subsequent state that reads directly from DynamoDB and captures the process response.
I've been looking for a way to do this without the subsequent state, but so far no dice.
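The DynamoDB hand-off described above can be sketched as two small helpers: one called from the Batch container to record the result, one called from the follow-up state's Lambda to read it back. The table name `batch-job-results` and its single-attribute key schema are assumptions, not from the answer; the client is injected to keep the sketch testable:

```python
import json

def write_result(dynamodb_client, execution_id, result):
    # Batch container side: record the job's output under the execution ID.
    dynamodb_client.put_item(
        TableName='batch-job-results',
        Item={
            'execution_id': {'S': execution_id},
            'result': {'S': json.dumps(result)},
        },
    )

def read_result(dynamodb_client, execution_id):
    # Follow-up state's Lambda: fetch the recorded output by execution ID.
    item = dynamodb_client.get_item(
        TableName='batch-job-results',
        Key={'execution_id': {'S': execution_id}},
    )['Item']
    return json.loads(item['result']['S'])
```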

pbgvytdp #3

Although you can't use waitForTaskToken with submitJob, you can still use the callback pattern by passing the task token in the job's Parameters and referencing it in the command override with Ref::TaskToken:

...
   "Submit Batch Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
         "Parameters": {
            "TaskToken.$": "$$.Task.Token"
         },
         "ContainerOverrides": {
            "Command": ["python3",
                        "my_script.py",
                        "Ref::TaskToken"]
         }
      }
...

Then, when the script finishes processing, simply call StepFunctions.SendTaskSuccess or StepFunctions.SendTaskFailure:

import sys
import boto3

client = boto3.client('stepfunctions')

def main():
    args = sys.argv[1:]
    # The output argument must be a valid JSON string, so quote the value.
    client.send_task_success(taskToken=args[0], output='"Hello World"')

if __name__ == '__main__':
    main()
