大家好。我有一个Kafka项目使用SpringKafka什么听一个明确的主题。我需要一天听一次所有的信息,把它们收集起来,然后在那里找到特定的信息。我不明白如何用一个@kafkalistener方法来阅读所有的信息。我的班级是:
@Component
public class KafkaIntervalListener {
public CountDownLatch intervalLatch = new CountDownLatch(1);
private final SCDFRunnerService scdfRunnerService;
public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
this.scdfRunnerService = scdfRunnerService;
}
@KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
System.out.println("Recieved interval message: " + event);
IntervalType type = event.getType();
Instant instant = event.getInterval();
List<IntervalEvent> events = new ArrayList<>();
events.add(event);
events.size();
this.intervalLatch.countDown();
}
}
我的事件集合的大小始终为1;我试着使用不同的循环,但后来,我的收藏变成了同一条消息的530000次归档。
更新:我已经找到了一种方法来使用factory.setbatchlistener(true);但我需要找到launch it with@scheduled(cron=“${kafka.cron}”,zone=“europe/moscow”)。现在这种方法总是在听。现在我试着这样做:
@Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
kafkaIntervalListener.intervalLatch.await();
}
它不工作,在调试模式下我的断点从来没有工作在这个网站上。
1条答案
按热度按时间fivyi3re1#
侦听器容器在设计上是消息驱动的。
对于按需获取消息,最好使用kafka
Consumer
直接访问api并使用poll()
方法。