Spring的云流,Kafka活页夹-按需寻找

vsaztqbk  于 2023-01-04  发布在  Spring
关注(0)|答案(1)|浏览(142)

我用Spring的云流搭配Kafka,我有一个主题X,有分区Y,有消费群Z,Spring开机启动器父2.7.2,SpringKafka版本2.8.8

@StreamListener("input-channel-name")
public void processMessage(final DomainObject domainObject) {
    // some processing
}

它工作正常。
我希望在应用中有一个端点,允许我重新读取/重新处理seek对吗?)X.Y中的所有消息(再次)。但不是在重新平衡(ConsumerSeekAware#onPartitionsAssigned)或应用重新启动(KafkaConsumerProperties#resetOffsets)后,而是按需如下:

@RestController
@Slf4j
@RequiredArgsConstructor
public class SeekController {

    @GetMapping
    public void seekToBeginningForDomainObject() {

        /**
         * seekToBeginning for X, Y, input-channel-name
         */
    }
}

我就是做不到。这有可能吗?我知道我必须在消费者层面上做到这一点,可能是在订阅@StreamListener("input-channel-name")之后创建的。对吗?但是我不知道如何获得这个消费者。我如何执行按需搜索,使Kafka再次向消费者发送消息?我只想将X.Y.Z的偏移量重置为0,以便再次创建应用程序、加载和处理所有消息。

zujrkrfu

zujrkrfu1#

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#rebalance-listener
KafkaBindingRebalanceListener.onPartitionsAssigned()提供了一个布尔值来指示这是初始分配还是重新平衡分配。
Spring cloud stream目前不支持运行时的任意查找,即使底层的KafkaMessageDrivenChannelAdapter支持访问ConsumerSeekCallback(允许轮询之间的任意查找),它需要对绑定器进行增强以允许访问此代码。
不过,可以在事件侦听器中使用空闲容器事件;该事件包含消费者,因此您可以在这些条件下执行任意查找。

相关问题