kafka消费者-从每个分区读取2000条最新消息

vq8itlhq  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(284)

我要验证kafka中创建的事件(使用java)有三个分区,这些分区是由移动应用程序生成的。我有以下消费属性:

Properties props = new Properties();
        props.put("bootstrap.servers", "e1.com:6767,e4.com:6767");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");
        props.put("group.id", "QAAutomation_" + com.ltree.core.Utility.generateRandomString(8));
        props.put("auto.offset.reset", "latest");
        props.put("security.protocol", "SASL_SSL");
        props.put("max.poll.records","2000");
        props.put("max.partition.fetch.bytes","10485760");
        System.setProperty("java.security.auth.login.config", "./src/test/resources/kafka-jaas.conf");
        System.setProperty("java.security.krb5.config","DS1.TEST");
        System.setProperty("java.security.krb5.kdc","DS1.TEST");
        System.setProperty("java.security.krb5.realm","DS1.TEST");
        return new KafkaConsumer<>(props);

目的是验证移动应用程序是否在kafka中生成了特定事件。我的逻辑是从每个分区获取2000个最新事件,然后遍历每个记录,查看record.value()是否包含searchstring。
由于来自生产商(移动应用程序)的大量EVEN数据,有时预期的事件可能不在前2000年,可能是第一次迭代当前的第3500个事件(只是说),而不是最新的当前事件。我遇到的问题是:

Iteration 1: Partition 0: offset 9500 to 7500
Iteration 1: Partition 1: offset 12500 to 10500
Iteration 1: Partition 2: offset 10500 to 8500
Iteration 2: Partition 0: offset 11500 to 9500 <- Here I want to read from 7499, 
where previous iteration left off. How to do this?

我曾经

current = consumer.position(topicPartition);
consumer.seek(topicPartition, current-2000);

这会将起始位置从6000移动到4000,从而给出记录。我错过了从6000到4000的记录。
组id是一个随机生成器,是的,有些可能已经存在id。每次运行它都会创建一个新的组id。
有什么建议吗?

d8tt03nd

d8tt03nd1#

因此,在使用它之后,我使用了上面相同的属性,但是在查找消费者记录时,我使用了下面的代码:这不是确切的解决方案,但它适用于我的情况。每次从分区收集最新的3000条记录。

con.seekToEnd(Arrays.asList(new TopicPartition(topic, i)));
                long current = con.position(new TopicPartition(topic, i));
                LOG.info("Partition "+ i +": offset: "+current);
                con.seek(new TopicPartition(topic, i), current - 3000);
                counter=0;

i is the index of partition.

相关问题