迭代流Dataframe中的列值,并使用scala和spark将每个值分配给公共列表

x9ybnkn6  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(363)

我有以下流Dataframe

+------------------------------------+
|______sentence______________________|
| Representative is a scientist      |
| Norman did a good job in the exam  |
| you want to go on shopping?        |
--------------------------------------

我的清单如下

val myList

作为最终输出,我需要mylist在streamdataframe中包含以上三句话
输出

myList = [Representative is a scientist, Norman did a good job in the exam, you want to go on shopping? ]

我尝试了下面给出的流错误

val myList =   sentenceDataframe.select("sentence").rdd.map(r => r(0)).collect.toList

使用上述方法引发错误
org.apache.spark.sql.analysisexception:具有流源的查询必须使用writestream.start()执行
请注意,上述方法适用于正常的Dataframe,但不适用于流Dataframe。
有没有一种方法可以迭代流Dataframe的每一行,并使用scala和spark将行值赋给公共列表?

kgsdhlau

kgsdhlau1#

这听起来像是一个非常奇怪的用例,因为流理论上永远不会结束。你确定你不是在寻找普通的sparkDataframe吗?
如果不是这样,你可以做的是使用蓄能器和Spark流Flume。我使用了一个简单的套接字连接来演示这一点。你可以用nc-lp3030在ubuntu下启动一个简单的socket服务器,然后将消息传递到流中,结果Dataframe的模式是[value:string]

val acc = spark.sparkContext.collectionAccumulator[String]

val stream = spark.readStream.format("socket").option("host", "localhost").option("port", "3030").load()

val query = stream.writeStream.foreachBatch((df: DataFrame, l: Long) => {
     df.collect.foreach(v => acc.add(v(0).asInstanceOf[String]))
  }).start()

...

// For some reason you are stopping the stream here
query.stop()
val myList = acc.value

现在你可能有一个问题,为什么我们使用累加器,而不仅仅是数组缓冲区。arraybuffers可以在本地工作,但在集群上,foreachbatch中的代码可能在完全不同的节点上执行。这意味着它不会产生任何影响,这也是蓄能器首先存在的原因(参见https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators)

相关问题