我有一个包含twitter数据的json文件集合,我想用它作为databricks/spark中结构化流的数据源。json文件具有以下结构:
[{...tweet data...},{...tweet data...},{...tweet data...},...]
我的Pypark代码:
# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"
# Set up the folder as a streaming source
streamingInputDF = (
spark \
.readStream \
.schema(json_schema) \
.json(tweetstore)
)
# Check
streamingInputDF.isStreaming
# Access the DF using SQL
streamingQuery = streamingInputDF \
.select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")\
.writeStream \
.format("memory") \
.queryName("tweetstream") \
.outputMode("append")\
.start()
streamingDF = spark.sql("select * from tweetstream order by 1 desc")
我的输出如下所示:
Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null |null|null|null |null |null |
|null |null|null|null |null |null |
|null |null|null|null |null |null |
据我所知,我可能需要 UDF
或者 explode()
正确地解析json数组,但到目前为止还不太清楚是如何解析的。
2条答案
按热度按时间zed5wv101#
为其他可能偶然发现这个问题的人记录答案:我意识到json并不像spark期望的那样每行有一个对象。关键是
.option("multiline", True)
,即:alen0pnh2#
它在样本数据上对我来说很有效-