如何从kafka中减少sparkDataframe并收集结果?

kyxcudwk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(365)

我有一个从Kafka流创建的Dataframe。我想把它减少到一个值,然后在我的程序中使用这个值。

```scala
import sparkSession.implicits._
val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("subscribe", "theTopic")
  .load()

val result = df
  .selectExpr("CAST(value AS STRING) as json")
  .map(json => getAnInt(json))
  .reduce { (x, y) =>
    if (x > y) x else y
  }

 someOtherFunction(result)
我希望将流简化为一个值,然后在我的程序的其余部分中使用。相反,它失败了:
org.apache.spark.sql.analysisexception:具有流源的查询必须使用writestream.start()执行;;kafka位于org.apache.spark.sql.catalyst.analysis.unsupportedoperationchecker$.throwerror(unsupportedoperationchecker)。scala:389)在org.apache.spark.sql.catalyst.analysis.u。。。
qgelzfjb

qgelzfjb1#

你只能用 writeStream 在流Dataframe上。我不确定你是否想要这个流Dataframe。如果你移除 readStream 使用 read 相反,你可以解决这个问题!

相关问题