如何使用abswitchcluster在kafka集群之间进行故障切换

avwztpqn  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(696)

我有一个应用程序正在使用kafka在示例之间同步数据,因此它同时生成和使用来自kafka的数据,此外,该应用程序正在使用一个kafka主题,并将该数据转换为另一个主题并流式传输到另一个主题中供客户端使用。
我的应用程序有两个用于故障转移的集群。翻阅Kafka的文件我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 关于 ABSwitchCluster .
如果kafka集群同时宕机,如何使用abswitchcluster自动进行故障切换 KafkaTemplate.send() 以及 @KafkaListener 带注解的方法?
更新更多信息
我为kafkatemplate.send和kafka consumer事件添加了一些错误处理程序 NonResponsiveConsumerEvent 以及 ListenerContainerIdleEvent 最终,他们调用一个共享方法来切换,并 BeanPostProcessor 用于实际添加 ABSwitchClusterKafkaResourceFactory 豆。
切换代码如下所示:

@Autowired
   KafkaSwitchCluster kafkaSwitchCluster;

   @Autowired
   WebApplicationContext context;

   @Autowired
   KafkaListenerEndpointRegistry registry;

   /**
    *  Unable to use {@link Autowired} due to circular dependency
    *  with {@link KafkaPostProcessor}
    *  @return
    */
   public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
   { return context.getBean(DefaultKafkaProducerFactory.class); }

   /**Back-End Method to Actually Switch between the clusters */
   private void switchCluster()
   {
      if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
      else { kafkaSwitchCluster.primary(); }

      getDefaultKafkaProducerFactory().reset();

      registry.stop();
      registry.destroy();
      registry.start();

      for(MessageListenerContainer listener : registry.getListenerContainers() )
      {
         listener.stop();
         listener.start();
      }
   }

在查看测试日志时,根据上面的更新,生产者似乎是正确的,正在切换集群,但我的消费者不是。
那我怎么才能拿到 @KafkaListener 消费者要换吗?

nr9pn0ug

nr9pn0ug1#

默认的生产者和消费者工厂以及 KafkaAdmin 是一类 KafkaResourceFactory .
你通过考试 ABSwitchCluster 打电话进来 setBootstrapServersSupplier() .
这个 ABSwitchCluster 不会自动故障转移。
您需要自己的代码来执行故障转移,然后调用 reset() 在producer工厂上,停止/启动所有侦听器容器( KafkaListenerEndpointRegistry.stop()/start() 对所有人 @KafkaListener s。

相关问题