我正在spark structured streaming中处理kafka json流。作为微批处理,我可以使用流式Dataframe的累加器吗?
LongAccumulator longAccum = new LongAccumulator("my accum");
Dataset<Row> df2 = df.filter(output.col("Called number").equalTo("0860"))
.groupBy("Calling number").count();
// put row counter to accumulator for example
df2.javaRDD().foreach(row -> {longAccumulator.add(1);})
投掷
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
. 我也很困惑这样使用蓄能器。将Dataframe向下转换为rdd看起来很奇怪,而且不必要。我可以不用c rdd和foreach()吗?
根据exeption,我从源dataframe中删除了foreach,并在writestream()中完成了它
StreamingQuery ds = df2
.writeStream().foreachBatch( (rowDataset, aLong) -> {
longAccum.add(1);
log.info("accum : " + longAccum.value());
})
.outputMode("complete")
.format("console").start();
它正在工作,但我在日志中没有值,在gui中也看不到累加器。
1条答案
按热度按时间mm5n2pyu1#
不,您可以使用下面的数据集直接访问-
请注意,累加器值仅在
action
执行。使用流式Dataframe