我有3个分区:0、1、2。所以消息可以分为0,1,2。
如:
分区0中有1条消息:0
分区1中的3条消息:111
分区2中的2条消息:22
如何使消费者以012x12x1x的顺序消费消息(x表示此时没有消息)。已消费消息的顺序如下:012121。我想在c和python中都这样做。纵观现有客户机,消息可以循环生成,但不能循环消费。
你知道吗?
kafka consumer配置中有partition.assignment.strategy(http://kafka.apache.org/documentation.html#consumerconfigs). 我正在寻找一些实现此配置的工具(如flume、spark、storm)来从kafka读取数据、重新排序并再次写入kafka。继续上面的例子。重新排序的消息看起来像:012121(012x12x1x)
更新
现在,我可以在ckafka客户机中实现这一点(https://github.com/edenhill/librdkafka).
for(int i = 0; i < 2; i++)
{
RdKafka::Message *msg = m_consumer->consume(m_topic, i, 1000);
// Do something about msg here...
}
输出:
Reading from 1=>4953---1---
Reading from 0=>46164---0---
Reading from 1=>4954---1---
Reading from 0=>46165---0---
Reading from 1=>4955---1---
Reading from 0=>46166---0---
Reading from 1=>4956---1---
Reading from 0=>46167---0---
Reading from 1=>4957---1---
Reading from 0=>46168---0---
1条答案
按热度按时间blmhpbnm1#
在java中,可以使用高级使用者。
如果在同一进程中使用所有3个分区中的
group.id
,您可以创建3个充当使用者的工作线程,并以循环方式在它们之间循环。我知道你明确提到python和c是你的目标语言。
根据我自己在python中的经验,没有什么能与java提供的高级使用者相媲美。
因此,您可以以某种方式 Package 它,比如创建一个充当服务器的新java进程,并通过这个进程使用消息,或者您可以尝试将高级使用者移植到python和/或c。
另一种方法是使用每个分区,将数据写入其他介质,如mysql(或任何其他rdbms),并使用sql执行循环,最后从相关表中删除消息。
无论如何,我建议你重新考虑Kafka作为你的传输层。您的需求(以循环方式消费)与kafka的核心设计/架构不一致。原因如下:
只要分区的数量小于消费机器中可用的核心,那么就可以使用单个(!)上的消息机器以循环方式运行,就像我上面描述的那样。
然而,kafka也是为分布式负载(消费者)而设计的。这就是为单个主题创建分区的动机。可伸缩性是Kafka的一个关键概念。
因此,如果用户计算机中的分区多于可用的核心,则可能无法正确地迭代分区并以循环方式使用消息。
示例:假设在一个特定主题中有20个分区。这个主题总是会产生很多信息。现在让我们假设您有一台具有4个cpu核心的机器来使用该主题。通过设计,一次最多可以使用4个分区。将此转换为循环消耗可以通过在内存中缓冲大量有意义的消息或其他机制(这可能导致延迟等其他问题)来实现。这时我们假设所有的分区都是随时可用的,没有影响kafka的网络问题或磁盘问题。这就是为什么我认为将分区“连接回”到其他一些消息流中并不是大规模的小事。