在kafka中实现并发

wb1gzix0  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(581)

我们正在对kafka消费者进行并行处理,以处理更多的记录来处理峰值负载。我们已经在做的一种方法是,在同一个消费群体中,通过旋转尽可能多的消费者和尽可能多的分区。
我们的消费者现在正在处理一个同步的api调用。我们认为使这个api调用异步化将使我们的消费者能够处理更多的负载。因此,我们试图使api调用异步,并在其响应中增加偏移量。然而,我们发现了一个问题:
通过使api调用异步,我们可以首先得到最后一条记录的响应,而之前记录的api调用都没有启动或完成。如果我们在收到最后一条记录的响应后立即提交偏移量,那么偏移量将更改为最后一条记录。同时,如果使用者重新启动或分区重新平衡,我们将不会在提交偏移量为的最后一条记录之前收到任何记录。这样,我们将错过未处理的记录。
到目前为止,我们已经有25个分区。我们期待了解是否有人在不增加分区的情况下实现了并行性,或者增加分区是实现并行性(避免偏移问题)的唯一方法。

r8uurelv

r8uurelv1#

你得看看Kafka batch 处理。简而言之:你可以设置巨大的 batch.size 有一点点(甚至是一个)的 partitions . 就整体而言 batchmessages 消耗时间 consumer 侧(即在ram内存中)-您可以按任何方式并行化这些消息。
我真的很想分享链接,但他们的数字滚过网页洞。
更新
就提交偏移量而言,您可以为整个 batch . 一般来说,kafka不会通过滥用分区数来达到目标性能要求,而是依赖于 batch 处理。
我已经看到了很多项目,遭受分区扩展的痛苦(稍后您可能会看到问题,例如在重新平衡期间)。经验法则-查看所有可用的 batch 先设置。

l5tcr1uw

l5tcr1uw2#

首先,您需要将消息的读取与这些消息的处理分离开来(如果只是一开始的话)。接下来看看您可以对api进行多少并发调用,因为无论异步与否,调用它的频率都比服务器能够处理的频率高是没有任何意义的。如果并发api调用的数量大致等于主题中的分区数量,那么异步调用api是没有意义的。
如果分区的数量远远小于可能并发api调用的最大数量,那么您有几个选择。您可以按照您的建议,通过异步调用api的线程来尝试使用较少的线程(每个使用者一个线程)进行最多并发api调用,也可以创建更多线程并同步进行调用。当然,接下来你会遇到这样的问题:你的用户如何将他们的工作交给更多的共享线程,但这正是flink或storm等流执行平台为你做的。提供检查点处理的流平台(比如flink)也可以处理在消息处理无序时如何处理偏移提交的问题。您可以运行自己的检查点处理和共享线程管理,但必须真正避免使用流执行平台。
最后,您可能有比最大可能并发api调用更多的使用者,但是我建议您只有更少的使用者和共享分区,而不是api调用线程。
当然,您也可以随时更改主题分区的数量,以使上面的首选选项更加可行。
不管是哪种方式,要回答您的特定问题,您需要了解flink如何使用kafka offset提交检查点处理。为了过于简单化(因为我认为您不想自己滚动),kafka消费者不仅要记住他们刚刚提交的偏移量,而且还必须保留以前提交的偏移量,这定义了一个通过应用程序的消息块。要么整个消息块都被处理,要么需要将每个线程的处理状态回滚到上一个块中最后一条消息被处理的位置。再说一次,这是一个重大的过于简单化,但这是怎么做的。

相关问题