Structured Streaming: saving JSON to HDFS

jjjwad0x  posted on 2022-12-09  in HDFS

My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. I can save the JSON to HDFS, but each record is wrapped under a "jsontostructs(CAST(value AS STRING))" key, as below:

{"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}

How can I save only {"age":42,"name":"John"}?



StructType schema = kafkaPrimerRow.schema();

// Read JSON from Kafka. JSON is: {"age":42,"name":"John"}
Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", input_bootstrap_server)
        .option("subscribe", topics[0])
        .load();

// Save stream to HDFS
StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();

9o685dep  #1

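The .select("data.*") call below does this: alias the struct returned by from_json as "data", then expand its fields into top-level columns.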

StreamingQuery ds = df
        .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
        .select("data.*")
        .writeStream()
        .format("json")
        .outputMode(OutputMode.Append())
        .option("path", destPath)
        .option("checkpointLocation", checkpoint)
        .start();
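
To see why the wrapper key shows up: the JSON sink writes one object per row, keyed by column name, and an unaliased from_json(...) expression produces a single struct column whose auto-generated name is the expression text. The following is only a plain-Java model of that behaviour, with no Spark involved; the class StructExpansionSketch and its methods are hypothetical names made up for illustration, and the toy serializer does not escape strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model (NOT Spark code) of a row whose only column is a struct.
// All names here are hypothetical and exist only for this illustration.
public class StructExpansionSketch {

    // Serialize a row the way a JSON sink would: one key per column name.
    // Toy serializer: no string escaping.
    public static String toJson(Map<String, Object> row) {
        return row.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":" + valueToJson(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    @SuppressWarnings("unchecked")
    private static String valueToJson(Object value) {
        if (value instanceof Map) {
            return toJson((Map<String, Object>) value); // nested struct
        }
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        return String.valueOf(value); // numbers, booleans
    }

    // Model of select("data.*"): promote the fields of one struct column
    // to top-level columns.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> expandStruct(Map<String, Object> row, String column) {
        return new LinkedHashMap<>((Map<String, Object>) row.get(column));
    }

    // Build a row with a single struct column holding the parsed record.
    public static Map<String, Object> singleStructRow(String columnName) {
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("age", 42);
        parsed.put("name", "John");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(columnName, parsed);
        return row;
    }

    public static void main(String[] args) {
        // Without an alias, the column name is the expression text.
        Map<String, Object> unaliased = singleStructRow("jsontostructs(CAST(value AS STRING))");
        System.out.println(toJson(unaliased));

        // With .as("data") followed by select("data.*"), the fields become the columns.
        Map<String, Object> aliased = singleStructRow("data");
        System.out.println(toJson(expandStruct(aliased, "data")));
    }
}
```

The first printed line reproduces the wrapped record from the question; the second is the flat {"age":42,"name":"John"} that the select("data.*") version writes.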
