我正在尝试实现一个基于spring-boot的kafka消费者,它有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此。
必须按顺序处理来自分区的消息,
如果消息处理失败,应该挂起特定分区的使用,
应使用回退重试处理,直到成功为止。
我们目前的实施满足以下要求:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
final ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProperties.setErrorHandler(errorHandler());
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(1.5);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}
@Bean
public ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
然而,在这里,记录被消费者永远锁定。在某个时刻,处理时间将超过 max.poll.interval.ms
服务器会将分区重新分配给其他使用者,从而创建一个副本。
假设 max.poll.interval.ms
等于5分钟(默认值),故障持续30分钟,这将导致消息处理约6次。
另一种可能性是在n次重试(例如3次尝试)之后,通过使用 SimpleRetryPolicy
. 然后,消息将被重播(感谢 SeekToCurrentErrorHandler
)处理将从零开始,最多5次尝试。这导致延迟形成一个系列,例如。
10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...
这比一个不断上升的目标更不理想:)
在前面的例子中,有没有第三种情况可以使延迟形成一个升序序列,同时不产生重复的情况?
1条答案
按热度按时间23c0lvtd1#
它可以通过有状态重试来完成—在这种情况下,每次重试后都会抛出异常,但状态是在retry state对象中维护的,因此该消息的下一次传递将使用下一个延迟等。
这需要消息中的某些内容(例如标头)来唯一地标识每条消息。幸运的是,对于kafka,主题、分区和偏移量为状态提供了唯一的密钥。
然而,目前
RetryingMessageListenerAdapter
不支持有状态重试。您可以在侦听器容器工厂中禁用retry,并使用有状态的
RetryTemplate
在侦听器中,使用execute
采取的方法RetryState
争论。可以随意为框架添加github问题,以支持有状态重试;欢迎投稿已发出拉取请求。
编辑
我刚刚编写了一个测试用例来演示如何使用
@KafkaListener
...和
每次尝试交货后。
在本例中,我添加了一个恢复程序来在重试结束后处理消息。您可以执行其他操作,例如停止容器(但是在单独的线程上执行此操作,就像我们在
ContainerStoppingErrorHandler
).