Looking for a boto3 / Python example of injecting an AWS Pig step into an already-running EMR cluster?

1cosmwyk · published 2021-06-21 in Pig
Follow (0) | Answers (2) | Views (303)

I'm looking for a good example where an AWS EMR cluster is already running and I want to inject a Pig step into it. Previously, I was using boto version 2.42:

from boto.emr.connection import EmrConnection
from boto.emr.step import InstallPigStep, PigStep

# AWS_ACCESS_KEY = ''  # REQUIRED
# AWS_SECRET_KEY = ''  # REQUIRED
# conn = EmrConnection(AWS_ACCESS_KEY, AWS_SECRET_KEY)

# loop next element on bucket_compare list
pig_file = 's3://elasticmapreduce/samples/pig-apache/do-reports2.pig'
INPUT = 's3://elasticmapreduce/samples/pig-apache/input/access_log_1'
OUTPUT = ''  # REQUIRED, S3 bucket for job output

pig_args = ['-p', 'INPUT=%s' % INPUT,
            '-p', 'OUTPUT=%s' % OUTPUT]
pig_step = PigStep('Process Reports', pig_file, pig_args=pig_args)
steps = [InstallPigStep(), pig_step]

conn.run_jobflow(name='prs-dev-test', steps=steps,
                 hadoop_version='2.7.2-amzn-2', ami_version='latest',
                 num_instances=2, keep_alive=False)

The main problem now is that boto3 has no `from boto.emr.connection import EmrConnection`, nor `from boto.emr.step import InstallPigStep, PigStep`, and I can't find an equivalent set of modules?

4uqofj5v #1

After some checking, I found a very easy way to inject a Pig script command from Python using the AWS CLI and the subprocess module. You can wrap the desired Pig step and inject it into the already-running EMR cluster with:

import subprocess

cmd = 'aws emr add-steps --cluster-id j-GU07FE0VTHNG --steps Type=PIG,Name="AggPigProgram",ActionOnFailure=CONTINUE,Args=[-f,s3://dev-end2end-test/pig_scripts/AggRuleBag.pig,-p,INPUT=s3://dev-end2end-test/input_location,-p,OUTPUT=s3://end2end-test/output_location]'

push = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
# Wait for the CLI call to finish; returncode is None until the process exits
output, _ = push.communicate()
print(push.returncode)
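A variant of the same idea that sidesteps shell quoting by building the argv list directly; the helper function is mine, and the cluster id and S3 paths are the ones from the command above:

```python
import subprocess

def build_add_steps_cmd(cluster_id, script, input_path, output_path):
    """Assemble the argv list for `aws emr add-steps` (no shell quoting needed)."""
    steps_arg = (
        'Type=PIG,Name=AggPigProgram,ActionOnFailure=CONTINUE,'
        'Args=[-f,{},-p,INPUT={},-p,OUTPUT={}]'.format(script, input_path, output_path)
    )
    return ['aws', 'emr', 'add-steps', '--cluster-id', cluster_id, '--steps', steps_arg]

cmd = build_add_steps_cmd(
    'j-GU07FE0VTHNG',  # cluster id from the command above
    's3://dev-end2end-test/pig_scripts/AggRuleBag.pig',
    's3://dev-end2end-test/input_location',
    's3://end2end-test/output_location',
)
# result = subprocess.run(cmd, capture_output=True, text=True)  # blocks until done
# print(result.returncode)
```

Because `subprocess.run` waits for the command to finish, its `returncode` is always set when you read it.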

Of course, you will first have to find your JobFlowID (cluster id), using:

aws emr list-clusters --active

run through the same subprocess-and-push pattern as above. And of course you can add monitoring to your heart's delight, rather than just a print statement.
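If you want the cluster id programmatically rather than reading the CLI output by eye, the JSON printed by `aws emr list-clusters --active` can be parsed with the standard json module. A minimal sketch; the helper function is mine, and the sample payload is an assumed shape matching the EMR ListClusters response:

```python
import json

def active_cluster_ids(list_clusters_json):
    """Extract cluster ids from `aws emr list-clusters --active` output."""
    payload = json.loads(list_clusters_json)
    return [c['Id'] for c in payload.get('Clusters', [])]

# Sample payload shaped like the EMR ListClusters response (assumed, not live output)
sample = '{"Clusters": [{"Id": "j-GU07FE0VTHNG", "Name": "prs-dev-test"}]}'
print(active_cluster_ids(sample))  # ['j-GU07FE0VTHNG']
```

In practice you would feed `active_cluster_ids` the stdout captured from the subprocess call above.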

8i9zcol2 #2

Here is how to add a new step for a Pig job to an existing EMR cluster's job flow.
Note: the full paths of the script log file, input and output directories should have the form 's3://<bucket>/<directory>/<file_or_key>'
import boto3

emrcon = boto3.client("emr")
cluster_id1 = cluster_status_file_content  # Retrieved from S3, where it was recorded on creation

step_id = emrcon.add_job_flow_steps(
    JobFlowId=str(cluster_id1),
    Steps=[{
        'Name': str(pig_job_name),
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['pig', '-l', str(pig_log_file_full_path),
                     '-f', str(pig_job_run_script_full_path),
                     '-p', 'INPUT=' + str(pig_input_dir_full_path),
                     '-p', 'OUTPUT=' + str(pig_output_dir_full_path)]
        }
    }]
)
Please see the monitoring screenshot -
![](https://i.stack.imgur.com/Hvkxy.png)
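Beyond the console screenshot, the step can also be monitored programmatically: `add_job_flow_steps` returns the new step ids, which can be polled with the same boto3 client. A sketch, assuming the documented response shapes of EMR's AddJobFlowSteps and DescribeStep APIs; the polling helper itself is mine:

```python
import time

def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30):
    """Poll an EMR step until it leaves the PENDING/RUNNING states.

    emr_client is a boto3 EMR client, e.g. boto3.client('emr').
    """
    while True:
        state = emr_client.describe_step(
            ClusterId=cluster_id, StepId=step_id)['Step']['Status']['State']
        if state not in ('PENDING', 'RUNNING'):
            return state  # COMPLETED, FAILED, CANCELLED or INTERRUPTED
        time.sleep(poll_seconds)

# response = emrcon.add_job_flow_steps(...)   # as in the answer above
# final_state = wait_for_step(emrcon, cluster_id1, response['StepIds'][0])
```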
