我正在尝试将数据从Kafka传递到elasticsearch,然后传递到kibana。我使用Kafka河插件在这篇文章中提到link:elasticsearch-river-kafka plugin
在启动kafka zookeeper、服务器和生产者之后,我将数据作为{“test”:“one”}
然后开始elasticsearch。我在Kafka中遇到了以下错误:
[2016-02-04 00:05:00,094] ERROR Closing socket for /192.168.1.9 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
在elasticsearch中,出现以下错误:
org.codehaus.jackson.JsonParseException: Unexpected character ('S' (code 83)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
另外,我在elasticsearch日志中看到: [2016-02-04 00:14:31,340][WARN ][river.routing ] [ISAAC] no river _meta document found after 5 attempts
知道我做错了什么吗?请帮忙。谢谢。
1条答案
按热度按时间w80xi6nr1#
在ElasticSearch中,rivers的概念被弃用,它增加了性能问题。为什么不考虑使用logstashkafka插件做同样的事情呢。你可以在https://www.elastic.co/blog/logstash-kafka-intro