我正在实现一个Spark Structured Streaming作业,在该作业中,我正在使用来自Kafka的JSON格式的消息。
由于json数据是动态的,所以我没有模式信息可以在from_json函数中用于将json数据加载到spark Dataframe
下面的代码是我用来阅读Kafka主题的数据
df = spark_session.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "...") \
.option("subscribe", "..") \
.option("startingTimestamp", "...").load()
如何使用下面的from_json函数,或者是否有任何其他函数可用于将json数据加载到没有预定义模式的 Dataframe 中
df = df.select(from_json(col("value").cast("string")).alias("parsed_value"))
1条答案
按热度按时间2g32fytz1#
可以使用
get_json_object(column, '<json path>')
解析字符串类型的json列