spring云流kafka提交失败,因为组被重新平衡

hc8w905p  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(595)

我拿到了 CommitFailedException 对于一些耗时的spring云流应用程序。我知道要解决这个问题我需要设置 max.poll.records 以及 max.poll.interval.ms 以满足我对处理批处理所需时间的期望。但是,我不太清楚如何在SpringCloudStream中为消费者设置它。
例外情况:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808) at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:691) at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416) at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1554) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1418) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:739) at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.lang.Thread.run(Thread.java:748)

此外,我如何确保这种情况不会发生呢?或者,在这种异常情况下,如何注入某种回滚?原因是我正在做一些其他的外部工作,一旦完成了,我就会相应地发布输出消息。因此,如果消息在外部系统上完成工作后由于任何问题而无法发布,我必须将其还原(在kafka publish和其他外部系统上执行某种原子事务)。

v64noz0r

v64noz0r1#

将心跳间隔设置为小于会话超时的1/3。如果代理无法确定您的消费者是否还活着,它将在剩余消费者之间启动分区重新平衡。因此,您有一个心跳线程来通知代理,如果应用程序需要更长的时间来处理,那么使用者是活动的。在用户配置中更改这些:

heartbeat.interval.ms

session.timeout.ms

如果无法工作,请尝试增加会话超时。你必须摆弄这些价值观。

a0zr77ik

a0zr77ik2#

您可以在这里的活页夹级文档中设置任意kafka属性
spring.cloud.stream.kafka.binder.consumerproperties用户属性
任意kafka客户端消费者属性的键/值Map。除了支持已知的kafka消费属性,这里还允许未知的消费属性。这里的属性将取代在boot和上面的configuration属性中设置的任何属性。
默认值:空Map。
例如。 spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10 或者在这里的绑定级文档。
spring.cloud.stream.kafka.bindings..consumer.configuration
使用包含通用kafka使用者属性的键/值对进行Map。除了具有kafka使用者属性外,还可以在此处传递其他配置属性。例如,应用程序需要的一些属性,如spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。
默认值:空Map。
例如。 spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10 您可以通过添加 OffsetCommitCallback 到侦听器容器的 ContainerProperties 和设置 syncCommitsfalse . 要自定义容器及其属性,请添加 ListenerContainerCustomizer 应用程序的bean。
编辑
异步提交回调。。。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            container.getContainerProperties().setAckMode(AckMode.RECORD);
            container.getContainerProperties().setSyncCommits(false);
            container.getContainerProperties().setCommitCallback((map, ex) -> {
                if (ex == null) {
                    System.out.println("Successful commit for " + map);
                }
                else {
                    System.out.println("Commit failed for " + map + ": " + ex.getMessage());
                }
            });
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}

手动提交(同步)。。。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
        System.out.println(in);
        try {
            ack.acknowledge(); // MUST USE MANUAL_IMMEDIATE for this to work.
            System.out.println("Commit successful");
        }
        catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}

相关问题