在利伯Kafka,
数据已通过以下函数放入传输队列。
virtual ErrorCode produce (Topic *topic, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t key_len, void *msg_opaque)
传输的结果可以在通过轮询(0)注册为回调的drècb()中找到。
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
void dr_cb (RdKafka::Message &message) {
// I want to know the order of produce.
}
};
如果我生产10次,我怎么知道第三次生产是否成功?我可以看到传输队列中的实际消息或消息数,但第三次生成的结果未知。
我想同步生成的消息的序列号和通过drècb报告完成的消息的序列号。我该怎么办?
1条答案
按热度按时间wgx48brx1#
在kafka中,排序只是每个分区的一件事,如果您生成多个分区,那么这些分区之间就没有排序。
对于生成到同一分区的消息,它们将按其原始顺序生成,除非存在需要重试的错误,在这种情况下可以重新排序。你可以
max.in.flight=1
为了避免排序(不幸的是,排序也会降低吞吐量和增加延迟),或者使用即将推出的幂等生产者(仅在主分支上,直到下一个版本)enable.idempotence=true
以极低的吞吐量成本获得有保证的订单交货。您可以使用message opaque将指针附加到product()中的一条消息上,该消息将作为
msg_opaque
在传递报告中,允许您将传递报告Map到生成的原始对象。