o.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1

8zzbczxx  posted on 2021-06-07 in Kafka

My application consumes messages from a Kafka server running on one machine and forwards them to another, remote Kafka server running on a different instance. After I deployed the application to Cloud Foundry and sent a message to the first Kafka server, the application worked as expected: the message was consumed and forwarded to the remote Kafka.
However, after that I get an infinite loop of the following exception in Cloud Foundry (and, at a slower rate, on my local machine as well):
Stack trace:

Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]

My application yaml file looks like this.
application.yml:

spring:
  cloud:
    stream:
      bindings:
        activationMsgQueue:
          binder: kafka1
          destination: test
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: latest
        input:
          binder: kafka2
          content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
          destination: test
          group: prac  
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              kafka:
                host: caapmsg-as-a1p.sys.comcast.net
        kafka2:
          type: kafka
          environment:
            spring:
              kafka:
                host: caapmsg-as-a3p.sys.comcast.net
      default-binder: kafka2                    
      kafka:
        binder:
          zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net

I noticed that if I include the configuration below, the error goes away, but now I have an infinite loop of messages being consumed and sent.
Snippet:

kafka:
        binder:
          brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
          zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net

What do I need to do to stop this infinite loop?
Hi Marius, thanks for responding to the distress call. I have made some progress on the problem above. The stream now consumes from a1p (topic: test) and, if the message is valid, forwards it to a3p (topic: test); otherwise it sends an error message to a1p (topic: errorMsgQueue). I have the application.yml file below.
spring:
  cloud:
    stream:
      bindings:
        errorMsgQueue:
          binder: kafka1
          destination: errorMsgQueue
          contentType: application/json
        input:
          binder: kafka2
          content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
          destination: test
          group: prac
        activationMsgQueue:
          binder: kafka3
          destination: test
          contentType: application/json
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: caapmsg-as-a1p.sys.comcast.net
                      zk-nodes: caapmsg-as-a1p.sys.comcast.net
        kafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: caapmsg-as-a3p.sys.comcast.net
                      zk-nodes: caapmsg-as-a3p.sys.comcast.net
        kafka3:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: caapmsg-as-a1p.sys.comcast.net
                      zk-nodes: caapmsg-as-a1p.sys.comcast.net
      default-binder: kafka2
I am still getting an infinite loop. What am I doing wrong?

pgx2nnw8


spring.kafka.host is not a valid configuration option for Spring Cloud Stream. http://docs.spring.io/spring-cloud-stream/docs/1.0.0.release/reference/htmlsingle/index.html#_kafka_binder_properties lists the only properties the binder supports.
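For reference, a minimal sketch of the supported form (host name taken from the question; the localhost fallback is an inference from the stack trace above, not something the client states explicitly):

spring:
  cloud:
    stream:
      kafka:
        binder:
          # supported: broker and ZooKeeper lists for the Kafka binder
          brokers: caapmsg-as-a1p.sys.comcast.net
          zk-nodes: caapmsg-as-a1p.sys.comcast.net

# not a binder property (it is ignored, so the producer falls back to its
# default of localhost:9092 -- hence "Connection refused" on 127.0.0.1):
# spring:
#   kafka:
#     host: caapmsg-as-a1p.sys.comcast.net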
Also, the application seems to be mixing the configuration of the two clusters (I assume they are separate clusters?).
It should be something like this:

spring:
  cloud:
    stream:
      bindings:
        activationMsgQueue:
          binder: kafka1
          destination: test
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: latest
        input:
          binder: kafka2
          content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
          destination: test
          group: prac
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: caapmsg-as-a1p.sys.comcast.net
                      zk-nodes: caapmsg-as-a1p.sys.comcast.net
        kafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: caapmsg-as-a3p.sys.comcast.net
                      zk-nodes: caapmsg-as-a3p.sys.comcast.net
      default-binder: kafka2

See this sample for details: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multibinder-differentsystems/src/main/resources/application.yml
I suspect the infinite loop is caused by sending and receiving messages on the same topic.
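If the inbound and outbound bindings really do end up on the same topic of the same cluster, one way to break the cycle is to give the outbound binding its own destination. A hedged sketch (the topic name testForwarded is hypothetical, not from the question):

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: kafka2
          destination: test           # consume from "test"
        activationMsgQueue:
          binder: kafka1
          destination: testForwarded  # produce to a different topic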
