我使用的代码需要流Dataframe,但我的源df是非流的
流Dataframe可以通过这里描述的方法创建。主要的方法是使用sparksession.read…(path),但是我想从现有的json或非流df对象创建流df。
我有一个http响应,我将其转换为具有以下内容的df: val df = spark.read.json(Seq(response.body).toDS)
(spark.readstream不存在等效项)
黑客的解决方案是将json保存为一个文件,然后使用 SparkSession.readStream.json(path)
,但我想知道是否有更优雅的解决方案。理想的情况是 val = spark.readStream.df(df)
或者 df.convertToStreaming()
暂无答案!
目前还没有任何答案,快来回答吧!