我用Spring的云流搭配Kafka,我有一个主题X
,有分区Y
,有消费群Z
,Spring开机启动器父2.7.2
,SpringKafka版本2.8.8
:
@StreamListener("input-channel-name")
public void processMessage(final DomainObject domainObject) {
// some processing
}
它工作正常。
我希望在应用中有一个端点,允许我重新读取/重新处理(seek
对吗?)X.Y
中的所有消息(再次)。但不是在重新平衡(ConsumerSeekAware#onPartitionsAssigned
)或应用重新启动(KafkaConsumerProperties#resetOffsets
)后,而是按需如下:
@RestController
@Slf4j
@RequiredArgsConstructor
public class SeekController {
@GetMapping
public void seekToBeginningForDomainObject() {
/**
* seekToBeginning for X, Y, input-channel-name
*/
}
}
我就是做不到。这有可能吗?我知道我必须在消费者层面上做到这一点,可能是在订阅@StreamListener("input-channel-name")
之后创建的。对吗?但是我不知道如何获得这个消费者。我如何执行按需搜索,使Kafka再次向消费者发送消息?我只想将X.Y.Z
的偏移量重置为0
,以便再次创建应用程序、加载和处理所有消息。
1条答案
按热度按时间zujrkrfu1#
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener
KafkaBindingRebalanceListener.onPartitionsAssigned()
提供了一个布尔值来指示这是初始分配还是重新平衡分配。Spring cloud stream目前不支持运行时的任意查找,即使底层的
KafkaMessageDrivenChannelAdapter
支持访问ConsumerSeekCallback
(允许轮询之间的任意查找),它需要对绑定器进行增强以允许访问此代码。不过,可以在事件侦听器中使用空闲容器事件;该事件包含消费者,因此您可以在这些条件下执行任意查找。