我需要将数据库中的数据发送到Kafka
。任何数据都不能丢失,并且从数据库中提取消息时要严格保持消息的顺序。消息发送后,我需要将其从数据库中删除。一旦完成,此任务将反复进行(通过@Scheduler
调度)。
我得出的结论是,保证不丢失任何消息并保持消息的顺序需要以下几点:在发送一条新消息之前,我需要确保前一条消息已经成功地发送到Kafka代理(Acks=all
,min.insync.replicas=2
)。如果一条消息没有发送到代理,就没有必要发送下一条消息。因此,解决方案是同步的。下面是我的代码示例:
public List<String> sendMessages(String topicName, List<Object> data) {
List<String> successIds = new ArrayList<>();
for (Object value : data) {
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, value.getSiebelId(), value);
try {
listenableFuture.get(3, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("todo");
break;
}
successIds.add(value.getId());
}
return successIds;
}
successIds
包含成功传递到代理的消息的id。接下来,我使用它们删除数据库中的相应数据。如果在从List<Object>
数据发送消息的操作期间,由于某种原因,一些消息没有传递到代理,那么我们提前结束迭代,并删除设法进入successIds
的消息。在下一次迭代中,我们将从那些没有包括在successIds
中的消息开始,因为它们还没有从数据库中删除。
从这个解决方案需要拒绝异步,这肯定会导致性能下降。我已经测试过了,它运行得非常慢。我是Kafka
的新手,所以想听听Maven的意见。这个解决方案是最优的吗?
1条答案
按热度按时间fnatzsnv1#
与问题中的
listenableFuture.get
相比,下面的解决方案给予了我更快的工作效率(在我的测试数据上是listenableFuture.get
的100倍)。这里我在onSuccess
方法中把成功发送的id放入列表中的listenableFuture
中添加了一个callback
。在对列表进行迭代之后,我在kafkaTemplate
上调用flush()
。然而,这个解决方案的
successIds
输出可能与问题中的输出不同。(例如由于网络问题),第四,第5个仍将发送给经纪人,并可能交付(如果网络问题已解决)。因此successIds={1,2,4,5}
。稍后,3th
消息将使用新的列表迭代发送,因此可能在5th
消息之后发送。因此,它的发送速度更快,给予保证没有消息丢失。但不会给予100%的保证保持订单。这并不理想,但也许我会尝试用它作为妥协。在相同的情况下,
listenableFuture.get
的解决方案甚至不会发送第4条、第5条消息,而是得到successIds={1,2}
。未传递的消息3th, 4th, 5th
将在新的列表迭代中以正确的顺序发送。我无法正确解释为什么我用所提供的解决方案获得了很大的生产力。我猜
kafkaTemplate.flush()
不知何故做了asych的事情,而listenableFuture.get
把请求放在序列中等待相应的响应。P.S.有趣的是,如果我使用相同的代码,但删除了
kafkaTemplate.flush()
行,而是用autoflash=true
初始化beankafkaTemplate
,那么它再次运行缓慢。