Java: reading the same Kafka message multiple times

4szc88ey posted on 2021-06-07 in Kafka

I'm implementing a Kafka consumer with manual offset management using the Spring Kafka API:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

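Here, I want the consumer to commit the offset only when someCondition holds. Otherwise, the consumer should sleep for a while and then read the same message again.
Kafka configuration: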

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig()));
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

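With the current configuration, if someCondition == false, the consumer does not commit the offset but still reads the next message. Is there a way to make the consumer re-read a Kafka message when the acknowledgement has not been performed?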
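
The behavior described here comes from Kafka tracking two separate things per partition: the consumer's fetch position, which advances on every poll, and the committed offset, which is only used to reposition the consumer when it starts or when partitions are (re)assigned. The following is a minimal stdlib-only sketch of that distinction, not the Kafka client API; `PartitionModel` and its methods are hypothetical names used purely for illustration.

```java
import java.util.List;

/**
 * Hypothetical in-memory model of one partition seen by a single consumer.
 * Not the Kafka client API: it only shows that the fetch position and the
 * committed offset are tracked independently.
 */
class PartitionModel {
    private final List<String> log; // records, indexed by offset
    private long position = 0;      // next offset poll() returns
    private long committed = 0;     // offset to resume from after a restart

    PartitionModel(List<String> log) {
        this.log = log;
    }

    /** Returns the record at the current position and advances it, committed or not. */
    String poll() {
        return log.get((int) position++);
    }

    /** Acknowledging a record commits offset + 1, i.e. the next record to read. */
    void commit(long offset) {
        committed = offset + 1;
    }

    /** A restart (or rebalance) resumes from the last committed offset. */
    void restart() {
        position = committed;
    }

    long committed() {
        return committed;
    }
}
```

Skipping the acknowledgement leaves `committed()` unchanged, yet the next `poll()` still returns the following record; only a restart moves the position back to the committed offset.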


pb3s4cty1#

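You can stop and restart the container, and the record will be redelivered.
In the upcoming 1.1 release, you can seek to the desired offset, and the record will then be redelivered.
However, if later messages have already been fetched, you will still see them first, so you must discard them as well.
The second milestone has this feature; we expect it to be released next week.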
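
A seek only changes where the next fetch starts; records that an earlier poll already returned are still in memory. This hypothetical simulation (plain Java, not Spring Kafka code) fetches records in batches and, when the listener rejects an offset, seeks back to it and drops the rest of the current batch, i.e. the discarding step described above. `BatchSeekSimulation`, `run` and the batch size are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

/**
 * Hypothetical simulation of a consumer that fetches records in batches.
 * A seek only affects the NEXT fetch, so records already fetched in the
 * current batch are discarded explicitly here.
 */
class BatchSeekSimulation {

    /**
     * Processes offsets [0, logSize) in batches of batchSize. When accept
     * rejects an offset, the simulation seeks back to it and discards the
     * rest of the batch. accept must eventually accept every offset, or the
     * loop never ends. Returns offsets in the order they were delivered.
     */
    static List<Long> run(int logSize, int batchSize, LongPredicate accept) {
        List<Long> delivered = new ArrayList<>();
        long position = 0;
        while (position < logSize) {
            long batchEnd = Math.min(position + batchSize, logSize);
            long nextPosition = batchEnd; // default: continue after this batch
            for (long offset = position; offset < batchEnd; offset++) {
                delivered.add(offset);
                if (!accept.test(offset)) {
                    nextPosition = offset; // seek back to the rejected record
                    break;                 // discard the remaining fetched records
                }
            }
            position = nextPosition;
        }
        return delivered;
    }
}
```

Rejecting offset 1 once, with a batch size of 3, delivers offsets in the order 0, 1, 1, 2, 3: offset 2 from the first batch is thrown away rather than processed before the redelivered offset 1.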


vvppvyoh2#

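As @gary already pointed out, you are on the right track: seek() is the way to do it. When I ran into this problem today I couldn't find a code example for it, so here is the code for anyone else trying to solve it.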

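import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    // One callback per consumer thread: with concurrency > 1 each thread registers its own
    private final ThreadLocal<ConsumerSeekCallback> consumerSeekCallback = new ThreadLocal<>();

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (shouldProcess(record)) {
            // process the record here
            acknowledgment.acknowledge(); // commit the offset
        } else {
            // reposition this partition so the same record is fetched again on the next poll;
            // records already returned by the current poll are still delivered first
            consumerSeekCallback.get().seek(record.topic(), record.partition(), record.offset());
        }
    }

    // Hypothetical placeholder for your own condition
    private boolean shouldProcess(ConsumerRecord<Integer, String> record) {
        return true;
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.consumerSeekCallback.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // nothing is needed here for this program
    }

}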

dojqjjoe3#

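You can try nack(long sleep), whose only parameter is the sleep interval in milliseconds, to get the behavior described above.
From the Spring for Apache Kafka documentation:
Starting with version 2.3, the Acknowledgment interface has two additional methods, nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.
Applying this to a code example, we get: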

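import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ExampleConsumer {
    // Demo flag: decides whether the current record is acknowledged or nack'ed
    private boolean nonError = false;

    @KafkaListener(topics = "topic_name")
    public void consumeSelectingMsgFromMailbox(ConsumerRecord<String, KafkaEventPojo> record, Acknowledgment ack) {
        long offset = record.offset();
        log.info("Received record topic:{} partition:{} offset:{}", record.topic(), record.partition(), offset);

        if (nonError) {
            log.info("ACK: {}", offset);
            ack.acknowledge(); // commit this offset
            if (offset % 2 == 0) {
                nonError = false;
            }
        } else {
            ack.nack(0); // seek back to this record; 0 ms sleep before it is redelivered
            nonError = true;
        }
    }
}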

The configuration is as follows:

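import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> factory;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // ...
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // ...

        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEventPojo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEventPojo> kafkaListenerContainerFactory() {
        if (this.factory == null) {
            this.factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // manual ack mode so the listener receives an Acknowledgment; each acknowledge() commits immediately
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }

        return this.factory;
    }
}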

The example produces:

2020-07-31 17:05:19.275  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.792  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:15
2020-07-31 17:05:19.793  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 15
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:16
2020-07-31 17:05:19.805  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 16
2020-07-31 17:05:19.810  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:17
2020-07-31 17:05:20.313  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 17
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:18
2020-07-31 17:05:20.318  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 18
2020-07-31 17:05:20.322  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.827  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox Received record topic:event_log_selectingMsgFromMailbox partition:1 offset:19
2020-07-31 17:05:20.828  INFO       17560 --- [ntainer#0-0-C-1] .d.g.a.a.k.c.InboxRetrievalEventConsumer : consumeSelectingMsgFromMailbox ACK: 19
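
The alternating pattern in this log follows directly from the `nonError` flag. The sketch below replays the same if/else logic in plain Java without Kafka, assuming (as the log shows) that a nack'ed record is redelivered before any later offset and that the flag is `false` when offset 15 arrives. `NackReplay` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Kafka-free replay of ExampleConsumer's decision logic.
 * Assumes a nack'ed record is redelivered before any later offset.
 */
class NackReplay {
    private boolean nonError = false; // same flag as in ExampleConsumer

    /** Returns true to acknowledge, false to nack (mirrors the listener's if/else). */
    boolean handle(long offset) {
        if (nonError) {
            if (offset % 2 == 0) {
                nonError = false;
            }
            return true;
        }
        nonError = true;
        return false;
    }

    /** Feeds offsets [from, to] through handle(), redelivering each nack'ed offset. */
    static List<String> replay(long from, long to) {
        NackReplay listener = new NackReplay();
        List<String> events = new ArrayList<>();
        long offset = from;
        while (offset <= to) {
            events.add("Received " + offset);
            if (listener.handle(offset)) {
                events.add("ACK " + offset);
                offset++; // acknowledged: move on to the next offset
            }
            // nack'ed: the same offset is delivered again on the next iteration
        }
        return events;
    }
}
```

`NackReplay.replay(15, 19)` yields the same Received/ACK sequence as the log lines above: odd offsets are received twice, even offsets once.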

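Note: KafkaEventPojo is my own POJO that holds the record data stored in Kafka according to our internal structure, so change it as needed. Also, the code above demonstrates nack with a single-record listener. If you need the batch variant, the documentation quoted above has an example of how to do that.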
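
For the batch variant, the Spring for Apache Kafka documentation describes `nack(int index, long sleep)` roughly as: offsets are committed for the records before `index`, and seeks are performed so that the failed record and everything after it are redelivered on the next poll. The following hypothetical model (plain Java, a single partition, not the library's code) shows only that split; the class and method names are made up, and rejecting an out-of-range index is this sketch's own choice.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-partition model of a batch nack(index, sleep):
 * records before index are committed, the rest come back on the next poll.
 */
class BatchNackModel {

    /** Result of a simulated batch nack. */
    static final class Outcome {
        final List<Long> committed;   // records whose offsets get committed
        final List<Long> redelivered; // failed record and everything after it

        Outcome(List<Long> committed, List<Long> redelivered) {
            this.committed = committed;
            this.redelivered = redelivered;
        }
    }

    static Outcome nack(List<Long> batchOffsets, int index) {
        if (index < 0 || index >= batchOffsets.size()) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        return new Outcome(
                new ArrayList<>(batchOffsets.subList(0, index)),
                new ArrayList<>(batchOffsets.subList(index, batchOffsets.size())));
    }
}
```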
