我已经将emr集群配置为具有第二个hadoop队列。“default”队列和我命名为“gold”的队列。当我的emr集群启动时,队列存在并正在运行,正如我在hadoopweb界面中看到的那样。
我的java代码创建了一个管道作业(在本例中是hiveactivity),并向管道对象添加了字段对象列表。这些字段是管道的设置。
一个这样的字段(根据https://docs.aws.amazon.com/datapipeline/latest/developerguide/dp-object-hiveactivity.html)是“hadoopqueue”。当我不设置此字段时,默认情况下使用“default”队列。工作完成了,一切都很好。
当我用这个代码设置字段时
//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);
作业未完成,我的数据管道日志中出现以下错误
Exception in thread "main" java.lang.RuntimeException: Local file does not exist.
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.fetchFile(ScriptRunner.java:30)
at com.amazon.elasticmapreduce.scriptrunner.ScriptRunner.main(ScriptRunner.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
at org.apache.hadoop.util.RunJar.main(RunJar.java:153)
如果我显式地将队列设置为'default',我也会得到这个错误,比如
//hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("default");
fields.add(field);
只有当我没有设置这个选项但没有使用它意味着我不能指定另一个队列时,它才起作用。
有没有人能够以这种方式成功地使用“hadoopqueue”选项?
public static String scheduleDataPipelineActivity(DataPipeline dpl, String pipelineId, String script, String stepName,
Map<String,String> params) {
PutPipelineDefinitionRequest putDefReq = new PutPipelineDefinitionRequest();
putDefReq.setPipelineId(pipelineId);
List<PipelineObject> objects = new ArrayList<>();
List<Field> fields = new ArrayList<>();
Field field = new Field();
field.setKey("failureAndRerunMode");
field.setStringValue("CASCADE");
fields.add(field);
field = new Field();
field.setKey("resourceRole");
field.setStringValue("DataPipelineDefaultResourceRole");
fields.add(field);
field = new Field();
field.setKey("role");
field.setStringValue("DataPipelineDefaultRole");
fields.add(field);
field = new Field();
field.setKey("pipelineLogUri");
field.setStringValue("s3://" + getBucketName() + "/logs");
fields.add(field);
field = new Field();
field.setKey("scheduleType");
field.setStringValue("ONDEMAND");
fields.add(field);
if ((params != null) && (params.size() > 0)) {
for (Map.Entry<String,String> entry : params.entrySet()) {
field = new Field();
field.setKey("scriptVariable");
field.setStringValue(entry.getKey() + "=" + entry.getValue());
fields.add(field);
}
}
PipelineObject po = new PipelineObject().withName("Default").withId("Default");
po.setFields(fields);
objects.add(po);
fields = new ArrayList<>();
field = new Field();
field.setKey("stage");
field.setStringValue("false");
fields.add(field);
field = new Field();
if (script.startsWith("s3://")) {
field.setKey("scriptUri");
}
else if (script.length() > 2048) {
field.setKey("scriptUri");
Writer writer = null;
try {
String hiveQL = UUID.randomUUID().toString() + ".q";
File localFile = new File(hiveQL);
writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile), "UTF-8"));
writer.write(script);
writer.flush();
writer.close();
writer = null;
String s3Key = "working/" + hiveQL;
script = "s3://" + getBucketName() + "/" + s3Key;
AmazonS3 s3 = getS3Client(null);
s3.putObject(getBucketName(), s3Key, localFile);
if (!localFile.delete()) {
LOGGER.error("Unable to delete temporary file: " + hiveQL);
}
}
catch (IOException e) {
LOGGER.error(e.getLocalizedMessage(), e);
throw new RuntimeException(e.getLocalizedMessage(), e);
}
finally {
if (writer != null) {
try {
writer.close();
}
catch(IOException e) {
LOGGER.error(e.getLocalizedMessage(), e);
}
}
}
}
else {
field.setKey("hiveScript");
}
field.setStringValue(script);
fields.add(field);
field = new Field();
field.setKey("workerGroup");
field.setStringValue(EMR_INSTANCE);
fields.add(field);
hadoopQueue
field = new Field();
field.setKey("hadoopQueue");
field.setStringValue("gold");
fields.add(field);
field = new Field();
field.setKey("type");
field.setStringValue("HiveActivity");
fields.add(field);
String hiveId = UUID.randomUUID().toString();
po = new PipelineObject().withName(stepName).withId(hiveId);
po.setFields(fields);
objects.add(po);
putDefReq.setPipelineObjects(objects);
PutPipelineDefinitionResult putPipelineResult = dpl.putPipelineDefinition(putDefReq);
List<ValidationError> errors = putPipelineResult.getValidationErrors();
int errorCount = 0;
if ((errors != null) && (errors.size() > 0)) {
for (ValidationError error : errors) {
List<String> errorStrs = error.getErrors();
for (String errorStr : errorStrs) {
LOGGER.error(errorStr);
errorCount++;
}
}
}
List<ValidationWarning> warnings = putPipelineResult.getValidationWarnings();
if ((warnings != null) && (warnings.size() > 0)) {
for (ValidationWarning warning : warnings) {
List<String> warningStrs = warning.getWarnings();
for (String warningStr : warningStrs) {
LOGGER.warn(warningStr);
}
}
}
if (errorCount > 0) {
LOGGER.fatal("BAD STUFF HAPPENED!!!!");
throw new DataPipelineValidationException("Validation errors detected for hive activity (check log file for details): " +
"step=" + stepName + "\terror count=" + Integer.toString(errorCount) + "\t" + pipelineId);
}
ActivatePipelineRequest activateReq = new ActivatePipelineRequest();
activateReq.setPipelineId(pipelineId);
dpl.activatePipeline(activateReq);
try {
Thread.sleep(2500L);
}
catch (InterruptedException e) {
LOGGER.error(e.getLocalizedMessage(), e);
}
return hiveId;
暂无答案!
目前还没有任何答案,快来回答吧!