从数据库向Kafka发送消息,保证不丢失消息并保持顺序

yacmzcpb  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(160)

我需要将数据库中的数据发送到Kafka。任何数据都不能丢失,并且从数据库中提取消息时要严格保持消息的顺序。消息发送后,我需要将其从数据库中删除。一旦完成,此任务将反复进行(通过@Scheduler调度)。
我得出的结论是,保证不丢失任何消息并保持消息的顺序需要以下几点:在发送一条新消息之前,我需要确保前一条消息已经成功地发送到Kafka代理(Acks=allmin.insync.replicas=2)。如果一条消息没有发送到代理,就没有必要发送下一条消息。因此,解决方案是同步的。下面是我的代码示例:

public List<String> sendMessages(String topicName, List<Object> data) {
    List<String> successIds = new ArrayList<>();
    for (Object value : data) {
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, value.getSiebelId(), value);
        try {
            listenableFuture.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.warn("todo");
            break;
        }
        successIds.add(value.getId());
    }
    return successIds;
}

successIds包含成功传递到代理的消息的id。接下来,我使用它们删除数据库中的相应数据。如果在从List<Object>数据发送消息的操作期间,由于某种原因,一些消息没有传递到代理,那么我们提前结束迭代,并删除设法进入successIds的消息。在下一次迭代中,我们将从那些没有包括在successIds中的消息开始,因为它们还没有从数据库中删除。
从这个解决方案需要拒绝异步,这肯定会导致性能下降。我已经测试过了,它运行得非常慢。我是Kafka的新手,所以想听听Maven的意见。这个解决方案是最优的吗?

fnatzsnv

fnatzsnv1#

与问题中的listenableFuture.get相比,下面的解决方案给予了我更快的工作效率(在我的测试数据上是listenableFuture.get的100倍)。这里我在onSuccess方法中把成功发送的id放入列表中的listenableFuture中添加了一个callback。在对列表进行迭代之后,我在kafkaTemplate上调用flush()

@Override
public List<String> sendMessages(String topicName, List<T> data) {
    List<String> successIds = new ArrayList<>();
    data.forEach(value ->
            kafkaTemplate.send(topicName, value.getSiebelId(), value)
                    .addCallback(new ListenableFutureCallback<>() {
                        @Override
                        public void onSuccess(SendResult<String, Object> result) {
                            successIds.add(value.getId());
                        }

                        @Override
                        public void onFailure(Throwable exception) {
                            log.warn("todo");
                        }
                    }));
    kafkaTemplate.flush();
    return successIds;
}

然而,这个解决方案的successIds输出可能与问题中的输出不同。(例如由于网络问题),第四,第5个仍将发送给经纪人,并可能交付(如果网络问题已解决)。因此successIds={1,2,4,5}。稍后,3th消息将使用新的列表迭代发送,因此可能在5th消息之后发送。因此,它的发送速度更快,给予保证没有消息丢失。但不会给予100%的保证保持订单。这并不理想,但也许我会尝试用它作为妥协。
在相同的情况下,listenableFuture.get的解决方案甚至不会发送第4条、第5条消息,而是得到successIds={1,2}。未传递的消息3th, 4th, 5th将在新的列表迭代中以正确的顺序发送。
我无法正确解释为什么我用所提供的解决方案获得了很大的生产力。我猜kafkaTemplate.flush()不知何故做了asych的事情,而listenableFuture.get把请求放在序列中等待相应的响应。
P.S.有趣的是,如果我使用相同的代码,但删除了kafkaTemplate.flush()行,而是用autoflash=true初始化bean kafkaTemplate,那么它再次运行缓慢。

相关问题