springkafka不支持大型消息消费者

ff29svar  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(332)

我正在使用SpringKafka来使用linkedin生成的消息—支持kafka客户端的大型消息
鉴于Kafka的客户总是 AUTO_OFFSET_RESET_CONFIG 在其构造函数中显示为“无”。

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
    Deserializer<K> keyDeserializer,
    Deserializer<V> valueDeserializer,
    Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
    Auditor<K, V> consumerAuditor) {
        _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
        byteArrayDeserializer,
        byteArrayDeserializer);
    }
Map<String, Object> configForVanillaConsumer() {
    Map<String, Object> newConfigs = new HashMap<>();
    newConfigs.putAll(this.originals());
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return newConfigs;
}

所以一旦我开始使用批提交并设置 ENABLE_AUTO_COMMIT_CONFIG 如果为false,则抛出以下错误:
[org.springframework.kafka.kafkalistenerendpointcontainer#0-1-c-1]错误o.a.k.c.c.i.consumercoordinator-用户为组文档事件使用者提供的侦听器com.linkedin.kafka.clients.consumer.likafKaConsumerBalanceListener在分区分配org.apache.kafka.clients.consumer.nooffsetforpartitionexception上失败:未定义的偏移量没有分区的重置策略:documentevents-2位于org.apache.kafka.clients.consumer.internals.fetcher.resetoffset(fetcher)。java:369)在org.apache.kafka.clients.consumer.internals.fetcher.updatefetchpositions(fetcher。java:247)在org.apache.kafka.clients.consumer.kafkaconsumer.updatefetchpositions(kafkaconsumer。java:1602)在org.apache.kafka.clients.consumer.kafkaconsumer.position(kafkaconsumer。java:1265)在com.linkedin.kafka.clients.consumer.likafcummerimpl.position(likafcummerimpl。java:403)在org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer$1.onpartitionsassigned(kafkamessagelistenercontainer。java:447)在com.linkedin.kafka.clients.consumer.likafkaConsumerBalanceListener.onpartitionsassigned(likafkaConsumerBalanceListener)。java:62)在org.apache.kafka.clients.consumer.internals.consumercoordinator.onjoincomplete(consumercoordinator。java:255)在org.apache.kafka.clients.consumer.internals.abstractcoordinator.joingroupifneeded(abstractcoordinator)。java:339)在org.apache.kafka.clients.consumer.internals.abstractcoordinator.ensureReactiveGroup(abstractcoordinator.com)上。java:303)在org.apache.kafka.clients.consumer.internals.consumercoordinator.poll(consumercoordinator。java:286)在org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer。java:1030)在org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer。java:995)在com.linkedin.kafka.clients.consumer.likafcummerimpl.poll(likafcummerimpl。java:231)在org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.run(kafkamessagelistenercontainer。java:558)在java.util.concurrent.executors$runnableadapter.call(executors。java:511)在java.util.concurrent.futuretask.run(futuretask。java:266)在java.lang.thread.run(线程。java:745)
发生此问题是因为此使用者组是第一次使用来自此主题的消息,因此它尝试使用偏移量重置策略。
虽然我将其设置为“最早”,但它被底层linkedin kafka客户端覆盖为“无”
在本例中,我还尝试重写ConsumerBalanceListener以手动查找到开头,但实际上没有达到这一点。
如何解决这个问题?

wqlqzqxt

wqlqzqxt1#

有趣的;请在github中打开一个问题。
如果政策是正确的,我们应该抓住这个例外 none .
同时,您可以只使用一次常规客户机来解决这个问题,为组实际设置一个初始偏移量(您实际上不必接收任何消息,只需分配分区并为组设置初始位置)。

相关问题