无法通过kafka avro控制台读取avro消息消费者(最终目标通过spark流读取)

42fyovps  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(392)

(最终目标)在尝试是否最终可以从合流平台读取avro数据usngsparkstream之前,就像这里描述的:将spark结构化流与合流模式注册表集成
我想验证是否可以使用下面的命令来读取它们:

$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081

我收到这个错误信息,未知的魔法字节

Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

注意,可以这样读取消息(使用console consumer而不是avro console consumer):

kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml

该消息是使用confluent connect file pulse(1.5.2)读取xml文件(StreamThings/kafka connect file pulse)生成的
请帮帮我:我用过这个吗 kafka-avro-console-consumer 错了吗?我尝试了此处描述的“反序列化程序”属性选项:https://stackoverflow.com/a/57703102/4582240,没有帮助
我还不想勇敢地启动spark流来读取数据。
我使用的文件pulse1.5.2属性如下所示,添加于2020年9月11日以供完成。

name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1

# File types

fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml

# File scanning

fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker

fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000

# Internal Reporting

internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name

offset.strategy=name
ubbxdtey

ubbxdtey1#

如果您从使用者那里得到未知的魔字节,那么生产者没有使用合流的avroserializer,并且可能推送了不使用schema注册表的avro数据。
如果看不到生产者代码,或者不使用和检查二进制格式的数据,就很难知道是哪种情况。
该消息是使用合流连接文件pulse生成的
你用过吗 value.converter 用avroconverter类?

相关问题