我目前正在实现一个微服务,它从apachekafka主题中读取数据。我对微服务使用“spring-boot,version:1.5.6.release”,对同一微服务中的侦听器使用“spring-kafka,version:1.2.2.release”。这是我的Kafka配置:
@Bean
public Map<String, Object> consumerConfigs() {
return new HashMap<String, Object>() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
}};
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
我已经通过 @KafkaListener
注解:
@KafkaListener(topics = "${kafka.dataSampleTopic}")
public void receive(ConsumerRecord<String, String> payload) {
//business logic
latch.countDown();
}
当侦听器与apachekafka服务器失去连接时,我需要能够关闭micro服务。
当我杀死kafka服务器时,我在spring引导日志中得到以下消息:
2017-11-01 19:58:15.721 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) dead for group TestGroup
当我开始Kafka萨弗,我得到:
2017-11-01 20:01:37.748 INFO 16800 --- [ 0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator 192.168.0.4:9092 (id: 2145482646 rack: null) for group TestGroup.
很明显,我的微服务中的SpringKafka侦听器能够检测kafka服务器何时启动并运行,何时不运行。在书中由Kafka合流的权威性指南一章 But How Do We Exit?
据说 wakeup()
方法需要在使用者上调用,以便 WakeupException
会被扔掉。所以我尝试用 @EventListener
标记,如spring for apache kafka文档中所述,然后调用 wakeup()
. 但是文档中的示例是关于如何检测空闲消费者的,这不是我的情况。有人能帮我一下吗。提前谢谢。
1条答案
按热度按时间lyr7nygr1#
我不知道如何获得服务器关闭状态的通知(根据我的经验,消费者在服务器中进入了一个紧密的循环)
poll()
).但是,如果您发现了这一点,您可以停止侦听器容器,它将唤醒使用者并退出紧密循环。。。
2017-11-01 16:29:54.290信息21217---[ad | so47062346]o.a.k.c.c.internals.abstractcoordinator:标记协调员localhost:9092 (id:2147483647机架:空)组so47062346的死
2017-11-01 16:29:54.346 warn 21217---[ntainer#0-0-c-1]org.apache.kafka.clients.networkclient:无法建立到节点0的连接。代理可能不可用。
...
2017-11-01 16:30:00.643 warn 21217---[ntainer#0-0-c-1]org.apache.kafka.clients.networkclient:无法建立到节点0的连接。代理可能不可用。
2017-11-01 16:30:00.680 info 21217---[ntainer#0-0-c-1]essagelistenercontainer$listenerconsumer:消费者已停止
您可以通过添加
reconnect.backoff.ms
,但是poll()
从不退出,因此我们不能发出空闲事件。我想您可以启用空闲事件并使用计时器来检测一段时间内是否没有收到数据(或空闲事件),然后停止容器。