kafka-并非所有使用者都收到订阅的消息

mwg9r5ms  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(507)

为了使用kafka发布消息,我使用类名作为主题:

kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));

消费者订阅他们感兴趣的课程:

for(Object sub:_subscriptions)
        topics.add(sub.getClass().getName());
    _kafkaConsumer.subscribe(topics);

问题是,只有一个消费者曾经收到订阅的消息。我的理解是kafka将为每个订户分配一个唯一的分区(如果可用)。我目前只有2个订阅服务器,我的kafka server.properties指定了4个分区。似乎所有使用者都在从同一分区读取数据。也许Kafka是一个糟糕的选择服务巴士由于这种明显的限制。任何帮助都将不胜感激!
Kafka消费地产:

properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("enable.auto.commit", "false");
    properties.put("group.id", "TestGroup");
    properties.put("auto.offset.reset","earliest");

Kafka制片公司:

properties.put("bootstrap.servers",_settings.getEndpoint());
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

服务器属性(我唯一更改的是默认属性):

num.partitions=4

注意:我还尝试了以下用户设置:

properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("auto.commit.interval.ms","1000");
    properties.put("enable.auto.commit", "true");
    properties.put("group.id", "testGroup");
    properties.put("auto.offset.reset","latest");
    properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
pes8fvy9

pes8fvy91#

Kafka默认使用rangeassignor作为分区分配策略,具有以下特点:
范围赋值器以每个主题为基础工作。对于每个主题,我们按数字顺序排列可用分区,按字典顺序排列使用者。然后,我们将分区数除以使用者总数,以确定分配给每个使用者的分区数。如果它不能平均分配,那么前几个消费者将有一个额外的分区。例如,假设有两个使用者c0和c1,两个主题t0和t1,每个主题有3个分区,从而得到分区t0p0、t0p1、t0p2、t1p0、t1p1和t1p2。赋值为:c0:[t0p0,t0p1,t1p0,t1p1]c1:[t0p2,t1p2]
如果您想为少量分区提供更均匀的分布,可以通过设置 partition.assignment.strategy

s8vozzvw

s8vozzvw2#

如果您的所有消费者都有相同的消费群体( group.id 属性),则组中只有一个使用者将接收消息。如果您希望所有的消费者都能收到消息,那么他们需要有不同的 group.id .
要检查哪些使用者绑定到主题的分区,可以使用以下命令

./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe

相关问题