我对google cloud还是个新手,对google cloud dataflow更是如此。我创建了一个java应用程序,其唯一目的是运行数据流作业。详情如下:
首先,我创建一个具有以下属性的bean:
@Bean(name="dataflowTemplateJobConfigMap")
public Map<DataflowTemplateJob, DataflowTemplateConfig> dataflowTemplateJobConfigMap(
@Value("${job.gcs.export.gcs.path}") String gcsExportGcsPath,
@Value("${job.gcs.export.bigtable.project.id}") String gcsExportBigtableProjectId,
@Value("${job.gcs.export.bigtable.instance.id}") String gcsExportBigtableInstanceId,
@Value("${job.gcs.export.bigtable.table.id}") String gcsExportBigtableTableId,
@Value("${job.gcs.export.output.directory}") String gcsExportOutputDirectory,
@Value("${job.gcs.export.filename.prefix}") String gcsExportFilenamePrefix,
@Value("${job.gcs.export.num.shards}") String gcsExportNumShards) {
Map<DataflowTemplateJob, DataflowTemplateConfig> configMap = new HashMap<>();
// gcs-export job
DataflowTemplateConfig gcsExportConfig = new DataflowTemplateConfig();
gcsExportConfig.setGcsPath(gcsExportGcsPath);
gcsExportConfig.setParameterMap(ImmutableMap.<String, String>builder()
.put("bigtableProjectId", gcsExportBigtableProjectId)
.put("bigtableInstanceId", gcsExportBigtableInstanceId)
.put("bigtableTableId", gcsExportBigtableTableId)
.put("outputDirectory", gcsExportOutputDirectory)
.put("filenamePrefix", gcsExportFilenamePrefix)
.put("numShards", gcsExportNumShards)
.build());
configMap.put("GCS_EXPORT", gcsExportConfig);
return configMap;
}
你问的DataflowTemplateConfig看起来像什么?
@Getter
@Setter
public class DataflowTemplateConfig {
private String gcsPath;
private Map<String, String> parameterMap;
}
然后我有一个run方法:
@Value("${google.cloud.project.id}")
private String projectId;
@Value("${google.cloud.region}")
private String region;
@Resource(name="dataflowTemplateJobConfigMap")
private Map<DataflowTemplateJob, DataflowTemplateConfig> dataflowTemplateConfigMap;
public Job run(DataflowTemplateJob dataflowTemplateJob) throws IOException {
DataflowTemplateConfig config = dataflowTemplateConfigMap.get(dataflowTemplateJob);
try (TemplatesServiceClient templatesServiceClient =TemplatesServiceClient.create()) {
CreateJobFromTemplateRequest request =
CreateJobFromTemplateRequest.newBuilder()
.setProjectId(projectId)
.setJobName(dataflowTemplateJob.getJobName())
.putAllParameters(config.getParameterMap())
.setEnvironment(RuntimeEnvironment.newBuilder().build())
.setLocation(region)
.setGcsPath(config.getGcsPath())
.build();
Job response = templatesServiceClient.createJobFromTemplate(request);
jobRepository.addJob(response);
return response;
} catch (Exception e) {
throw new RuntimeException("Failed to start Dataflow job", e);
}
}
当我通过命令行运行作业时,作业通常会成功启动,但总是失败。第一次运行时,我得到了以下错误:
{
insertId: "6e9nqud4qhu"
labels: {4}
logName: "projects/test/test-dataflow/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2023-05-06T00:56:00.125700767Z"
resource: {2}
severity: "ERROR"
textPayload: "Error processing pipeline."
timestamp: "2023-05-06T00:55:59.103993244Z"
}
请注意,我已经尝试了一段时间,在让作业运行之前,我已经正确设置了服务帐户和工作负载标识。无论如何,我正在研究推荐的文档,但有人使用TemplatesServiceClient吗?如果是这样,你能链接你的git repo吗?我只是想排除这是一个与代码相关的问题,它实际上是在服务帐户/工作负载标识方面。如果任何人可以帮助它将不胜感激!希望这是足够的信息。
哦,这里是一些TemplatesServiceClient的文档,以防你不熟悉/不确定:https://cloud.google.com/java/docs/reference/google-cloud-dataflow/latest/overview#templatesserviceclient这是我使用的特定模板:gs://dataflow-templates/2023-04-11-00_RC00/Cloud_Bigtable_to_GCS_Parquet
1条答案
按热度按时间ztmd8pv51#
好了,深入研究数据流API,我们发现还有一些字段需要包含在CreateJobFromTemplateRequest中。它们是:IpConfiguration、Network、SubNetwork和ServiceAccount。下面是代码更改的样子:
这将使作业运行,甚至完成作业。