java 在aws emr作业流程中,是否每一步都收到前一步的输出?

8gsdolmq  于 2023-01-11  发布在  Java
关注(0)|答案(1)|浏览(153)

我正在用Java制作一个Map Reduce程序,它有4个步骤。每个步骤都是对前一步的输出进行操作。
到目前为止,我在本地和手动运行了那些步骤,我想开始使用作业流在AWS EMR上运行。
我的教授给了我们一些配置作业流步骤的代码,但现在我们面临一个问题:
我的每个步骤都期望输入和输出路径作为其主函数的参数。JobFlow允许我将参数传输到每个步骤,但据我所知,作业流中的每个步骤都应该自动接收前一步骤的输出
有人知道这是真的吗?步骤中的map-reduce应用程序如何认识到它的输入在哪里?路径是否作为JobFlow的参数隐式传递给它?
我使用的是AWS SDK 2 for Java。
我的代码:

public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
                // AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider
                // .create(ProfileCredentialsProvider.create().resolveCredentials());

                EmrClient mapReduce = EmrClient.builder().credentialsProvider(ProfileCredentialsProvider.create())
                                .build();
                List<StepConfig> steps = new LinkedList<StepConfig>();

                HadoopJarStepConfig hadoopJarStepConfig = HadoopJarStepConfig.builder()
                                .jar("s3n://" + myBucketName + "/" + NCount + jarPostfix)
                                .mainClass(packageName + NCount)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(NCount).hadoopJarStep(hadoopJarStepConfig)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig2 = HadoopJarStepConfig.builder()
                                .jar("s3n://" + myBucketName + "/" + CountNrTr + jarPostfix)
                                .mainClass(packageName + CountNrTr)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(CountNrTr).hadoopJarStep(hadoopJarStepConfig2)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig3 = HadoopJarStepConfig.builder()
                                .jar("s3n://" + myBucketName + "/" + JoinAndCalculate + jarPostfix)
                                .mainClass(packageName + JoinAndCalculate)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(JoinAndCalculate).hadoopJarStep(hadoopJarStepConfig3)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                HadoopJarStepConfig hadoopJarStepConfig4 = HadoopJarStepConfig.builder()
                                .jar("s3n://" + myBucketName + "/" + ValueToKeySort + jarPostfix)
                                .mainClass(packageName + ValueToKeySort)
                                .args(??????????????????????)
                                .build();
                steps.add(StepConfig.builder().name(ValueToKeySort).hadoopJarStep(hadoopJarStepConfig4)
                                .actionOnFailure("TERMINATE_JOB_FLOW").build());

                JobFlowInstancesConfig instances = JobFlowInstancesConfig.builder()
                                .instanceCount(2)
                                .masterInstanceType("m4.large")
                                .slaveInstanceType("m4.large")
                                .hadoopVersion("3.3.4")
                                .ec2KeyName(myKeyPair)
                                .keepJobFlowAliveWhenNoSteps(false)
                                .placement(PlacementType.builder().availabilityZone("us-east-1a").build()).build();
wwtsj6pe

wwtsj6pe1#

电子病历与问题无关。不,它不是自动的。
我们需要看到你执行的JAR的代码,但我只假设它是传统的mapreduce代码,你使用FileInputFormat,可能有类似Path(args[0])的代码,如果是这样,那很可能是你的输入,那么Path(args[1])可能是输出。
因此,您只需在每个步骤中将这些参数链接在一起...

step1 = ...
   .args(new String[] {"/in", "/stage1" })
...
end = ...
   .args(new String[] {"/stageN", "/out" })

或者,将您的代码转换为Spark/Flink或Hive查询,其中多个mapreduce阶段会自动处理

相关问题