在kafkautils.createstream()中使用“topics”参数的正确方法是什么?

vdzxcuhz  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(691)

我将在python中一起使用spark streaming和kafka,并大致跟随这篇文章,但我对前面提到的kafkautils.createstream()函数有点困惑。
文档并没有通过显式解释主题词典的影响来做很多工作。但我有一种怀疑,我之所以这么认为,是因为我对Kafka的工作原理的了解还很不可靠,答案很明显。
我知道应该是这样一本字典: {"topic.name": 1} 我可以鹦鹉学舌的文档,并说这意味着创建的流将消费从一个单一的分区。
所以我想我只是想澄清一下这个特殊函数的用法,以及我对Kafka概念的理解。我们将使用以下示例:
假设我已经定义了一个主题 my.topic 它有3个分区,传入的消息在一个键上被拆分,就说是一个userid。
如果我像这样初始化流:

from pyspark.streaming.kafka import KafkaUtils

kafkaStream = KafkaUtils.createStream(
    ssc, 
    'kafka:2181', 
    'consumer-group-name', 
    {'my.topic':1}
)

我是否正确地认为这个流将只从单个分区消耗,因此不会看到每个消息进入 my.topic ? 换句话说,它将只看到从userid发送到3个分区之一的消息?
我的问题是:
如何正确设置此参数以使用发送到的所有消息 my.topic ?
我的直觉是我将topics参数设置为 {'my.topic': 3} ,那么我的问题就变成了:
为什么我要使用一个小于分区总数的数字?
我的直觉告诉我,这是一个如何“原子”的问题,你正在做的工作是。例如,如果我只是简单地转换数据(例如,从一个csv转换成一个json文档列表之类的),那么上面的3个流中的每一个都有 {'my.topic': 1} 设置为their topics参数,并且同一使用者组的所有部分都将通过启用每个分区的并行消费而受益,因为不需要共享关于所消费的每个消息的信息。
同时,如果我在计算整个主题的实时指标,即带过滤器的时间窗平均值等,我很难找到一种方法来实现类似的东西,而不设置 {'my.topic': 3} ,或者如果它类似于一个和,那么对消费者组中的每个分量信号进行稍微复杂的下游处理,即sum1+sum2+sum3=totalsum
不过,我的知识也正处于与Kafka和斯帕克嬉戏的“羽翼未丰”阶段。
有没有一种方法可以告诉createstream()使用所有分区,而不提前知道有多少分区?像这样的 {'my.topic': -1} ?
一个流中可以指定多个主题吗?即。 {'my.topic': 1, 'my.other.topic': 1} 我真的不希望这个问题的答案仅仅是“是的,你的直觉是正确的”。最好的情况是有人告诉我我对每件事都有误解,并让我直截了当。所以请…做那个!

ut6juiuv

ut6juiuv1#

这就是KafkaSpark集成页面中提到的。
val kafkastream=kafkautils.createstream(streamingcontext,[zk quorum],[consumer group id],[per topic要使用的kafka分区数])
kafkautils.createstream将创建一个接收器并使用kafka主题。
选项“每个主题要使用的kafka分区数”意味着这个接收器将并行读取多少个分区。
例如,假设您有一个名为“topic1”的主题,有两个分区,并且您提供了选项“topic1”:1,那么kafka接收器将一次读取一个分区[它最终将读取所有分区,但一次将读取一个分区]。这样做的原因是读取分区中的消息,并保持数据写入主题的顺序。
例如,假设topic1有带有消息{1,11,21,31,41}的partition1和带有消息{2,12,22,32,42}的partition2,那么使用上述设置进行读取将产生类似于{1,11,21,31,41,2,12,22,32,42}的流。每个分区中的消息是分开读取的,因此它没有混合在一起。
如果您提供选项“topic1”:2,那么接收器将一次读取2个分区,并且这些分区中的消息将混合在一起。对于上面相同的启动示例,带有“topic1”:2的接收器将产生类似于{1,2,11,12,21,22….}
可以将此视为接收器可以对给定主题分区执行的并行读取数。
5一个流中可以指定多个主题吗?是的,你可以。

1l5u6lss

1l5u6lss2#

只需指定不带分区的主题,就可以获得该主题中的所有消息,无论该主题中有多少分区。
您只需看一下示例代码:https://github.com/apache/spark/blob/v2.2.1/examples/src/main/python/streaming/direct_kafka_wordcount.py#l48

相关问题