从spark中的结构化流Dataframe推断安全模式

csbfibhn  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(199)

我想从来自kafka的json数据推断出一个安全的模式。

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "input").option("auto.offset.reset", "latest").load()
jsonDF = df.selectExpr("CAST(value AS STRING) jsonData")

stackoverflow有几种解决方案:
link说,要将一个小批量保存到一个文件中,请推断模式,然后将该模式用于流式Dataframe。尽管这个问题已经有一年了。
链接使用 schema_of_json 以及 lit ,尽管我无法让它与流式dfs一起工作。
我知道推断模式可能很危险,但是我正在开发的spark应用程序有多个具有不同模式的源(多列)。有没有一种方法可以基于json数据中的列创建一个模式并强制转换它们 String 以防止数据丢失。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题