我已将kafka提交策略设置为最新且缺少前几条消息。如果我在开始向输入主题发送消息之前先休眠20秒,那么一切都正常工作。我不确定问题是否出在消费者花很长时间重新平衡分区上。有没有办法在开始投票前知道消费者是否准备好了?
t40tm48m1#
您可以执行以下操作:我有一个测试,读取Kafka主题的数据。所以您不能在多线程环境中使用kafkaconsumer,但是您可以传递参数“atomicreference assignment”,在使用者线程中更新它,并在另一个线程中读取它。例如,在项目中截取工作代码进行测试:
private void readAvro(String readFromKafka, AtomicBoolean needStop, List<Event> events, String bootstrapServers, int readTimeout) { // print the topic name AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>(); new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start(); long startTime = System.currentTimeMillis(); long maxWaitingTime = 30_000; for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) { Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>()); System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: " + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(","))); if (assignments.size() > 0) { break; } try { Thread.sleep(1_000); } catch (InterruptedException e) { e.printStackTrace(); needStop.set(true); break; } } System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime)); } private void readAvro(String bootstrapServers, String readFromKafka, AtomicBoolean needStop, List<Event> events, int readTimeout, AtomicReference<Set<TopicPartition>> assignment) { KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest"); System.out.println("Subscribed to topic: " + readFromKafka); consumer.subscribe(Collections.singletonList(readFromKafka)); long started = System.currentTimeMillis(); while (!needStop.get()) { assignment.set(consumer.assignment()); ConsumerRecords<String, byte[]> records = consumer.poll(1_000); events.addAll(CommonUtils4Tst.readEvents(records)); if (readTimeout == -1) { if (events.size() > 0) { break; } } else if (System.currentTimeMillis() - started > readTimeout) { break; } } needStop.set(true); synchronized (MainTest.class) { MainTest.class.notifyAll(); } consumer.close(); }
附笔needstop-全局标志,在成功失败的情况下停止所有正在运行的线程(如果有的话)事件-我要检查的对象的列表readtimeout—在读取所有数据之前等待的时间,如果readtimeout==-1,则在读取任何数据时停止
6qqygrtg2#
如果您的策略设置为“最新”(如果没有以前提交的偏移量,则该策略将生效),但是您没有以前提交的偏移量,那么您不应该担心“丢失”消息,因为您告诉kafka不要关心“以前”发送给您的消费者的消息是否已准备就绪。如果您关心“以前”的消息,则应将策略设置为“最早”。在任何情况下,无论政策如何,您看到的行为都是暂时的,即一旦承诺的补偿在Kafka中保存,每次重新启动时,消费者都会从之前离开的地方重新开始
kulphzqa3#
多亏了亚历克赛(我也投了赞成票),我似乎基本上按照同样的想法解决了我的问题。只是想分享我的经验。。。在我们的例子中,我们以请求&响应的方式使用kafka,有点像rpc。正在发送一个主题的请求,然后等待另一个主题的响应。遇到类似的问题,即错过第一个响应。我试过了 ... KafkaConsumer.assignment(); 反复地(用 Thread.sleep(100); )但似乎没有帮助。添加 KafkaConsumer.poll(50); 似乎已经启动了消费者(群体),也收到了第一React。测试了几次,现在一直有效。顺便说一句,测试需要停止应用程序&删除kafka主题,并重新启动kafka。附言:只是打电话 poll(50); 没有 assignment(); 像alexey提到的那样,获取逻辑可能不能保证消费者(组)已经准备好了。
... KafkaConsumer.assignment();
Thread.sleep(100);
KafkaConsumer.poll(50);
poll(50);
assignment();
xdnvmnnf4#
你可以用 consumer.assignment() ,它将返回一组分区,并验证是否为该主题分配了所有可用的分区。如果您使用的是springkafka项目,那么可以包含springkafka测试依赖性,并使用下面的方法等待主题分配,但是您需要有容器。 ContainerTestUtils.waitForAssignment(Object container, int partitions);
consumer.assignment()
ContainerTestUtils.waitForAssignment(Object container, int partitions);
4条答案
按热度按时间t40tm48m1#
您可以执行以下操作:
我有一个测试,读取Kafka主题的数据。
所以您不能在多线程环境中使用kafkaconsumer,但是您可以传递参数“atomicreference assignment”,在使用者线程中更新它,并在另一个线程中读取它。
例如,在项目中截取工作代码进行测试:
附笔
needstop-全局标志,在成功失败的情况下停止所有正在运行的线程(如果有的话)
事件-我要检查的对象的列表
readtimeout—在读取所有数据之前等待的时间,如果readtimeout==-1,则在读取任何数据时停止
6qqygrtg2#
如果您的策略设置为“最新”(如果没有以前提交的偏移量,则该策略将生效),但是您没有以前提交的偏移量,那么您不应该担心“丢失”消息,因为您告诉kafka不要关心“以前”发送给您的消费者的消息是否已准备就绪。
如果您关心“以前”的消息,则应将策略设置为“最早”。
在任何情况下,无论政策如何,您看到的行为都是暂时的,即一旦承诺的补偿在Kafka中保存,每次重新启动时,消费者都会从之前离开的地方重新开始
kulphzqa3#
多亏了亚历克赛(我也投了赞成票),我似乎基本上按照同样的想法解决了我的问题。
只是想分享我的经验。。。在我们的例子中,我们以请求&响应的方式使用kafka,有点像rpc。正在发送一个主题的请求,然后等待另一个主题的响应。遇到类似的问题,即错过第一个响应。
我试过了
... KafkaConsumer.assignment();
反复地(用Thread.sleep(100);
)但似乎没有帮助。添加KafkaConsumer.poll(50);
似乎已经启动了消费者(群体),也收到了第一React。测试了几次,现在一直有效。顺便说一句,测试需要停止应用程序&删除kafka主题,并重新启动kafka。
附言:只是打电话
poll(50);
没有assignment();
像alexey提到的那样,获取逻辑可能不能保证消费者(组)已经准备好了。xdnvmnnf4#
你可以用
consumer.assignment()
,它将返回一组分区,并验证是否为该主题分配了所有可用的分区。如果您使用的是springkafka项目,那么可以包含springkafka测试依赖性,并使用下面的方法等待主题分配,但是您需要有容器。
ContainerTestUtils.waitForAssignment(Object container, int partitions);