kafka——从cli和kafka java api开始

t9eec4r0  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(373)

最近,在使用kafka时,我的应用程序需要从一开始就访问一个主题中的所有消息。因此,在编写kafka消费者(使用javaapi)时,我能够从头开始读取消息,但它只返回主题中的前500条消息。试图增加
props.put(consumerconfig.max\u poll\u records\u config,integer.max\u value);props.put(consumerconfig.fetch\u max\u bytes\u config,long.max\u value);
但它仍然不会返回所有消息,而在使用cli命令时,
kafka控制台使用者--引导服务器localhost:9092 --topic --从一开始
它会返回我所有的5000条记录。
伙计们,有什么配置丢失了吗?任何帮助都是值得感激的。。

[编辑:1]

消费者代码。

public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    // poll and time-out if no replies
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    consumer.close();
    return records;
}

然而,我改变了消费者:

public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    Map<String, byte[]> entityMap = new HashMap<String, byte[]>();
    boolean stop = false;
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    while (!stop) {
        // Request unread messages from the topic.
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumerRecords.iterator();
        if (iterator.hasNext()) {
            while (iterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = iterator.next();
                // Iterate through returned records, extract the value
                // of each message, and print the value to standard output.
                entityMap.put(new String(record.key()), record.value());
            }
        } else {
            stop = true;
        }
    }
    return entityMap;
}

虽然现在它正在获取所有记录,但我想知道是否有更好的方法。

yzuktlbb

yzuktlbb1#

使用没有错 seekToBeginning() 使用所有消息。
然而,有一种更灵活的方法可以达到同样的效果。您可以通过配置来实现这一点,这允许您从开始到结束都使用相同的代码。这也是方法 kafka-console-consumer.sh 工具用途:
auto.offset.resetearliestgroup.id 新的/随机值。如果你不想跟踪这个消费者的立场,但总是想从一开始,你也可以设置 enable.auto.commit 为避免污染主题,请输入false。
删除 seekToBeginning() 根据你的逻辑
关于你的逻辑,你应该考虑以下几点:
有些情况下 poll() 可以返回空集合,即使它尚未到达结尾。还有一个主题是流(无界),结尾可以移动。不管怎样你都可以用 endOffsets() 查找当前结束偏移量并将其与返回的消息的偏移量进行比较
你可能不想投票,直到你到达终点。一个主题的大小可以是几GB,并且包含数百万条记录。将所有内容存储在Map中很容易导致内存不足问题。

相关问题