spring启动kafka侦听器不一致

aiazj4mn  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(394)

我正在尝试让几个不同的spring云微服务都连接到kafka/zookeeper集群,都在kubernetes中。微服务正在使用 org.springframework.kafka:spring-kafka -作为事件的消费者和生产者。
所有的服务连接到Kafka好-和主题创建;然而,每项服务的消费者都非常不一致。
例如,当服务启动一次时,所有使用者都将侦听消息并调用函数。但是,当我重新启动所有东西(包括kafka和zookeeper)时,它要么就是不工作,要么不同服务中的一些消费者会工作,等等。。。
以下是我在application.yml中的一些配置-我没有任何基于java的配置-如下所示:

spring:

  ....

  kafka:
    consumer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: api-event
      enable-auto-commit: false

    producer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      ack-mode: manual

...

我的主课是:

@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication {

  public static void main(String[] args) {
    SpringApplication.run(ExampleServiceApplication.class, args);
  }

  .....
}

最后,我的消费者:

@Component
public class MessageListener {

  @KafkaListener(
      topics = "myTopic")
  public void eventListener(String serializedMessage) {
    try {
....

消息可以很好地发送到代理,但不会被其他服务使用。
我意识到没有Map到每个服务属性上的主题,如何通过application.yml实现这一点?
我打赌我犯了一个很大的错误,但是是的!我真的很感激任何意见或帮助

wsewodh2

wsewodh21#

顺便说一句,您可以在这里阅读更多关于分区数量和并行使用者(具有相同组id的使用者)数量之间的关系的信息。
https://docs.confluent.io/platform/current/streams/architecture.html
稍微简化一下,应用程序可以运行的最大并行度受流任务的最大数量的限制,流任务本身由应用程序从中读取的输入主题的最大分区数量决定。例如,如果输入主题有5个分区,那么最多可以运行5个应用程序示例。这些示例将协同处理主题的数据。如果运行的应用程序示例数大于输入主题的分区数,“多余”的应用程序示例将启动,但仍处于空闲状态;但是,如果其中一个繁忙示例停止工作,则其中一个空闲示例将恢复前者的工作。我们在faq中提供了更详细的解释和示例。

相关问题