Java: Dataflow JdbcIO read with a generic RowMapper

qfe3c7zg · posted 2023-01-11 in Java
Follow (0) | Answers (1) | Views (105)

I am using a Dataflow job to read data from an MS-SQL database and write the results to a BigQuery table. The purpose of the Dataflow job is to be able to create tables with varying schemas based on whatever query is run. I have been unable to find a way to set a generic row mapper when doing the JdbcIO read, and was hoping there is a standard way to build the rows to write to BigQuery based on the schema of the rows returned in the ResultSet from JdbcIO.read.
If the query definition does not include a RowMapper, the following error is thrown:

Exception in thread "main" java.lang.IllegalArgumentException: withRowMapper() is required
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:810)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:711)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:192)
    at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.buildPipeline(SqaTransfer.java:81)
    at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.main(SqaTransfer.java:66)

The setup I am trying to write is based on the following:

PCollection<TableRow> results = pipeline
        .apply("Connect", JdbcIO.<TableRow>read()
                .withDataSourceConfiguration(buildDataSourceConfig(options, URL))
                .withQuery(query)
                .withRowMapper("WHAT NEEDS TO BE HERE TO CREATE A GENERIC ROW MAPPER"));

results.apply("Write to BQ",
        BigQueryIO.writeTableRows()
                .to(dataset)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

vs91vp4v #1

For the read part, you can use TableRow with JdbcIO as follows:

PCollection<TableRow> results = pipeline
        .apply("Connect", JdbcIO.<TableRow>read()
                .withDataSourceConfiguration(buildDataSourceConfig(options, URL))
                .withQuery(query)
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
                    @Override
                    public TableRow mapRow(ResultSet resultSet) throws Exception {
                        TableRow tableRow = new TableRow();
                        // Implement your mapping logic here.
                        return tableRow;
                    }
                }));
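To make that mapper truly generic, one option is to walk the ResultSetMetaData and copy every column into the TableRow keyed by its label. Below is a minimal sketch of that idea (the GenericRowMapper class is my own illustration, not part of the original answer):

import com.google.api.services.bigquery.model.TableRow;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import org.apache.beam.sdk.io.jdbc.JdbcIO;

// Copies every column of the current ResultSet row into a TableRow keyed
// by column label, whatever schema the query happens to return.
public class GenericRowMapper implements JdbcIO.RowMapper<TableRow> {
    @Override
    public TableRow mapRow(ResultSet resultSet) throws Exception {
        TableRow row = new TableRow();
        ResultSetMetaData meta = resultSet.getMetaData();
        // JDBC column indexes start at 1.
        for (int i = 1; i <= meta.getColumnCount(); i++) {
            // getObject may return driver-specific types (e.g. java.sql.Timestamp);
            // convert those to JSON-friendly values if the coder cannot handle them.
            row.set(meta.getColumnLabel(i), resultSet.getObject(i));
        }
        return row;
    }
}

You would then pass .withRowMapper(new GenericRowMapper()) in place of the anonymous class above.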

That said, I don't think Dataflow is the easiest service for your need, because you have to pass a schema to BigQueryIO so that Dataflow can create the table with the CREATE_IF_NEEDED option.
Pass the schema with withSchema(schema), for example:

rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            // required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

// pipeline.run().waitUntilFinish();

This means that in your case, you would have to build the schema from the current TableRow in the PCollection.
That is not easy to do with Beam; it would be simpler if you could use a single fixed schema for the BigQuery table.
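If you still want to do it in Dataflow, one option is to ask the JDBC driver for the query's metadata at pipeline-construction time and translate it into a TableSchema. This is a sketch under stated assumptions: schemaFromQuery and toBigQueryType are hypothetical helpers of mine, and some JDBC drivers return null from PreparedStatement.getMetaData() before execution (the Microsoft SQL Server driver can describe a statement up front):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

// Derive a BigQuery TableSchema from the query's JDBC metadata
// before the pipeline runs.
static TableSchema schemaFromQuery(String jdbcUrl, String query) throws Exception {
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
         PreparedStatement stmt = conn.prepareStatement(query)) {
        ResultSetMetaData meta = stmt.getMetaData();
        if (meta == null) {
            throw new IllegalStateException("Driver cannot describe the query before execution");
        }
        List<TableFieldSchema> fields = new ArrayList<>();
        for (int i = 1; i <= meta.getColumnCount(); i++) {
            fields.add(new TableFieldSchema()
                    .setName(meta.getColumnLabel(i))
                    .setType(toBigQueryType(meta.getColumnType(i))));
        }
        return new TableSchema().setFields(fields);
    }
}

// Minimal java.sql.Types to BigQuery type mapping; extend as needed.
static String toBigQueryType(int sqlType) {
    switch (sqlType) {
        case Types.SMALLINT: case Types.INTEGER: case Types.BIGINT: return "INTEGER";
        case Types.FLOAT: case Types.DOUBLE: case Types.DECIMAL: case Types.NUMERIC: return "FLOAT";
        case Types.BIT: case Types.BOOLEAN: return "BOOLEAN";
        case Types.TIMESTAMP: return "TIMESTAMP";
        default: return "STRING";
    }
}

The resulting TableSchema can then be handed to withSchema(...) in the write snippet above.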
Otherwise, I propose an alternative solution and approach:

  • Export the MS-SQL data to Cloud Storage (see the export sketch after the bq load example below).
  • Import the Cloud Storage file into BigQuery with autodetect mode to infer the schema from the file.

If you want to automate this process from the command line, you can add this logic to a shell script; the bq tool used below ships with the Google Cloud CLI.
Example of loading a GCS file into BigQuery:

bq load \
    --autodetect \
    --replace \
    --source_format=CSV \
    mydataset.mytable \
    gs://mybucket/mydata.csv
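For the export step itself, a minimal sketch using the bcp utility and gsutil could look like this (server, credentials, table, and bucket names are placeholders):

# Export the query result to a comma-separated file with bcp (character mode).
# Caveat: bcp writes no header row, so --autodetect will generate generic
# column names (string_field_0, ...) unless you prepend a header line.
bcp "SELECT * FROM mydb.dbo.mytable" queryout mydata.csv -c -t, -S myserver -U myuser -P mypassword

# Copy the file to Cloud Storage for the bq load command above.
gsutil cp mydata.csv gs://mybucket/mydata.csv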
