java中protobuf到bigquery的转换

vecaoik1  于 2021-07-06  发布在  Java
关注(0)|答案(1)|浏览(410)

我们将protobuf与gcp的pubsub和dataflow结合使用。我们用一个proto文件定义发送到pubsub的数据和bigquery模式。
publisher-(send proto)->pubsub->dataflow-(write)->bigquery
有时数据流会做一些修饰性的改变,但它主要是将protobuf中的字段复制到bigquery中。
我的问题是,有没有一种方法可以自动将protobuf模型转换为bigquery的tablerow?
下面是简化的数据流代码。我想消除大部分的代码 ProtoToTableRow 班级:

public class MyPipeline {
    public static void main(String[] args) {
        events = pipeline.apply("ReadEvents",
                PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));
        events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
                .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withExtendedErrorInfo()
                        .to(table));
    }
}

// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        Core.Foo foo = c.element().getFoo();
        TableRow fooRow = new TableRow()
                .set("id", foo.getId())
                .set("bar", foo.getBar())
                .set("baz", foo.getBaz());

        // similar code repeated for 100s of lines

        TableRow row = new TableRow()
                .set("foo", foo)

        c.output(row);
    }
}
5cnsuln7

5cnsuln71#

你可以用一种很酷的方式来完成。beam为各种类提供了模式推理方法,包括javabean、autovalue类以及协议缓冲区。
对于管道,不需要转换为tablerow,可以执行以下操作:

pipeline.getSchemaRegistry().registerSchemaProvider(
    Core.MyProtoObject.class, new ProtoMessageSchema());

events = pipeline.apply("ReadEvents",
                PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

events.apply("WriteToBigQuery", BigQueryIO.write()
                        .useBeamRows()
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                        .withExtendedErrorInfo()
                        .to(table));

注意 useBeamRows 中的参数 BigQueryIO.write -这将使用自动转换。

相关问题