从ksql流中使用avro kafka主题时出错

lyr7nygr  于 2021-07-15  发布在  Kafka
关注(0)|答案(1)|浏览(545)

我在ksqldb中用 VALUE_FORMAT='JSON' TOPIC='MYTOPIC' 设置已结束docker compose。我正在运行kafka代理、schema registry、ksqldbcli、ksqldb服务器、zookeeper
现在我想从主题中使用这些记录。我的第一个也是最后一个方法是通过命令行执行以下命令

docker run  --net=host  --rm  confluentinc/cp-schema-registry:5.0.0  kafka-avro-console-consumer
--bootstrap-server localhost:29092 --topic DXT --from-beginning --max-messages 10
--property print.key=true --property print.value=true
--value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer
--key-deserializer org.apache.kafka.common.serialization.StringDeserializer

但这只是返回错误

[2021-04-22 21:45:42,926] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我也在JavaSpring中尝试了不同的用例,但没有成功。我只是无法使用创建的主题。如果我需要定义自己的模式,我应该在哪里做,最简单的方法是什么,因为我刚刚在ksqldb中创建了一个流?有一个简单的例子。在ksqldb.io上创建流时,我没有指定任何其他内容,就像快速启动示例中那样(我在部署中添加了schema registry)因为我是一个坐在这里将近10个小时的noob,任何帮助都将不胜感激。
edit:我发现纯json不需要带有ksqldb的schema注册表。在这里。但如何反序列化呢?

bq8i3lrv

bq8i3lrv1#

如果已经将json数据写入主题,那么可以使用 kafka-console-consumer .
你得到的错误( Error deserializing Avro message for id -1…Unknown magic byte! )是因为你用的是 kafka-avro-console-consumer 它试图将主题数据反序列化为avro,但事实并非如此,因此出现了错误。
你也可以使用 PRINT DXT; 从ksqldb内部。

相关问题