我正在使用数据流作业从MS-SQL数据库读取数据,并将结果写入Big Query表。数据流作业的目的是能够基于运行的任何查询创建具有不同架构的表。我无法找到在执行JDBCIO读取时设置通用行Map器的方法。并且希望有一种标准的方法可以根据www. example. comResultSet中返回的行的模式来创建要写入BigQuery的行JDBCIO.read。
如果查询定义中不包含RowMapper,则会出现以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: withRowMapper() is required at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:141)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:810)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.expand(JdbcIO.java:711)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:192)
at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.buildPipeline(SqaTransfer.java:81)
at edu.mayo.mcc.aide.sqaTransfer.SqaTransfer.main(SqaTransfer.java:66)
我正在尝试基于以下设置编写andrea:
PCollection<TableRow> results = pipeline
.apply("Connect", JdbcIO.<TableRow>read()
.withDataSourceConfiguration(buildDataSourceConfig(options, URL))
.withQuery(query)
.withRowMapper("WHAT NEEDS TO BE HERE TO CREATE A GENERIC ROW MAPPER"));
results.apply("Write to BQ",
BigQueryIO.writeTableRows()
.to(dataset)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
1条答案
按热度按时间vs91vp4v1#
在读取部分,可以将
TableRow
与JdbcIO
一起使用,如下所示:我认为
Dataflow
不是满足您需求的更容易的服务,因为您必须向BigQueryIO
传递一个模式,以允许Dataflow
使用CREATE_IF_NEEDED
选项创建表。使用以下方法传递架构:withSchema(schema),示例:
这意味着在您的情况下,必须从
PCollection
中的当前TableRow
构建模式。使用
Beam
并不容易做到这一点,如果您可以为BigQuery表使用一个schema,那就更容易了。我向你们提出另一种解决办法和途径:
MS-SQL
数据导出到Cloud Storage
autodetect
模式将Cloud Storage
文件导入BigQuery
,以从文件推断架构。如果希望使用
gcloud
cli自动执行此过程,可以在Shell
脚本中添加此逻辑。将
GCS
文件加载到BigQuery
的示例: