SparkJarProcessor in a SageMaker Pipeline

9ceoxa92 asked on 2023-01-17 in Apache

I want to run a SparkJarProcessor in a SageMaker Pipeline. After creating an instance of SparkJarProcessor, when I just run the processor I can use the run method's submit_app and submit_class parameters to specify the jar and the class to execute. For example,

processor.run(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

If I want to run this as a step in a pipeline, what arguments can I give to ProcessingStep? According to this documentation, you can call get_run_args on the processor to "get the normalized inputs, outputs and arguments needed when using a SparkJarProcessor in a ProcessingStep", but when I run it like this,

processor.get_run_args(
    submit_app="my.jar", 
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

my output looks like this:

RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])

"program.to.run" is not part of the output. So, given that code specifies the jar, what is the normalized version of submit_class?

eyh26e7m 1#

When get_run_args or run is called on a SparkJarProcessor, submit_class is used to set a property on the processor object itself, which is why you don't see it in the get_run_args output.
That processor property is then used during pipeline definition generation to set the ContainerEntrypoint argument of the CreateProcessingJob request.
Example:

import json

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

run_args = spark_processor.get_run_args(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=[]
)

step_process = ProcessingStep(
    name="SparkJarProcessStep",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code
)

pipeline = Pipeline(
    name="myPipeline",
    parameters=[],
    steps=[step_process],
)

definition = json.loads(pipeline.definition())
definition

Output of definition:

...
'Steps': [{'Name': 'SparkJarProcessStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 2,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
     'ContainerEntrypoint': ['smspark-submit',
      '--class',
      'program.to.run',
      '--local-spark-event-logs-dir',
      '/opt/ml/processing/spark-events/',
      '/opt/ml/processing/input/code/my.jar']},
...
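
To actually create and run the pipeline from this definition, a minimal follow-up sketch (assuming role holds a valid SageMaker execution role ARN):

# Create or update the pipeline in SageMaker, then start an execution.
# role is assumed to be a valid execution role ARN.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()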
7rfyedvj 2#

With more modern versions of the sagemaker SDK, you can use the run method directly. For example, with sagemaker SDK version '2.120.0':

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.pipeline_context import PipelineSession

session = PipelineSession()

# role and bucket are assumed to be defined elsewhere in the notebook.
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=session,
)

step_preprocess_data = ProcessingStep(
    name="spark-train-data",
    step_args=spark_processor.run(
        submit_app="./code/preprocess.py",
        arguments=[
            "--s3_input_bucket",
            bucket,
            "--s3_input_key_prefix",
            "user_filestore/marti/test-spark",
            "--s3_output_bucket",
            bucket,
            "--s3_output_key_prefix",
            "user_filestore/marti/test-spark",
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, "user_filestore/marti/test-spark"),
    )
)
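
The same pattern also answers the original SparkJarProcessor question: under a PipelineSession, run accepts submit_class and returns the step arguments directly, so nothing needs to be normalized by hand. A minimal sketch, reusing role, my_config, and log_path from the question:

from sagemaker.spark.processing import SparkJarProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

session = PipelineSession()

spark_processor = SparkJarProcessor(
    base_job_name="sm-spark-jar",
    framework_version="3.1",
    role=role,  # assumed: SageMaker execution role ARN
    instance_count=2,
    instance_type="ml.m5.xlarge",
    sagemaker_session=session,
)

# Under a PipelineSession, run() does not start a job; it returns the step
# arguments, with submit_class folded into the container entrypoint.
step_process = ProcessingStep(
    name="SparkJarProcessStep",
    step_args=spark_processor.run(
        submit_app="my.jar",
        submit_class="program.to.run",
        arguments=["--my_arg", "my_arg"],
        configuration=my_config,  # assumed: same configuration as in the question
        spark_event_logs_s3_uri=log_path,  # assumed: same log path as in the question
    ),
)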
