在kafka文档中,提到消费者不是线程安全的。为了避免这个问题,我读到为每个java进程运行一个consumer是个好主意。如何做到这一点?消费者的数量没有定义,但可以根据需要改变。谢谢,阿莱西奥
qni6mghb1#
您是对的,文档指定kafka消费者不是线程安全的。不过,它还指出,应该在单独的线程上运行使用者,而不是在进程上运行。那完全不同。请参阅此处,以获得针对java/jvm的更详细的答案:https://stackoverflow.com/a/15795159/236528一般来说,在一个Kafka主题上,您可以有任意多的消费者。其中一些可能共享一个组id,在这种情况下,该主题的所有分区将在任何时间点分布在所有活动的使用者中。关于kafka消费者的javadoc有更多的细节,链接在这个答案的底部,但是我复制了下面文档建议的两个线程/消费者模型。1每个线程一个使用者一个简单的选择是为每个线程提供自己的使用者示例。以下是这种方法的优缺点:赞成:这是最容易实施的赞成:它通常是最快的,因为不需要线程间的协调优点:它使得按分区的顺序处理非常容易实现(每个线程只是按照接收消息的顺序处理消息)。缺点:更多的使用者意味着更多到集群的tcp连接(每个线程一个)。一般来说,Kafka处理连接非常有效,所以这通常是一个小成本。缺点:多个使用者意味着向服务器发送更多的请求,而对数据的批处理稍微少一些,这可能会导致i/o吞吐量下降。con:所有进程的线程总数将受到分区总数的限制。2消费与加工脱钩另一种选择是让一个或多个使用者线程执行所有数据消耗,并将使用者记录示例交给一个阻塞队列,阻塞队列由实际处理记录处理的处理器线程池使用。这种选择也有利弊:优点:此选项允许独立地扩展使用者和处理器的数量。这使得有一个单一的使用者为多个处理器线程提供信息成为可能,从而避免了对分区的任何限制。缺点:保证处理器之间的顺序需要特别小心,因为线程将独立执行一个较早的数据块实际上可能会在稍后的数据块之后处理,这只是由于线程执行时间的运气。对于没有订购要求的处理,这不是问题。缺点:手动提交位置变得更加困难,因为它需要所有线程进行协调,以确保该分区的处理完成。这种方法有许多可能的变化。例如,每个处理器线程可以有自己的队列,使用者线程可以使用topicpartition散列到这些队列中,以确保按顺序使用并简化提交。根据我的经验,选项1是最适合起步的,只有当你真的需要的时候,你才能升级到选项2。选项#2是从kafka使用者获取最大性能的唯一方法,但其实现更为复杂。所以,先尝试一下选项1,看看它是否适合您的特定用例。完整的javadoc可通过以下链接获得:https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
1条答案
按热度按时间qni6mghb1#
您是对的,文档指定kafka消费者不是线程安全的。不过,它还指出,应该在单独的线程上运行使用者,而不是在进程上运行。那完全不同。请参阅此处,以获得针对java/jvm的更详细的答案:https://stackoverflow.com/a/15795159/236528
一般来说,在一个Kafka主题上,您可以有任意多的消费者。其中一些可能共享一个组id,在这种情况下,该主题的所有分区将在任何时间点分布在所有活动的使用者中。
关于kafka消费者的javadoc有更多的细节,链接在这个答案的底部,但是我复制了下面文档建议的两个线程/消费者模型。
1每个线程一个使用者
一个简单的选择是为每个线程提供自己的使用者示例。以下是这种方法的优缺点:
赞成:这是最容易实施的
赞成:它通常是最快的,因为不需要线程间的协调
优点:它使得按分区的顺序处理非常容易实现(每个线程只是按照接收消息的顺序处理消息)。
缺点:更多的使用者意味着更多到集群的tcp连接(每个线程一个)。一般来说,Kafka处理连接非常有效,所以这通常是一个小成本。
缺点:多个使用者意味着向服务器发送更多的请求,而对数据的批处理稍微少一些,这可能会导致i/o吞吐量下降。
con:所有进程的线程总数将受到分区总数的限制。
2消费与加工脱钩
另一种选择是让一个或多个使用者线程执行所有数据消耗,并将使用者记录示例交给一个阻塞队列,阻塞队列由实际处理记录处理的处理器线程池使用。这种选择也有利弊:
优点:此选项允许独立地扩展使用者和处理器的数量。这使得有一个单一的使用者为多个处理器线程提供信息成为可能,从而避免了对分区的任何限制。
缺点:保证处理器之间的顺序需要特别小心,因为线程将独立执行一个较早的数据块实际上可能会在稍后的数据块之后处理,这只是由于线程执行时间的运气。对于没有订购要求的处理,这不是问题。
缺点:手动提交位置变得更加困难,因为它需要所有线程进行协调,以确保该分区的处理完成。这种方法有许多可能的变化。例如,每个处理器线程可以有自己的队列,使用者线程可以使用topicpartition散列到这些队列中,以确保按顺序使用并简化提交。
根据我的经验,选项1是最适合起步的,只有当你真的需要的时候,你才能升级到选项2。选项#2是从kafka使用者获取最大性能的唯一方法,但其实现更为复杂。所以,先尝试一下选项1,看看它是否适合您的特定用例。
完整的javadoc可通过以下链接获得:https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html