我们有一个用例,我们尝试使用与Schema注册表集成的多个Kafka主题(AVRO消息)。我们使用Spark结构化流(Spark版本:2.4.4)、《汇合的Kafka》(文库版:5.4.1)备变:
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
.option("subscribe", [List of Topics]) // multi topic subscribe
.load()
我们从上述DF中选择值,并使用Schema对AVRO消息进行反序列化
val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("*")
这里的障碍是,因为我们使用了多个Kafka主题,我们已经将所有JSON模式集成到MAP中,key是主题名称,values是各自的模式。我们如何在上面的select中使用相同的Map进行查找?我们尝试了UDF,但它返回了“col”类型,但jsonSchema必须是String类型。此外,不同主题的架构也不同
几个附加问题:
1.上述方法对于同时使用多个主题是否正确?
1.我们应该为每个主题使用一个消费者吗?
1.如果我们有更多的主题,我们如何才能实现并行主题处理,因为顺序可能需要大量的时间。
1条答案
按热度按时间xwbd5t1u1#
如果不检查所有内容,您看起来对from_avro等from_json等的基础知识还可以。
1.我会做table。
1.多个,多个模式--〉从.avsc* 读取多个这样的版本 * 或自己编码。
1.在Spark Streaming应用程序中使用多个主题的问题是每个应用程序使用多少个主题?没有硬性规则,除了一些明显的规则,如大消费与小消费,以及顺序是否重要。执行器资源可以放弃。
1.然后,您需要分别处理所有的主题,就像下面这样imho -
Filter on Topic
-您可以填写详细信息,因为我有点赶时间-使用foreachBatch
范例。1.不确定在静止状态下如何写出数据,但问题不在于此。
与此类似,再处理多个Topics:
但与此完美结合:Integrating Spark Structured Streaming with the Confluent Schema Registry