Consumer memory overload

dhxwm5r4 posted on 2021-06-07 in Kafka

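I am planning a Spring + Kafka streaming application that processes incoming messages and stores the updated internal state those messages produce. The state is expected to reach ~500 MB per unique key (with perhaps ~10k unique keys spread across 2k partitions).
This state generally has to be held in memory for the application to run efficiently, but even if it lived on disk I would still face a similar problem, just at a later point in scaling.
I plan to deploy the application to a dynamically scaling environment (such as AWS) with a minimum number of instances configured, but I am wary of two scenarios:
1. On first startup (perhaps only one consumer comes up first), that consumer will not be able to cope with being assigned every partition, because the in-memory state would overflow the instance's available memory.
2. After a large outage (an AWS availability zone going down), perhaps 33% of the consumers drop out of the group, and the extra memory load on the remaining instances could take out all of the remaining consumers as well.
How do people protect their consumers from taking on more partitions than they can handle, so that they don't overflow the available memory/disk?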
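Multiplying out the figures from the question shows why both scenarios are dangerous. The Java sketch below is back-of-envelope arithmetic only: it assumes keys are spread evenly across partitions, and the instance counts (30, then 20 after losing a third) and the 16,000 MB memory budget are hypothetical values chosen for illustration.

```java
// Back-of-envelope sizing from the question's figures:
// ~500 MB of state per key, ~10k keys, 2k partitions.
// The instance counts and memory budget used in main() are hypothetical.
public class StateSizing {

    static final long KEYS = 10_000;
    static final long STATE_MB_PER_KEY = 500;
    static final long PARTITIONS = 2_000;

    /** Average state per partition, assuming keys are spread evenly across partitions. */
    static long mbPerPartition() {
        return KEYS * STATE_MB_PER_KEY / PARTITIONS;
    }

    /** State held by the busiest instance when partitions are spread evenly over n instances. */
    static long mbPerInstance(long instances) {
        long partitionsEach = (PARTITIONS + instances - 1) / instances; // ceiling division
        return partitionsEach * mbPerPartition();
    }

    /** Largest number of partitions whose state fits in the given memory budget. */
    static long maxPartitionsFor(long budgetMb) {
        return budgetMb / mbPerPartition();
    }

    public static void main(String[] args) {
        System.out.println("per partition:           " + mbPerPartition() + " MB");
        System.out.println("1 instance (startup):    " + mbPerInstance(1) + " MB");
        System.out.println("30 instances:            " + mbPerInstance(30) + " MB");
        System.out.println("20 instances (1/3 lost): " + mbPerInstance(20) + " MB");
        System.out.println("cap for 16,000 MB:       " + maxPartitionsFor(16_000) + " partitions");
    }
}
```

Under these assumptions each partition carries about 2,500 MB, a lone first consumer would be asked to hold about 5 TB, and losing a third of 30 instances raises each survivor's load from 167,500 MB to 250,000 MB. A calculation like `maxPartitionsFor` is one possible way to choose a per-consumer partition cap.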


Answer #1 (bxjv4tth)

See the Kafka documentation.
Starting with 0.11...

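EDIT
For your second use case (which also applies to the first), perhaps you could implement a custom PartitionAssignor that limits the number of partitions assigned to each instance.
I haven't tried it; I don't know how the broker reacts to partitions that are left unassigned.
EDIT 2
This seems to work fine, but YMMV...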

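package com.example;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.TopicPartition;

public class NoMoreThanFiveAssignor extends RoundRobinAssignor {

    private static final int MAX_PARTITIONS_PER_MEMBER = 5;

    // Subscription is the nested type inherited from the assignor interface
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {

        // Let the round-robin assignor distribute the partitions, then truncate each
        // member's list; partitions beyond the cap are simply left unassigned.
        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
        assignments.replaceAll((memberId, assigned) -> {
            if (assigned.size() <= MAX_PARTITIONS_PER_MEMBER) {
                return assigned;
            }
            System.out.println("Reducing assignments from " + assigned.size() + " to "
                    + MAX_PARTITIONS_PER_MEMBER + " for " + memberId);
            return assigned.stream()
                    .limit(MAX_PARTITIONS_PER_MEMBER)
                    .collect(Collectors.toList());
        });
        return assignments;
    }

}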

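package com.example;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So54072362Application {

    public static void main(String[] args) {
        SpringApplication.run(So54072362Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        // 15 partitions, replication factor 1 (enough for a single local broker)
        return new NewTopic("so54072362", 15, (short) 1);
    }

    @KafkaListener(id = "so54072362", topics = "so54072362")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            // Send one record to each of the 15 partitions
            for (int i = 0; i < 15; i++) {
                template.send("so54072362", i, "foo", "bar");
            }
        };
    }

}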

spring.kafka.consumer.properties.partition.assignment.strategy=com.example.NoMoreThanFiveAssignor
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
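Without Spring Boot, the same three settings would be passed to the consumer as raw Kafka client properties. A minimal Java sketch, assuming the assignor lives at `com.example.NoMoreThanFiveAssignor` as in the property above; a real consumer also needs `bootstrap.servers`, a `group.id` and deserializers, which are omitted here.

```java
import java.util.Properties;

// Plain Kafka-client equivalent of the Spring Boot properties above (sketch only;
// bootstrap.servers, group.id and deserializers are intentionally omitted).
public class CappedConsumerProps {

    static Properties consumerProps() {
        Properties props = new Properties();
        // spring.kafka.consumer.properties.partition.assignment.strategy
        props.put("partition.assignment.strategy", "com.example.NoMoreThanFiveAssignor");
        // spring.kafka.consumer.enable-auto-commit
        props.put("enable.auto.commit", "false");
        // spring.kafka.consumer.auto-offset-reset
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```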

Reducing assignments from 15 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:28.288  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 7
2019-01-07 15:24:28.289  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:28.296  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.304  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
Reducing assignments from 8 to 5 for consumer-2-c9a6928a-520c-4646-9dd9-4da14636744b
Reducing assignments from 7 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:46.310  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 8
2019-01-07 15:24:46.311  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:46.315  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
2019-01-07 15:24:58.330  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 9
2019-01-07 15:24:58.332  INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
2019-01-07 15:24:58.336  INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]

Of course, this leaves the unassigned partitions hanging, but it sounds like that is what you want until the zone comes back online.
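To see which partitions are left hanging, the following Java sketch simulates the capping on the answer's 15-partition topic with two consumers. It is a simplified model, not Kafka's `RoundRobinAssignor`: it deals partition numbers 0 to 14 to the members in sorted order and then keeps the first five for each member. The member names are hypothetical.

```java
import java.util.*;

// Simplified model of "round-robin, then cap": NOT Kafka's RoundRobinAssignor,
// just plain round-robin over partition numbers for members in sorted order.
public class CappedRoundRobin {

    static Map<String, List<Integer>> assign(List<String> members, int partitions, int cap) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String m : sorted) {
            result.put(m, new ArrayList<>());
        }
        // Deal partitions out one at a time, cycling through the members
        for (int p = 0; p < partitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        // Apply the cap: keep only the first `cap` partitions of each member
        result.replaceAll((m, ps) -> ps.size() > cap ? new ArrayList<>(ps.subList(0, cap)) : ps);
        return result;
    }

    /** Partitions that no member received after capping. */
    static List<Integer> unassigned(Map<String, List<Integer>> assignment, int partitions) {
        Set<Integer> taken = new HashSet<>();
        assignment.values().forEach(taken::addAll);
        List<Integer> left = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            if (!taken.contains(p)) {
                left.add(p);
            }
        }
        return left;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> a = assign(List.of("consumer-a", "consumer-b"), 15, 5);
        System.out.println(a);
        System.out.println("unassigned: " + unassigned(a, 15));
    }
}
```

In this model the two members first receive 8 and 7 partitions (the same split as the "Reducing assignments from 8 to 5" and "from 7 to 5" lines in the log), and partitions 10 to 14 stay unassigned until a third member joins, at which point 3 × 5 covers all 15.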
