我正试着和Kafka合作。
我通过kafka发送一个csv日志文件,以便它将消息发布到我的spark流应用程序。
我在spark流应用程序中使用直接方法来实现这一点。
我的日志文件中的数据在开始时很好地插入,但是过了一会儿,我在scalaide上看到了以下错误消息。
环境:我在所有内核上运行spark。Zookeeper,Kafka也在我的系统上本地运行。
错误:
16/09/05 17:53:28 ERROR Executor: Exception in task 0.0 in stage 390.0 (TID 390)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
发生的情况是,这个异常是中途抛出的,有时控制台上会有一堆日志数据跟随这个错误消息。
我有点困惑,因为这看起来像是一个网络错误-“封闭通道异常”,但由于我在本地运行所有这些进程,我想知道是否还有其他原因可能是根本原因。
如果我能得到一些解决这个问题的建议,那就太好了。
1条答案
按热度按时间3npbholx1#
在kafka producer.properties配置中将localhost替换为计算机ip(如metadata.broker.list)。同时在/etc/hosts文件中替换:
127.0.0.1 localhost localhost.localdomain
具有
x、 x.x.x localhost localhost.localdomain
其中x.x.x.x是您的机器ip。看看是否有用。