在Spring的Kafka,哪一种方法会像《离别》一样被遗忘?

pobjuy32  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(333)

我知道在斯宾格·Kafka中我们有以下方法:
无效寄存器seekCallback(consumerseekcallback);
void onpartitionsassigned(Map分配、ConsumerSekCallback);
void onidlecontainer(Map分配,consumerseekcallback);
但是哪个方法与partitionsRevoked上的本机ConsumerBalanceListener方法具有相同的功能?
“此方法将在重新平衡操作开始之前和使用者停止获取数据之后调用。建议在此回调中将偏移提交到kafka或自定义偏移存储,以防止重复数据。”
如果要实现ConsumerBalanceListener,如何传递kafkaconsumer引用?我只看到消费者从SpringKafka。
==========更新======
嗨,加里,当我把这个添加到容器属性时。我可以看到这两种方法都会触发。但是,我得到了一个例外,说“commit”之类的东西无法完成,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间来处理消息“您知道吗?
=============更新2============

public ConcurrentMessageListenerContainer<Integer, String> createContainer(
      ContainerProperties containerProps, IKafkaConsumer iKafkaConsumer) {

    Map<String, Object> props = consumerProps();

    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);

  **RebalanceListner rebalanceListner = new RebalanceListner(cf.createConsumer());**

    CustomKafkaMessageListener ckml = new CustomKafkaMessageListener(iKafkaConsumer, rebalanceListner);

    CustomRecordFilter cff = new CustomRecordFilter();

    FilteringAcknowledgingMessageListenerAdapter faml = new FilteringAcknowledgingMessageListenerAdapter(ckml, cff, true);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(5);

    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1500); // 1.5 seconds

    RetryTemplate rt = new RetryTemplate();
    rt.setBackOffPolicy(backOffPolicy);
    rt.setRetryPolicy(retryPolicy);
    rt.registerListener(ckml);
    RetryingAcknowledgingMessageListenerAdapter rml = new RetryingAcknowledgingMessageListenerAdapter(faml, rt);

    containerProps.setConsumerRebalanceListener(rebalanceListner);
    containerProps.setMessageListener(rml);
    containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    containerProps.setErrorHandler(ckml);
    containerProps.setAckOnError(false);
    ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(
        cf, containerProps);

    container.setConcurrency(1);
    return container;
  }
fwzugrvs

fwzugrvs1#

您可以添加 RebalanceListener 到集装箱的 ContainerProperties 传递给构造函数的。

相关问题