动态更新spark kafka消费者的主题列表

oknrviil  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(577)

是否可以动态更新spark kafka consumer中的主题列表?
我有一个spark流应用程序,它使用spark kafka consumer。假设一开始我有spark kakfa消费者在听主题:[“test”],过了一会儿,我的主题列表更新为[“test”,“testnew”]。现在有没有一种方法可以更新spark kafka消费者主题列表,并要求spark kafka消费者在不停止sparkstreaming应用程序或sparkstreaming上下文的情况下使用更新后的主题列表的数据

tcomlyy6

tcomlyy61#

我建议尝试最新spark kafka integration(0.10)api版本的consumerstrategies.subscribepattern。
这看起来像:

KafkaUtils.createDirectStream(
mySparkStreamingContext,
PreferConsistent,
SubscribePattern("test.*".r.pattern, myKafkaParamsMap))
imzjd6km

imzjd6km2#

您可以使用基于线程的方法
1使用包含主题列表的任何数据结构定义缓存
2在缓存中添加元素的方法
三。你必须在a类和b类中,b有所有与Spark相关的逻辑
4类a是长时间运行的作业,从a调用b,只要有新的主题,就用b生成新的线程

6vl6ewon

6vl6ewon3#

是否可以动态更新spark kafka consumer中的主题列表
不可以。一旦使用初始化kafka流,接收器和无接收器方法都是固定的 KafkaUtils . 因为dag是固定的,所以你没有办法边走边传递新的主题。
如果您想动态地阅读,可以考虑一个batch k作业,它是迭代调度的,可以动态地阅读主题并创建一个 RDD 从中解脱出来。
另一个解决方案是使用一种技术,使您能够灵活地控制消费,例如akka streams。

q7solyqu

q7solyqu4#

正如尤瓦尔所说,这是不可能的,但是如果你知道你从Kafka那里处理的数据的结构/格式是什么,可能会有一个解决方法。
例如,
如果流应用程序正在侦听主题[“test”,“testnew”]
在您想要添加一个名为[test4]的新主题的那一行,作为一种解决方法,您只需向包含在其中的添加一个唯一键,并将其传递给现有主题。
设计流式应用程序时,要根据添加到test2数据中的键来识别/过滤数据

相关问题