使用spark 2.3 structed streaming和kafka作为输入流。我的集群是建立在主人和3工人(master在一台worker机器上运行)我的kafka主题有3个分区作为worker的数量。我使用默认触发器和foreach sink来处理数据。
当第一条消息到达驱动程序时,它立即开始处理其中一个可用工作节点上的数据,在处理时,第二条消息到达,而不是立即开始在可用工作节点上处理它,处理的“执行”被延迟,直到第一个工作节点结束处理,现在所有的“等待执行”开始在所有可用的worker上并行处理(假设我有3条等待消息)
我怎样才能强迫等待的工人立即开始执行?
我的代码片段:
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
override def open(partitionId: Long, version: Long) = true
override def process(filePath: String) = {
val filesSeq = fileHandler
.handleData(filePath) // long processing
}
override def close(errorOrNull: Throwable) = {}
}
val filesDf = kafkaStreamSubscriber
.buildtream(conf, kafkaInputTopic)
val ds = filesDf.map(x=>x.getAs("filePath").asInstanceOf[String])
val query =
ds.writeStream
.foreach(writer)
.start
ds.writeStream
.format("console")
.option("truncate", "false")
.start()
println("lets go....")
query.awaitTermination()
我做错什么了?当我有等待处理的数据时,我不希望有空闲的工人
桑克斯
1条答案
按热度按时间t30tvxxf1#
请参阅spark structured streaming triggers文档部分
据我所知,默认触发器一次处理一个微批。如果您需要在数据到达时立即处理数据,我建议您考虑实验连续模式。
我的理解是,如果您使用trigger,比如说5秒,微批处理将读取所有3个分区的消息,您将有3个任务同时运行。在它们全部完成之前,不会有微批开始。
希望有帮助!