在结构化流中,是否可以对多个查询使用一个Kafka流?

bnl4lu3b  于 2022-10-07  发布在  Kafka
关注(0)|答案(2)|浏览(134)

我有一个Spark应用程序,它必须使用单个Kafka主题作为源并行处理多个查询。

我注意到的行为是,每个查询都有自己的消费者(在它自己的消费者组中),导致相同的数据被多次流到应用程序(如果我错了,请纠正我),这似乎效率很低,相反,我希望有一个数据流,然后由Spark并行处理。

在上面的场景中,改进性能的推荐方法是什么?我是否应该专注于优化Kafka分区,而不是Spark与Kafka的交互方式?

欢迎有任何想法,谢谢。

xwbd5t1u

xwbd5t1u1#

我注意到的行为是,每个查询都有自己的消费者(在它自己的消费者组中),导致相同的数据被多次流到应用程序(如果我错了,请纠正我),这似乎效率很低,相反,我希望有一个数据流,然后由Spark并行处理。

tl;DR在当前设计中不可用。

单个流查询从接收器“开始”。一个流查询中只能有一个(为了更好地记住,我自己重复了一次,因为我似乎在使用Spark Structure Streaming、Kafka Streams和最近使用ksqlDB时被多次捕获)。

一旦有了接收器(输出),流查询就可以是started(在它自己的守护进程线程上)。

正是出于您提到的原因(不共享Kafka Consumer API要求group.id不同的数据),每个流查询都创建唯一的组ID(参见此代码和3.3.0中的注解),以便相同的记录可以通过不同的流查询进行转换:

// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = KafkaSourceProvider.batchUniqueGroupId(sourceOptions)

这是有道理的,IMHO。

我是否应该专注于优化Kafka分区,而不是Spark与Kafka的交互方式?

我想是的。

ccgok5k5

ccgok5k52#

您可以将源数据框分成不同的阶段,是的。

val df = spark.readStream.format("kafka") ... 
val strDf = df.select(cast('value).as("string")) ...
val df1 = strDf.filter(...)  # in "parallel"
val df2 = strDf.filter(...)  # in "parallel"

只有第一行应该创建Kafka消费者示例,而不是其他阶段,因为它们依赖于第一阶段的消费者记录。

相关问题