如何将json对象数组解析为sparkDataframe?

dm7nw8vv  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(554)

我有一个包含twitter数据的json文件集合,我想用它作为databricks/spark中结构化流的数据源。json文件具有以下结构:

[{...tweet data...},{...tweet data...},{...tweet data...},...]

我的Pypark代码:


# Stream from the /tmp/tweets folder

tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source

streamingInputDF = (
  spark \
    .readStream \
    .schema(json_schema) \
    .json(tweetstore)
)

# Check

streamingInputDF.isStreaming

# Access the DF using SQL

streamingQuery = streamingInputDF \
  .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")\
  .writeStream \
  .format("memory") \
  .queryName("tweetstream") \
  .outputMode("append")\
  .start()

streamingDF = spark.sql("select * from tweetstream order by 1 desc")

我的输出如下所示:

Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id  |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |

据我所知,我可能需要 UDF 或者 explode() 正确地解析json数组,但到目前为止还不太清楚是如何解析的。

zed5wv10

zed5wv101#

为其他可能偶然发现这个问题的人记录答案:我意识到json并不像spark期望的那样每行有一个对象。关键是 .option("multiline", True) ,即:

streamingInputDF = (
  spark \
    .readStream \
    .option("multiline", True) \
    .schema(json_schema) \
    .json(tweetstore)
)
alen0pnh

alen0pnh2#

它在样本数据上对我来说很有效-

val data = """[{"id":1,"name":"abc1"},{"id":2,"name":"abc2"},{"id":3,"name":"abc3"}]"""
    val df = spark.read.json(Seq(data).toDS())
    df.show(false)
    df.printSchema()

    /**
      * +---+----+
      * |id |name|
      * +---+----+
      * |1  |abc1|
      * |2  |abc2|
      * |3  |abc3|
      * +---+----+
      *
      * root
      * |-- id: long (nullable = true)
      * |-- name: string (nullable = true)
      */

相关问题