I am trying to create a pipeline that streams data from a Kafka topic into Google BigQuery. The data in the topic is in Avro format.
I call the apply function three times: once to read from Kafka, once to extract the records, and once to write to BigQuery. Here is the main part of the code:
pipeline
    .apply("Read from Kafka",
        KafkaIO
            .<byte[], GenericRecord>read()
            .withBootstrapServers(options.getKafkaBrokers().get())
            .withTopics(Utils.getListFromString(options.getKafkaTopics()))
            .withKeyDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    options.getSchemaRegistryUrl().get(),
                    options.getSubject().get()))
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    options.getSchemaRegistryUrl().get(),
                    options.getSubject().get()))
            .withoutMetadata()
    )
    .apply("Extract GenericRecord",
        MapElements.into(TypeDescriptor.of(GenericRecord.class)).via(KV::getValue)
    )
    .apply("Write data to BQ",
        BigQueryIO
            .<GenericRecord>write()
            .optimizedWrites()
            .useBeamSchema()
            .useAvroLogicalTypes()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
            // Temporary location to save files in GCS before loading to BQ
            .withCustomGcsTempLocation(options.getGcsTempLocation())
            .withNumFileShards(options.getNumShards().get())
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withMethod(FILE_LOADS)
            .withTriggeringFrequency(Utils.parseDuration(options.getWindowDuration().get()))
            .to(new TableReference()
                .setProjectId(options.getGcpProjectId().get())
                .setDatasetId(options.getGcpDatasetId().get())
                .setTableId(options.getGcpTableId().get()))
    );
When I run this, I get the following error:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Extract GenericRecord/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.avro.generic.GenericRecord.
Building a Coder using a registered CoderProvider failed.
How do I set the coder so that the Avro data is read correctly?
1 Answer
There are at least three ways to do this:
Set the coder inline. Note that the key is dropped because it is not used, so you no longer need the MapElements step at all; see the sketch below.
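A minimal sketch of what that could look like, assuming `schema` is the org.apache.avro.Schema registered for your subject, and using `kafkaRead` and `bigQueryWrite` as placeholders for the KafkaIO and BigQueryIO transforms already shown in the question:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;

// Drop the unused keys and pin an explicit AvroCoder on the resulting PCollection,
// so Beam no longer has to infer a coder for GenericRecord.
PCollection<GenericRecord> records = pipeline
    .apply("Read from Kafka", kafkaRead)                 // the KafkaIO.read() from the question
    .apply("Drop keys", Values.<GenericRecord>create())  // replaces the MapElements step
    .setCoder(AvroCoder.of(GenericRecord.class, schema));

records.apply("Write data to BQ", bigQueryWrite);        // the BigQueryIO.write() from the question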
Register the coder in the pipeline's CoderRegistry instance:
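For example, a sketch that again assumes `schema` is the Avro Schema of the records; the registration has to happen before the transforms whose output coders need to be inferred:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;

// With this registration in place, the original MapElements step can stay as it is,
// because coder inference for GenericRecord now succeeds.
pipeline.getCoderRegistry()
    .registerCoderForClass(GenericRecord.class, AvroCoder.of(GenericRecord.class, schema));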
Get the coder from the schema registry via ConfluentSchemaRegistryDeserializerProvider#getCoder(CoderRegistry):
https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.html#getCoder-org.apache.beam.sdk.coders.CoderRegistry-
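A sketch of that variant, reusing the same deserializer provider that is already configured for the values in the question; `records` stands for the PCollection<GenericRecord> produced by the extraction step:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;

// Build (or reuse) the provider that KafkaIO uses for the values...
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> provider =
    ConfluentSchemaRegistryDeserializerProvider.of(
        options.getSchemaRegistryUrl().get(),
        options.getSubject().get());

// ...and ask it for a GenericRecord coder backed by the schema in the registry,
// then set that coder on the extracted PCollection:
Coder<GenericRecord> coder = provider.getCoder(pipeline.getCoderRegistry());
// records.setCoder(coder);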