我有一个应用程序正在使用kafka在示例之间同步数据,因此它同时生成和使用来自kafka的数据,此外,该应用程序正在使用一个kafka主题,并将该数据转换为另一个主题并流式传输到另一个主题中供客户端使用。
我的应用程序有两个用于故障转移的集群。翻阅Kafka的文件我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting 关于 ABSwitchCluster
.
如果kafka集群同时宕机,如何使用abswitchcluster自动进行故障切换 KafkaTemplate.send()
以及 @KafkaListener
带注解的方法?
更新更多信息
我为kafkatemplate.send和kafka consumer事件添加了一些错误处理程序 NonResponsiveConsumerEvent
以及 ListenerContainerIdleEvent
最终,他们调用一个共享方法来切换,并 BeanPostProcessor
用于实际添加 ABSwitchCluster
至 KafkaResourceFactory
豆。
切换代码如下所示:
@Autowired
KafkaSwitchCluster kafkaSwitchCluster;
@Autowired
WebApplicationContext context;
@Autowired
KafkaListenerEndpointRegistry registry;
/**
* Unable to use {@link Autowired} due to circular dependency
* with {@link KafkaPostProcessor}
* @return
*/
public DefaultKafkaProducerFactory getDefaultKafkaProducerFactory()
{ return context.getBean(DefaultKafkaProducerFactory.class); }
/**Back-End Method to Actually Switch between the clusters */
private void switchCluster()
{
if (kafkaSwitchCluster.isPrimary()) { kafkaSwitchCluster.secondary(); }
else { kafkaSwitchCluster.primary(); }
getDefaultKafkaProducerFactory().reset();
registry.stop();
registry.destroy();
registry.start();
for(MessageListenerContainer listener : registry.getListenerContainers() )
{
listener.stop();
listener.start();
}
}
在查看测试日志时,根据上面的更新,生产者似乎是正确的,正在切换集群,但我的消费者不是。
那我怎么才能拿到 @KafkaListener
消费者要换吗?
1条答案
按热度按时间nr9pn0ug1#
默认的生产者和消费者工厂以及
KafkaAdmin
是一类KafkaResourceFactory
.你通过考试
ABSwitchCluster
打电话进来setBootstrapServersSupplier()
.这个
ABSwitchCluster
不会自动故障转移。您需要自己的代码来执行故障转移,然后调用
reset()
在producer工厂上,停止/启动所有侦听器容器(KafkaListenerEndpointRegistry.stop()/start()
对所有人@KafkaListener
s。