Kafka Streams creates an Avro topic without a schema

oknrviil · asked 2021-06-07 · in Kafka

I developed a Java application that reads data from an Avro topic using Schema Registry, performs a simple transformation, and prints the result to the console. By default I use the GenericAvroSerde class for both keys and values. Everything works fine, except that I have to define extra configuration for each serde, like this:

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url"));
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);    // isKey = true
valueGenericAvroSerde.configure(serdeConfig, false); // isKey = false

Otherwise I always get an error like this:

Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)

Well, that's not nice, but fine: after I added the configure() calls posted above, it worked, and my application was able to do all its processing and print the result.
But! When I tried to use through() — just to publish the data to a new topic — I ran into the problem I'm asking about: the topic was created without a schema. How is that possible???
Fun fact: the data is actually being written, but it is (a) in binary format, so a simple consumer cannot read it, and (b) it has no schema, so an Avro consumer cannot read it either:

Processed a total of 1 messages
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
        at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Of course, I checked the Schema Registry for the topic:

curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions
{"error_code":40401,"message":"Subject not found."}

But the same call for another topic, written by the Java application's producer, shows that a schema is present:

curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions
[1]

Both applications use the same "schema.registry.url" configuration. To summarize: the topic is created, data is written and can be read with a simple consumer, but it is binary, and no schema exists.
I also tried to create a schema with Landoop to somehow match the data, but with no success — and anyway, that would not be the right way to use Kafka Streams: everything should happen on the fly.
Help, please!


xfb7svmp1#

When through() is called, the default serdes defined via StreamsConfig are used unless you specifically override them. Which default serdes did you use? To make this work correctly, you should use a serde based on AbstractKafkaAvroSerializer, which will automatically register the schema for that topic.
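The override the answer describes can be sketched as follows: pass the already-configured Avro serdes explicitly when writing to the intermediate topic, instead of relying on the defaults from StreamsConfig. This assumes a Kafka Streams 1.0+ API (`Produced`); `keyGenericAvroSerde` and `valueGenericAvroSerde` are the configured serdes from the question, `agg_value_9` is the topic name from the question, and `input` is a hypothetical source stream:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Sketch: override the default serdes for this one intermediate topic.
// The value serde registers the Avro schema with Schema Registry on the
// first write, so the subject agg_value_9-value gets created.
KStream<GenericRecord, GenericRecord> output =
    input.through("agg_value_9",
                  Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));

// On older Kafka Streams versions (pre-1.0) the equivalent overload is:
// input.through(keyGenericAvroSerde, valueGenericAvroSerde, "agg_value_9");
```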
