Kafka: check the number of messages in each partition

nfeuvbwi posted on 2021-06-07 in Kafka

I implemented a round-robin partitioner as follows:

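import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.log4j.Logger;

public class KafkaRoundRobinPartitioner implements Partitioner {

    private static final Logger log = Logger.getLogger(KafkaRoundRobinPartitioner.class);

    // Shared by every partition() call of this producer, possibly from several threads
    private final AtomicInteger counter = new AtomicInteger(0);

    public KafkaRoundRobinPartitioner() {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int partitionsCount = partitions.size();

        // Advance and wrap the counter in one atomic step. A separate
        // incrementAndGet()/get()/set() sequence can lose concurrent increments
        // around the reset; keeping the value below partitionsCount also means
        // it can never overflow. The sequence is 1, 2, ..., n-1, 0, 1, ...
        return counter.updateAndGet(c -> (c + 1) % partitionsCount);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}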
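Before involving a broker, the distribution produced by the counter arithmetic can be checked on its own. The sketch below copies the counter logic from the partitioner into a plain class so it runs without a `Cluster`; `RoundRobinDistributionCheck`, `RoundRobinCounter` and `distribute` are hypothetical names, and the partition count is passed in directly instead of being read from cluster metadata. It only models single-threaded sends.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDistributionCheck {

    // Hypothetical stand-in for KafkaRoundRobinPartitioner.partition(): same counter
    // arithmetic, but the partition count is a parameter instead of coming from a Cluster.
    static final class RoundRobinCounter {
        private final AtomicInteger counter = new AtomicInteger(0);

        int next(int partitionsCount) {
            int partitionId = counter.incrementAndGet() % partitionsCount;
            if (counter.get() > 65536) {
                // Resetting to the current residue keeps the cycle continuous
                counter.set(partitionId);
            }
            return partitionId;
        }
    }

    // Returns how many of `messages` sends land on each of `partitionsCount` partitions.
    public static int[] distribute(int partitionsCount, int messages) {
        RoundRobinCounter rr = new RoundRobinCounter();
        int[] counts = new int[partitionsCount];
        for (int i = 0; i < messages; i++) {
            counts[rr.next(partitionsCount)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = distribute(32, 32);
        for (int p = 0; p < counts.length; p++) {
            System.out.println("partition " + p + ": " + counts[p]);
        }
    }
}
```

With 32 partitions and 32 messages every partition receives exactly one message; sending more than 65536 messages exercises the reset branch, which keeps the round-robin cycle intact.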

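Now I want to test that every partition receives the same number of messages. For example, if a topic has 32 partitions and I send 32 messages to it, I expect exactly one message in each partition.
I would like to do something like this: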

// Wished-for API: KafkaTopic and KafkaPartition do not exist in the Kafka Java client
KafkaTopic topic = new KafkaTopic("topic_name");
for (KafkaPartition partition : topic.getPartitions()) {
    int msgCount = partition.getMessagesCount();
    // do asserts
}

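As far as I know, the Kafka Java API does not provide such functionality, but I may have missed something in the documentation.
Is there an elegant way to do this?
The only solution I have found so far is a basic one. Since I use a multi-consumer model, I can do the following for each consumer: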

consumer.assignment().size();

After that I can do:

consumer.poll(Duration.ofMillis(100)); // poll(long) is deprecated since Kafka 2.0

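and check that each consumer received exactly one message. In this setup one consumer should never receive messages from a partition that belongs to another consumer: the number of consumers equals the number of partitions, so the consumer group should assign exactly one partition to each consumer.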

Answer 1 (tvz2xvvm)

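For each partition you can call seekToBeginning() and seekToEnd(), read the offset after each seek with position(), and take the difference between the two offsets. As long as the topic is not compacted, that difference is the number of messages in the partition.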
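The offset arithmetic from this answer, plus the "same count everywhere" assertion from the question, can be sketched as below. To keep it runnable without a broker, the sketch assumes the beginning and end offsets have already been read for every partition (for example with `position()` after `seekToBeginning()`/`seekToEnd()`, or with `KafkaConsumer.beginningOffsets()`/`endOffsets()`) and copied into maps keyed by partition number. `PartitionMessageCounts`, `countsFromOffsets` and `allEqualTo` are hypothetical names, and the offsets in `main` are made up for illustration. The difference is an exact message count only when the partition has no gaps, i.e. the topic is not compacted and no transactional producer wrote control records to it.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionMessageCounts {

    // Message count per partition = end offset - beginning offset.
    // Both maps are keyed by partition number; filling them is left to the caller.
    public static Map<Integer, Long> countsFromOffsets(Map<Integer, Long> beginOffsets,
                                                       Map<Integer, Long> endOffsets) {
        Map<Integer, Long> counts = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long begin = beginOffsets.get(e.getKey());
            if (begin == null) {
                throw new IllegalArgumentException("no beginning offset for partition " + e.getKey());
            }
            counts.put(e.getKey(), e.getValue() - begin);
        }
        return counts;
    }

    // True if every partition holds exactly `expected` messages.
    public static boolean allEqualTo(Map<Integer, Long> counts, long expected) {
        return counts.values().stream().allMatch(c -> c == expected);
    }

    public static void main(String[] args) {
        Map<Integer, Long> begin = new TreeMap<>();
        Map<Integer, Long> end = new TreeMap<>();
        for (int p = 0; p < 4; p++) {
            begin.put(p, 10L); // made-up offsets for illustration only
            end.put(p, 11L);
        }
        Map<Integer, Long> counts = countsFromOffsets(begin, end);
        System.out.println(counts + " evenly distributed: " + allEqualTo(counts, 1));
    }
}
```

In a real test the assertion would then be `allEqualTo(counts, messagesSent / partitionCount)`, run after the producer has flushed.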

Answer 2 (baubqpgj)

In the end, I wrote something like the following.
My Kafka consumer worker has this code:

public void run() {
    while (keepProcessing) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // do processing
                // commitSync() without arguments commits the offsets of everything
                // returned by the last poll(), not just this record
                consumer.commitSync();
            }
        } catch (Exception e) {
            logger.error("Couldn't process message", e);
        }
    }
}

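In my test I check that each consumer performed exactly one commit. Since there are as many consumers as partitions and as messages, this means every partition received exactly one message, i.e. the messages were distributed round-robin. Test code: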

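import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;

// Config, KafkaMessageQueue, KafkaManager and PostMessage are project classes
public class KafkaIntegrationTest {

    @Test
    public void testPartitions() throws Exception {
        int consumersAndPartitionsNumber = Config.getConsumerThreadAmount(); // it's 5
        // Create the latch only after the count is known: a field initializer runs
        // before the count is assigned and would produce a latch with count 0,
        // so await() would return immediately
        CountDownLatch latch = new CountDownLatch(consumersAndPartitionsNumber);
        KafkaMessageQueue kafkaMessageQueue = new KafkaMessageQueue(); // just a class with Producer configuration
        String groupId = Config.getGroupId();
        List<KafkaConsumer<byte[], byte[]>> consumers = new ArrayList<>(consumersAndPartitionsNumber);

        for (int i = 0; i < consumersAndPartitionsNumber; i++) {
            // Mockito spy, so that commitSync() calls can be verified afterwards
            consumers.add(spy(new KafkaConsumer<>(KafkaManager.createKafkaConsumerConfig(groupId))));
        }

        ExecutorService executor = Executors.newFixedThreadPool(consumersAndPartitionsNumber);
        for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
            executor.submit(new TestKafkaWorker(consumer, latch));
        }

        for (int i = 0; i < consumersAndPartitionsNumber; i++) {
            // send one message per partition to the topic
            kafkaMessageQueue.send(new PostMessage("pageid", "channel", "token", "POST", null, "{}"));
        }

        // Fail instead of silently continuing if the messages did not all arrive in time
        assertTrue("not every consumer received a message", latch.await(60, TimeUnit.SECONDS));
        executor.shutdown();

        // verify() without times() checks for exactly one commitSync() per consumer
        for (KafkaConsumer<byte[], byte[]> consumer : consumers) {
            verify(consumer).commitSync();
        }
    }

    static class TestKafkaWorker implements Runnable {

        private final KafkaConsumer<byte[], byte[]> consumer;
        private final CountDownLatch latch;
        private boolean keepProcessing = true;

        TestKafkaWorker(KafkaConsumer<byte[], byte[]> consumer, CountDownLatch latch) {
            this.consumer = consumer;
            this.latch = latch;
            consumer.subscribe(Collections.singletonList(Config.getTaskProcessingTopic()));
        }

        @Override
        public void run() {
            while (keepProcessing) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // A second record for the same consumer causes a second commit,
                        // which makes verify(consumer).commitSync() fail
                        consumer.commitSync();
                        keepProcessing = false;
                        latch.countDown();
                    }
                } catch (Exception e) {
                    // ignored in this test: keep polling until a record arrives
                }
            }
        }
    }
}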
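The verification idea in this test (one worker per partition, count the commits per worker, wait on a latch sized once the count is known, and check the result of `await()`) can be tried without Kafka. In the sketch below, a `BlockingQueue` per worker stands in for a partition, a thread per queue for a consumer, and a counter increment for `commitSync()`; `OneMessagePerWorkerCheck` and `run` are hypothetical names. Unlike the test above, the latch counts messages rather than workers, so the counters are only read after every message has been handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class OneMessagePerWorkerCheck {

    // Hypothetical broker-free model: queues = partitions, threads = consumers,
    // counter increments = commitSync() calls. Returns the "commits" per worker.
    public static int[] run(int workers, int messages) {
        List<BlockingQueue<String>> partitions = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            partitions.add(new LinkedBlockingQueue<>());
        }
        AtomicIntegerArray commits = new AtomicIntegerArray(workers);
        // Created after the message count is known, so its count is never 0 by accident
        CountDownLatch latch = new CountDownLatch(messages);

        ExecutorService executor = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int id = w;
            executor.submit(() -> {
                try {
                    while (true) {
                        String msg = partitions.get(id).poll(100, TimeUnit.MILLISECONDS);
                        if (msg != null) {
                            commits.incrementAndGet(id); // one "commit" per message
                            latch.countDown();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() stops the worker
                }
            });
        }

        // Round-robin "producer": message m goes to partition m % workers
        for (int m = 0; m < messages; m++) {
            partitions.get(m % workers).add("message-" + m);
        }

        try {
            // Check the result of await() instead of ignoring a timeout
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for messages");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }

        int[] result = new int[workers];
        for (int w = 0; w < workers; w++) {
            result[w] = commits.get(w);
        }
        return result;
    }

    public static void main(String[] args) {
        // With as many messages as workers, each worker should "commit" once
        System.out.println(Arrays.toString(run(5, 5)));
    }
}
```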
