Spark Structured Streaming与架构注册表集成,用于基于Avro的消息

tnkciper  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(99)

我们有一个用例,我们尝试使用与Schema注册表集成的多个Kafka主题(AVRO消息)。我们使用Spark结构化流(Spark版本:2.4.4)、《汇合的Kafka》(文库版:5.4.1)备变:

val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
  .option("subscribe", [List of Topics]) // multi topic subscribe
  .load()

我们从上述DF中选择值,并使用Schema对AVRO消息进行反序列化

val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("*")

这里的障碍是,因为我们使用了多个Kafka主题,我们已经将所有JSON模式集成到MAP中,key主题名称values各自的模式我们如何在上面的select中使用相同的Map进行查找?我们尝试了UDF,但它返回了“col”类型,但jsonSchema必须是String类型。此外,不同主题的架构也不同
几个附加问题:
1.上述方法对于同时使用多个主题是否正确?
1.我们应该为每个主题使用一个消费者吗?
1.如果我们有更多的主题,我们如何才能实现并行主题处理,因为顺序可能需要大量的时间。

xwbd5t1u

xwbd5t1u1#

如果不检查所有内容,您看起来对from_avro等from_json等的基础知识还可以。

  1. https://sparkbyexamples.com/spark/spark-streaming-consume-and-produce-kafka-messages-in-avro-format/可以指导你的第一部分。这也很好https://www.waitingforcode.com/apache-spark-structured-streaming/two-topics-two-schemas-one-subscription-apache-spark-structured-streaming/read#filter_topic。
    1.我会做table。
    1.多个,多个模式--〉从.avsc* 读取多个这样的版本 * 或自己编码。
    1.在Spark Streaming应用程序中使用多个主题的问题是每个应用程序使用多少个主题?没有硬性规则,除了一些明显的规则,如大消费与小消费,以及顺序是否重要。执行器资源可以放弃。
    1.然后,您需要分别处理所有的主题,就像下面这样imho -Filter on Topic-您可以填写详细信息,因为我有点赶时间-使用foreachBatch范例。
    1.不确定在静止状态下如何写出数据,但问题不在于此。
    与此类似,再处理多个Topics:
...
... // Need to get Topic first
stream.toDS()
      .select($"topic", $"value")
      .writeStream
      .foreachBatch((dataset, _) => {
         dataset.persist() // need this

         dataset.filter($"topic" === topicA)
                .select(from_avro(col("value"), jsonSchemaTA)
                .as("tA"))
                .select("tA.*")
                .write.format(...).save(...)

         dataset.filter($"topic" === topicB)
                .select(from_avro(col("value"), jsonSchemaTB)
                .as("tB"))
                .select("tB.*")
                .write.format(...).save(...)
          ...
          ...
         dataset.unpersist()
         ...
         })

         .start().awaitTermination()

但与此完美结合:Integrating Spark Structured Streaming with the Confluent Schema Registry

相关问题