使用foreachwriter打印行

a0x5cqrl  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(395)

我正在使用apachespark(scala)读取来自kafka主题的传入数据流。我想打印邮件中的每一行。我想用 ForeachWriter 我的代码看起来像:

DF.writeStream.foreach(new ForeachWriter[Row] {

override def process(value: Row): Unit = {
  println(s"Processing ${value}")
  println(value.toString())
}
override def open(partitionId: Long, epochId: Long): Boolean = {true}

override def close(errorOrNull: Throwable): Unit = {}
}
).start()

但是我在控制台上没有得到任何输出。请帮忙。

xhv8bpkk

xhv8bpkk1#

有两种方法可以达到你想要的结果。
使用foreachwriter,您所做的一切都是正确的,但最终没有调用awaittermination()方法。
使用foreachbatch
代码:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val kafkaDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "mytopic")
        .option("startingOffsets", "latest") 
        .load().select('value.cast("string"))

 // Any one approach can be used at a time

 // 1. using ForeachWriter

   kafkaDF.writeStream.foreach(new ForeachWriter[Row] {
    override def process(value: Row): Unit = println(s"Processing ${value}")
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def close(errorOrNull: Throwable): Unit = {}
}
).start().awaitTermination()

// 2. using foreachBatch
kafkaDF.writeStream.foreachBatch((ds, l) => {
    ds.foreach(println(_))
}).start().awaitTermination()

相关问题