我的服务器上运行单节点kafka。我使用以下命令创建主题“bin/kafka-topics.sh--create--zookeeperlocalhost:2181 --replication-factor 1--分区1--主题测试”。我有两个logstash示例正在运行。第一个从某个java应用程序日志文件中读取数据并将其注入kafka。它工作得很好,我可以在控制台上使用“bin/kafka-console-consumer.sh--zookeeper”查看kafka中的数据localhost:2181 --topic test--from start“命令。但是另一个从kafka(同一主题“test”)读取并注入elasicsearch的logstash示例失败了。第二个logstash示例无法读取数据。我将其配置文件更改为从kafka读取并在控制台上打印,然后它也不会输出任何内容。以下是失败日志存储的配置文件:
// config file
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "test"
}
}
output {
stdout{}
}
logstash既不打印任何内容,也不抛出任何错误。我使用logstash2.4和kafka0.10。我用了Kafka快速入门指南(http://kafka.apache.org/documentation.html#quickstart)
3条答案
按热度按时间rta7y2nd1#
请看一下Kafka输入配置选项https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
请找到Kafka数据的logstash配置并将其推送到elk堆栈。
希望有帮助!
shstlldc2#
rkue9o1l3#
如果您查看kafka输入插件配置,您可以看到一个重要的参数,它允许连接到kafka集群:zk\u connect。
根据文档,它默认设置为localhost:2181. 确保将其设置为kafka集群示例,或者理想情况下设置为多个示例,具体取决于您的设置。
例如,假设您使用json主题连接到一个三节点kafka集群。配置如下:
另外,为主题配置正确的编解码器也很重要。上面的示例将处理json事件。如果使用avro,则需要设置另一个参数codec。有关如何配置它的详细信息,请参见文档页面。它基本上需要指向avro模式文件,可以指定为avsc文件或模式注册表端点(在我看来,这是更好的解决方案)。
如果您的kafka环境中运行了schema registry,您可以将编解码器指向它的url。一个完整的例子是:
希望有用!