如何在spark streaming中解析动态json格式的Kafka消息

ecbunoof  于 2023-01-13  发布在  Apache
关注(0)|答案(1)|浏览(148)

我正在实现一个Spark Structured Streaming作业,在该作业中,我正在使用来自Kafka的JSON格式的消息。
由于json数据是动态的,所以我没有模式信息可以在from_json函数中用于将json数据加载到spark Dataframe
下面的代码是我用来阅读Kafka主题的数据

df = spark_session.readStream.format("kafka") \
      .option("kafka.bootstrap.servers", "...") \
      .option("subscribe", "..") \
      .option("startingTimestamp", "...").load()

如何使用下面的from_json函数,或者是否有任何其他函数可用于将json数据加载到没有预定义模式的 Dataframe 中

df = df.select(from_json(col("value").cast("string")).alias("parsed_value"))
2g32fytz

2g32fytz1#

可以使用get_json_object(column, '<json path>')解析字符串类型的json列

相关问题