Java: how to correctly use TemplatesServiceClient (google-cloud-dataflow)

xt0899hw, posted 2023-05-12 in Java

I'm still new to Google Cloud, and even newer to Google Cloud Dataflow. I've created a Java application whose sole purpose is to run a Dataflow job. The details are below.
First, I create a bean with the following properties:

@Bean(name="dataflowTemplateJobConfigMap")
public Map<DataflowTemplateJob, DataflowTemplateConfig> dataflowTemplateJobConfigMap(
        @Value("${job.gcs.export.gcs.path}") String gcsExportGcsPath,
        @Value("${job.gcs.export.bigtable.project.id}") String gcsExportBigtableProjectId,
        @Value("${job.gcs.export.bigtable.instance.id}") String gcsExportBigtableInstanceId,
        @Value("${job.gcs.export.bigtable.table.id}") String gcsExportBigtableTableId,
        @Value("${job.gcs.export.output.directory}") String gcsExportOutputDirectory,
        @Value("${job.gcs.export.filename.prefix}") String gcsExportFilenamePrefix,
        @Value("${job.gcs.export.num.shards}") String gcsExportNumShards) {
    Map<DataflowTemplateJob, DataflowTemplateConfig> configMap = new HashMap<>();

    // gcs-export job
    DataflowTemplateConfig gcsExportConfig = new DataflowTemplateConfig();
    gcsExportConfig.setGcsPath(gcsExportGcsPath);
    gcsExportConfig.setParameterMap(ImmutableMap.<String, String>builder()
            .put("bigtableProjectId", gcsExportBigtableProjectId)
            .put("bigtableInstanceId", gcsExportBigtableInstanceId)
            .put("bigtableTableId", gcsExportBigtableTableId)
            .put("outputDirectory", gcsExportOutputDirectory)
            .put("filenamePrefix", gcsExportFilenamePrefix)
            .put("numShards", gcsExportNumShards)
            .build());
    configMap.put(DataflowTemplateJob.GCS_EXPORT, gcsExportConfig);

    return configMap;
}

What does DataflowTemplateConfig look like, you ask?

@Getter
@Setter
public class DataflowTemplateConfig {
    private String gcsPath;
    private Map<String, String> parameterMap;
}
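The DataflowTemplateJob type used as the map key isn't shown in the question. A minimal sketch of what it might look like, assuming it's an enum that carries the Dataflow job name (the constant and the "gcs-export" value are my guesses, not from the question):

```java
// Hypothetical sketch of the DataflowTemplateJob enum referenced above.
// The constant name and job-name string are assumptions for illustration.
enum DataflowTemplateJob {
    GCS_EXPORT("gcs-export");

    private final String jobName;

    DataflowTemplateJob(String jobName) {
        this.jobName = jobName;
    }

    // Used as the job name in CreateJobFromTemplateRequest below.
    public String getJobName() {
        return jobName;
    }
}
```

Using an enum key (rather than a raw string) lets the compiler catch typos when jobs are looked up in the config map.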

Then I have a run method:

@Value("${google.cloud.project.id}")
private String projectId;

@Value("${google.cloud.region}")
private String region;

@Resource(name="dataflowTemplateJobConfigMap")
private Map<DataflowTemplateJob, DataflowTemplateConfig> dataflowTemplateConfigMap;

public Job run(DataflowTemplateJob dataflowTemplateJob) throws IOException {
    DataflowTemplateConfig config = dataflowTemplateConfigMap.get(dataflowTemplateJob);
    try (TemplatesServiceClient templatesServiceClient = TemplatesServiceClient.create()) {
        CreateJobFromTemplateRequest request =
                CreateJobFromTemplateRequest.newBuilder()
                        .setProjectId(projectId)
                        .setJobName(dataflowTemplateJob.getJobName())
                        .putAllParameters(config.getParameterMap())
                        .setEnvironment(RuntimeEnvironment.newBuilder().build())
                        .setLocation(region)
                        .setGcsPath(config.getGcsPath())
                        .build();
        Job response = templatesServiceClient.createJobFromTemplate(request);
        jobRepository.addJob(response);
        return response;
    } catch (Exception e) {
        throw new RuntimeException("Failed to start Dataflow job", e);
    }
}
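A common failure mode with classic templates is launching with a missing or blank required parameter, which only surfaces once the job is already running. A small pre-flight check (a hypothetical helper of my own, plain Java, not part of the Dataflow client) could catch that before calling createJobFromTemplate:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class TemplateParamValidator {
    // Returns the names of required template parameters that are
    // missing from the map or present but blank.
    static List<String> missingParams(Map<String, String> params, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = params.get(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

For the Cloud_Bigtable_to_GCS_Parquet template, the required list would presumably cover bigtableProjectId, bigtableInstanceId, bigtableTableId, outputDirectory, and filenamePrefix; run() could throw if the returned list is non-empty.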

When I run the job from the command line, it usually starts successfully, but it always fails. On the first run I got the following error:

{
insertId: "6e9nqud4qhu"
labels: {4}
logName: "projects/test/test-dataflow/logs/dataflow.googleapis.com%2Fjob-message"
receiveTimestamp: "2023-05-06T00:56:00.125700767Z"
resource: {2}
severity: "ERROR"
textPayload: "Error processing pipeline."
timestamp: "2023-05-06T00:55:59.103993244Z"
}

Note that I've been at this for a while, and I had already set up the service account and Workload Identity correctly before trying to run the job. In any case, I'm working through the recommended documentation, but has anyone here used TemplatesServiceClient? If so, could you link your git repo? I just want to rule out whether this is a code problem or whether it's actually on the service account / Workload Identity side. Any help would be greatly appreciated! Hopefully this is enough information.
Oh, and here is the TemplatesServiceClient documentation in case you're unfamiliar with it: https://cloud.google.com/java/docs/reference/google-cloud-dataflow/latest/overview#templatesserviceclient. This is the specific template I'm using: gs://dataflow-templates/2023-04-11-00_RC00/Cloud_Bigtable_to_GCS_Parquet

ztmd8pv5

Okay: digging further into the Dataflow API, it turns out there are a few more fields that need to be set on the RuntimeEnvironment inside the CreateJobFromTemplateRequest. They are: IpConfiguration, Network, Subnetwork, and ServiceAccountEmail. Here is what the code change looks like:

CreateJobFromTemplateRequest request =
        CreateJobFromTemplateRequest.newBuilder()
                .setProjectId(projectId)
                .setJobName(dataflowTemplateJob.getJobName())
                .putAllParameters(config.getParameterMap())
                .setEnvironment(RuntimeEnvironment.newBuilder()
                        .setIpConfiguration(WorkerIPAddressConfiguration.WORKER_IP_PRIVATE)
                        .setNetwork(network)
                        .setSubnetwork(subnetwork)
                        .setServiceAccountEmail(serviceAccountEmail)
                        .build())
                .setLocation(region)
                .setGcsPath(config.getGcsPath())
                .build();

With that change the job runs, and even completes.
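One gotcha with the Subnetwork field: as I understand it, Dataflow expects the subnetwork either as a full compute URL or in the short `regions/REGION/subnetworks/SUBNETWORK` form, not as a bare subnet name. A tiny hypothetical helper (class and method names are mine) for building the full form:

```java
// Hypothetical helper for the value passed to setSubnetwork(...).
// Builds the full compute URL form; the shorter
// "regions/REGION/subnetworks/SUBNETWORK" form is also accepted.
class SubnetworkUrls {
    static String fullUrl(String hostProjectId, String region, String subnetwork) {
        return String.format(
                "https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s",
                hostProjectId, region, subnetwork);
    }
}
```

With a Shared VPC, hostProjectId would be the host project rather than the project running the job.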
