对多消息使用者使用手动提交

piok6c0g  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(393)

我对Kafka很陌生。使用SpringBootKafka,我使用一个消息对象和手动确认开发了一个发布者和一个使用者。我的代码使用spring注解。这很好用。现在,当我连接到生产代理时,这个代理不会发送一条消息,而是一个消息列表。我的侦听器方法具有以下签名:

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload Message message, Acknowledgment ack)

所以我可以确认每一条信息。很好。但现在看来我必须用

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack)

即使遵循文档,我似乎也应该使用

@KafkaListener (topics="MessagesTopic", containerFactory="messageContainerfactory")
public void listen(@Payload List<Message> messages, Acknowledgment ack, Consumer<?,?> consumer)

我应该将batchmode设置为true吗?
现在的问题是:当这条消息被完全处理后,我如何确认每条消息?
非常感谢你的帮助

ma8fv8wu

ma8fv8wu1#

如果您确实想手动提交偏移量,那么类似的方法可以帮助您。
如果你不想要它,那就换 setAckMode 其他价值。
这件事是按Spring的方式做的。 CoreAutoConfiguration 班级:

@Configuration
@Import({KafkaAutoConfiguration.class})
public class CoreAutoConfiguration {

@Bean("batchKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> batchKafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setBatchListener(true);
    return factory;
    }
}

然后你就走了 Config 班级:

@Configuration
@Import({
        CoreAutoConfiguration.class,
        KafkaAutoConfiguration.class,
})
@EnableKafka
@EnableRetry
public class Config {
}

最后,消费者:

@KafkaListener(
        topics = "MessagesTopic",
        containerFactory = "batchKafkaListenerContainerFactory"
)
public void dataReceived(@Payload List<String> payload) throws RuntimeException {
    yourService.processIncomingData(payload);
}

最后,属性:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=helloworld
spring.kafka.listener.type=batch
spring.kafka.consumer.enable-auto-commit=false

# this is size of incoming list if broker has this many entries, can be lower eventually

spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

相关问题