我要验证kafka中创建的事件(使用java)有三个分区,这些分区是由移动应用程序生成的。我有以下消费属性:
Properties props = new Properties();
props.put("bootstrap.servers", "e1.com:6767,e4.com:6767");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
props.put("group.id", "QAAutomation_" + com.ltree.core.Utility.generateRandomString(8));
props.put("auto.offset.reset", "latest");
props.put("security.protocol", "SASL_SSL");
props.put("max.poll.records","2000");
props.put("max.partition.fetch.bytes","10485760");
System.setProperty("java.security.auth.login.config", "./src/test/resources/kafka-jaas.conf");
System.setProperty("java.security.krb5.config","DS1.TEST");
System.setProperty("java.security.krb5.kdc","DS1.TEST");
System.setProperty("java.security.krb5.realm","DS1.TEST");
return new KafkaConsumer<>(props);
目的是验证移动应用程序是否在kafka中生成了特定事件。我的逻辑是从每个分区获取2000个最新事件,然后遍历每个记录,查看record.value()是否包含searchstring。
由于来自生产商(移动应用程序)的大量EVEN数据,有时预期的事件可能不在前2000年,可能是第一次迭代当前的第3500个事件(只是说),而不是最新的当前事件。我遇到的问题是:
Iteration 1: Partition 0: offset 9500 to 7500
Iteration 1: Partition 1: offset 12500 to 10500
Iteration 1: Partition 2: offset 10500 to 8500
Iteration 2: Partition 0: offset 11500 to 9500 <- Here I want to read from 7499,
where previous iteration left off. How to do this?
我曾经
current = consumer.position(topicPartition);
consumer.seek(topicPartition, current-2000);
这会将起始位置从6000移动到4000,从而给出记录。我错过了从6000到4000的记录。
组id是一个随机生成器,是的,有些可能已经存在id。每次运行它都会创建一个新的组id。
有什么建议吗?
1条答案
按热度按时间d8tt03nd1#
因此,在使用它之后,我使用了上面相同的属性,但是在查找消费者记录时,我使用了下面的代码:这不是确切的解决方案,但它适用于我的情况。每次从分区收集最新的3000条记录。