使用confluentschemaregistry反序列化avro数据时出现异常?

yeotifhr  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(532)

我是Flink和Kafka的新人。我正在尝试使用合流模式注册表反序列化avro数据。我已经在ec2机器上安装了flink和kafka。另外,“test”主题是在运行代码之前创建的。
代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2
作为实现的一部分,代码执行以下操作:

1) Create a flink DataStream object using a list of user element. (User class is avro generated class)
2) Write the Datastream source to Kafka using AvroSerializationSchema.
3) Read the data from Kafka using ConfluentRegistryAvroDeserializationSchema by reading the schema from Confluent Schema registry.

运行flink可执行jar的命令:

./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar

运行代码时出现异常:

java.io.IOException: Unknown data format. Magic number does not match
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

我在user类中使用的avro模式如下:

{
  "type": "record",
  "name": "User",
  "namespace": "com.streaming.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

有人能指出我在使用合流kafka模式注册表反序列化avro数据时遗漏了哪些步骤吗?

jm81lzqq

jm81lzqq1#

如何编写avro数据还需要使用注册表,以便依赖于它的反序列化程序工作。
但这是一个开放的公关在Flink,仍然为添加一个 ConfluentRegistryAvroSerializationSchema
我相信解决办法是 AvroDeserializationSchema ,它不依赖于注册表。
如果您确实想在producer代码中使用注册表,那么您必须在flink之外这样做,直到pr被合并。

相关问题