这个问题有什么解决办法吗???我无法读取Kafkaavro模式消息。我正试图从logstash向kafka和hdfs发送消息。
以下是技术堆栈:
logstash 2.3-当前生产版本
汇合3.0。
插件:a。logstashkafka输出插件b。logstash编解码器avro。
Zookeeper:3.4.6
Kafka:0.10.0.0
日志存储配置文件如下所示:
input {
stdin{}
}
filter {
mutate {
remove_field => ["@timestamp","@version"]
}
}
output {
kafka {
topic_id => 'logstash_logs14'
codec => avro {
schema_uri => "/opt/logstash/bin/schema.avsc"
}
}
}
schema.avsc文件如下所示:
{
"type":"record",
"name":"myrecord",
"fields":[
{"name":"message","type":"string"},
{"name":"host","type":"string"}
]
}
已运行以下命令:
在自己的终端启动zookeeper
./bin/zookeeper服务器启动。/etc/kafka/zookeeper.properties
2在Kafka自己的终点站出发
./bin/kafka-server-start ./etc/kafka/server.properties
3在自己的终端中启动模式注册表
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
4从logstash目录运行以下命令
bin/logstash -f ./bin/logstash.conf
5在运行上述命令后,键入要发送给kafka的日志消息,例如:“hello world”
6.Kafka的主题
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
请告诉我如何解决这个问题
谢谢,厄彭德拉
2条答案
按热度按时间5kgi1eie1#
你是如何写给Kafka的?您看到serializationexception是因为数据不是使用schema registry(或kafkaavroserializer)写入的,而是在使用schema registry时,kafka avro控制台使用者在内部使用schema registry(或kafkaavroserializer),它期望数据是某种格式(特别是
<magic byte><schemaId><data>
). 如果使用kafka avro console producer编写avro数据,则不应出现此异常,或者可以在producer属性中为键值序列化程序设置kafkaavroserializer,还可以设置架构注册表url。zy1mlcev2#
也许回答得太晚了,但现在面临同样的问题。
logstrash在这里使用默认的序列化程序“org.apache.kafka.common.serialization.stringserializer”
因此,如果您想从事件总线读取avro消息,则必须使用日志存储输出“io.confluent.kafka.serializers.kafkavroserializer”上的kafkaavroserializers对其进行序列化
然后从使用者部分使用匹配的反序列化程序。问题是logstash根本无法识别io.confluent,所以您必须做一些棘手的事情来添加它,比如dep和jar