Kafka多重分区排序

bjp0bcyl  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(378)

我知道在kafka中不可能对多个分区进行排序,并且分区排序只能保证一个组中的单个使用者(对于单个分区)。然而,Kafka流0.10现在有可能实现这一点吗?如果我们使用时间戳特性,使得每个分区中的每条消息都保持顺序,那么在使用者端,让我们假设kafka streams为0.10,这现在是可能的吗?假设我们收到了所有的消息,我们能不能根据消耗的时间戳对所有的分区进行排序,并将它们转发到一个单独的主题上进行消耗?
目前我需要保持顺序,但这意味着有一个分区和一个使用者线程。我想把它改成多个分区来增加并行性,但不知怎么的,我还是把它们整理好了。
有什么想法吗?谢谢您。

trnvg8h3

trnvg8h31#

在这种情况下,您面临两个问题:
具有多个分区的kafka主题,而事实上kafka不保证(主题的)全局排序用于此类多分区主题。
主题及其分区的消息延迟到达/无序的可能性,这与时间和时间戳有关。
我知道在kafka中不可能对多个分区进行排序,并且分区排序只能保证一个组中的单个使用者(对于单个分区)。然而,Kafka流0.10现在有可能实现这一点吗?
简单的回答是:不,当你阅读Kafka的多个分区的主题时,仍然不可能实现全局秩序。
另外,“分区排序”是指“基于分区中消息的偏移量的分区排序”。排序保证与消息的时间戳无关。
最后,只有在以下情况下才能保证订购 max.in.flight.requests.per.connection == 1 :
apache kafka文档中的生产者配置设置: max.in.flight.requests.per.connection (默认值: 5 ):在阻塞之前,客户端将在单个连接上发送的最大未确认请求数。请注意,如果将此设置设置为大于1并且存在失败的发送,则存在由于重试而导致消息重新排序的风险(即,如果启用了重试)。
请注意,在这一点上,我们讨论的是Kafka中消费者行为(这是您最初的问题的起点)和生产者行为的组合。
如果我们使用时间戳特性,使得每个分区中的每条消息都保持顺序,那么在使用者端,让我们假设kafka streams为0.10,这现在是可能的吗?
即使使用时间戳特性,我们仍然无法实现“每个分区中的每条消息都保持顺序”。为什么?因为可能会有迟到/不正常的信息。
分区按偏移量排序,但不能保证按时间戳排序。分区的以下内容在实践中是完全可能的(时间戳通常是从epoch开始的毫秒):

Partition offsets     0    1    2    3    4    5    6    7    8
Timestamps            15   16   16   17   15   18   18   19   17
                                          ^^
                                         oops, late-arriving data!

什么是迟到/坏消息?想象一下,你的传感器遍布世界各地,所有这些传感器都测量当地的温度,并将最新的测量结果发送给Kafka的一个主题。一些传感器可能具有不可靠的互联网连接,因此它们的测量可能会延迟几分钟、几小时甚至几天。最终,他们延迟的测量将到达Kafka,但他们将“迟到”。城市里的手机也是如此:有些手机可能电池/能量耗尽,需要充电才能发送数据,有些手机可能因为你在地下开车而失去互联网连接,等等。
假设我们收到了所有的消息,我们能不能根据消耗的时间戳对所有的分区进行排序,并将它们转发到一个单独的主题上进行消耗?
理论上是的,但实际上这很难。“我们接收所有消息”的假设对于流式处理系统(甚至对于批处理系统,尽管这里可能只是简单地忽略了延迟到达数据的问题)来说实际上是一个挑战。你永远不知道你是否真的收到了“所有的消息”--因为可能是迟到的数据。如果您收到迟到的信息,您希望发生什么?重新处理/重新排序“所有”消息(现在包括延迟到达的消息),或忽略延迟到达的消息(从而计算不正确的结果)?从某种意义上说,通过“让我们把它们全部排序”来实现任何这样的全球排序,要么代价高昂,要么是尽力而为。

zsohkypk

zsohkypk2#

我没有使用Kafka流-但这是有可能做到这一点与正常消费者。
首先对分区进行排序—这假设您已经在每个您想要的或使用的使用者组中查找了偏移量。

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {

    Set<TopicPartition> pollPartitions = events.partitions();
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
    for (TopicPartition tp : pollPartitions) {
        orderEvents.add(events.records(tp));
    }
    // order the list by the first event, each list is ordered internally also
    orderEvents.sort(new PartitionEventListComparator());
    return orderEvents;
}

/**
 * Used to sort the topic partition event lists so we get them in order
 */
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {

    @Override
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
        long c1 = list1.get(0).timestamp();
        long c2 = list2.get(0).timestamp();
        if (c1 < c2) {
            return -1;
        } else if (c1 > c2) {
            return 1;
        }

        return 0;
    }

}

然后循环分区以使事件有序-实际上我发现这是可行的。

ConsumerRecords<String, String> events = consumer.poll(500);
                int totalEvents = events.count();
                log.debug("Polling topic - recieved " + totalEvents + " events");
                if (totalEvents == 0) {
                    break;  // no more events
                }

                List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);

                int cnt = 0;
                // Each list is removed when it is no longer needed
                while (!orderEvents.isEmpty() && sent < max) {
                    for (int j = 0; j < orderEvents.size(); j++) {
                        List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
                        // The list contains no more events, or none in our time range, remove it
                        if (subList.size() < cnt + 1) {
                            orderEvents.remove(j);
                            log.debug("exhausted partition - removed");
                            j--;
                            continue;
                        }
                        ConsumerRecord<String, String> event = subList.get(cnt);
                        cnt++
}

相关问题