由于错误而关闭/127.0.0.1的套接字时出错(kafka.network.processor)

7z5jn7bk  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(883)

我对apache camel和apache kafka还很陌生,我正在为我的项目做一个小型poc。当我尝试使用camel-kafka组件读取kafka时,我得到了以下问题错误日志。

[2016-01-20 08:47:10,979] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2016-01-20 08:47:44,643] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2016-01-20 08:47:54,545] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

我的java代码如下:

public class Main {
public static void main(String[] args) throws Exception {

    CamelContext context = new DefaultCamelContext();
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("kafka:127.0.0.1:9092?topic=TEST&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1")
                    /*.marshal(xmlJsonFormat)*/
            .process(new XmlToJson())
                    /*.to("kafka:localhost:9092?topic=TestJson&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1");*/
            .to("file:/Users/himanshu/Desktop/TransCamelFuse/test.txt");
        }
    });
    context.start();
    Thread.sleep(10000);
    context.stop();
}

}
我把一些文本从Kafka生产者控制台工具,并试图阅读使用Kafka Camel 组件。

9bfwbjaz

9bfwbjaz1#

您需要确保kafka服务器和kafka客户端版本彼此兼容,因为我的kafka服务器是0.8,有些SpringBean使用kafka 2.4库,所以每当2.4客户端请求任何kafka 0.8操作时,server.log都会记录此错误
解决方案确保在代码中没有bean使用不兼容的客户端进行通信。

4xrmg8kj

4xrmg8kj2#

这不是错误。给定此错误是因为给定数据类型。你正在一个接一个地手动给Kafka提供数据。

相关问题