kafka生产者/使用者重新启动后,使用者不接收消息

t98cgbkg  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(549)

我们有一个生产者、一个消费者和一个分区。消费者/生产者都是spring引导应用程序。消费者应用程序在我的本地机器上运行,而生产者和kafka&zookeeper在远程机器上运行。
在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在那之后,我的消费者没有收到任何信息。我尝试重新启动消费者,但没有成功。问题是什么和/或如何解决?
使用者配置:

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

生产者配置:

cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

编辑2:
5分钟后,消费者应用程序死亡,但以下情况除外:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
ar7v8xwq

ar7v8xwq1#

嗯,看起来已经有一个错误报告了 spring-cloud-stream-binder-kafka 陈述 resetOffset 属性无效。因此,消费者总是请求偏移量为的消息 latest .
正如在git问题上提到的,唯一的解决方法是通过kafka consumer cli工具来解决这个问题。

ca1c2owp

ca1c2owp2#

看看上面关于调试的建议是否揭示了任何进一步的信息。看起来您从kafkatopicprovisioner收到了一些超时异常。但是当你重新启动消费者的时候就会发生这种情况。似乎消费者在与经纪人沟通时遇到了一些问题,你需要了解那里发生了什么。

相关问题