I have only found one thread containing information about the topic I mention: How to deserialize Kafka Avro messages using Apache Beam.
However, after trying a few variations of Kafka deserializers, I still cannot deserialize the Kafka messages. Here is my code:
public class Readkafka {
    private static final Logger LOG = LoggerFactory.getLogger(Readkafka.class);

    public static void main(String[] args) throws IOException {
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                        .withBootstrapServers("mybootstrapserver")
                        .withTopic("action_States")
                        .withKeyDeserializer(MyClassKafkaAvroDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistryurl"))
                        .withMaxNumRecords(5)
                        .withoutMetadata();

        p.apply(kafka)
                .apply(Keys.<action_states_pkey>create());

        p.run();
    }
}
where MyClassKafkaAvroDeserializer is:
public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String s, byte[] bytes) {
        return (action_states_pkey) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}
The class action_states_pkey is code generated from the Avro schema with avro-tools:
java -jar pathtoavrotools/avro-tools-1.8.1.jar compile schema pathtoschema/action_states_pkey.avsc destination path
where action_states_pkey.avsc is:
{"type":"record","name":"action_states_pkey","namespace":"namespace","fields":[{"name":"ad_id","type":["null","int"]},{"name":"action_id","type":["null","int"]},{"name":"state_id","type":["null","int"]}]}
With this code I get the error:
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:20)
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:1)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:221)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.advanceWithBackoff(BoundedReadFromUnboundedSource.java:279)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.start(BoundedReadFromUnboundedSource.java:256)
at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
... 14 more
It seems that something goes wrong when trying to map the Avro data to my custom class?
Alternatively, I have tried the following code:
PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
        KafkaIO.<action_states_pkey, String>read()
                .withBootstrapServers("bootstrapserver")
                .withTopic("action_states")
                .withKeyDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(action_states_pkey.class))
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistry"))
                .withMaxNumRecords(5)
                .withoutMetadata();

p.apply(kafka)
        .apply(Keys.<action_states_pkey>create());
//        .apply("ExtractWords", ParDo.of(new DoFn<action_states_pkey, String>() {
//            @ProcessElement
//            public void processElement(ProcessContext c) {
//                action_states_pkey key = c.element();
//                c.output(key.getAdId().toString());
//            }
//        }));
This does not give me any errors until I try to print out the data. Since I have to verify one way or another that I am successfully reading the data, my intent here is to log the data to the console. If I uncomment the commented-out section, I get the same error again:
SEVERE: 2019-09-13T07:53:56.168Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.Readkafka$1.processElement(Readkafka.java:151)
Another thing to note: if I specify
.updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object)"true"))
it always gives me the error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class NAMESPACE.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord.
Is there something wrong with my approach? If anyone has any experience reading Avro data from a Kafka stream with Apache Beam, please help me out. I greatly appreciate it.
Below is a snapshot of my package, which also contains the schema and the classes: [screenshot: package/working path details]
Thanks.
1 Answer
qhhrdooz:
Your class MyClassKafkaAvroDeserializer is extending AbstractKafkaAvroDeserializer, which returns a GenericRecord. You need to convert that GenericRecord to your custom object.
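A minimal sketch of that conversion inside the deserializer, assuming the avro-tools-generated class has the usual no-argument constructor and setAdId/setActionId/setStateId setters for the three fields in your schema:

import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String topic, byte[] bytes) {
        // AbstractKafkaAvroDeserializer.deserialize(byte[]) returns a
        // GenericRecord here, so copy its fields into the generated class
        // instead of casting.
        GenericRecord record = (GenericRecord) this.deserialize(bytes);
        action_states_pkey key = new action_states_pkey();
        key.setAdId((Integer) record.get("ad_id"));
        key.setActionId((Integer) record.get("action_id"));
        key.setStateId((Integer) record.get("state_id"));
        return key;
    }

    @Override
    public void close() {}
}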
Or use a SpecificRecord for this, as stated in one of the following answers:
https://stackoverflow.com/a/39617120/2534090
https://stackoverflow.com/a/42514352/2534090
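For the SpecificRecord route, a minimal sketch of the KafkaIO setup, assuming the namespace in action_states_pkey.avsc exactly matches the writer's schema in the registry, including case; the "Could not find class NAMESPACE.action_states_pkey" error in your question suggests the writer's schema names a different namespace than your generated class:

PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
        KafkaIO.<action_states_pkey, String>read()
                .withBootstrapServers("mybootstrapserver")
                .withTopic("action_states")
                // Confluent's stock KafkaAvroDeserializer plus an AvroCoder
                // for the generated SpecificRecord class.
                .withKeyDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
                        AvroCoder.of(action_states_pkey.class))
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of(
                        "schema.registry.url", (Object) "schemaregistryurl",
                        // Return the SpecificRecord instead of a GenericRecord.
                        "specific.avro.reader", (Object) "true"))
                .withMaxNumRecords(5)
                .withoutMetadata();

With specific.avro.reader enabled, the deserializer instantiates the class named by the writer's schema, which is why the two namespaces have to line up exactly.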