我想加入一些来自基于json的kafka源代码的事件,其中包含mongodb集合中相关数据的url字段。然后聚合它们,包括额外的mongodb数据,并将数据输出到gcs接收器。
当我运行结构化流式spark应用程序时,spark集群开始无限地填充磁盘空间。我已经将水印配置为0秒(因为我只聚合当前处理批中的事件),所以最大状态应该是1或2个批。但我有一个可用磁盘空间图(在关闭应用程序时保持稳定):
几乎所有填充硬盘的数据都位于: /hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id
如果我禁用mongodb连接,可用磁盘将随着时间的推移保持稳定,但我需要连接的数据。
我想加入的mongodb集合大约有11gb大,我输入的kafka主题大约有3k条记录/秒。
我的代码如下所示:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object Main {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("My awesome joined app")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
// Mongo config
.set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")
val session = SparkSession
.builder
.config(conf)
.getOrCreate()
val topic = "events"
val kafkaStream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", topic)
.option("startingOffsets" , "latest")
.option("maxOffsetsPerTrigger", 10000000)
.option("failOnDataLoss", false)
.option("kafka.max.partition.fetch.bytes", 10485760)
.option("kafka.receive.buffer.bytes", 16000000)
.load()
val urlsDf = MongoSpark.load(session).toDF
import session.implicits._
stream
.selectExpr("CAST (value AS STRING)", " CAST (timestamp AS STRING)").as[(String, String)]
.withColumn("name", json_tuple('value, "name"))
.withColumn("url", json_tuple('value, "url"))
.withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
.withColumn("b_name", when(col("name") === "b", 1).otherwise(0))
.withColumn("date", json_tuple('value, "date"))
// We don't care about reinjecting old data, fake watermarking
.withColumn("server_time", current_timestamp)
.withWatermark("server_time", "0 seconds")
.groupBy($"url", $"server_time")
.agg(
sum(s"a_name") as "a_name",
sum(s"b_name") as "b_name"
)
// Join with entities
.join(urlsDf, $"url" === $"_id", "left_outer")
.select(
"url",
"some_value_from_mongo", // from the joined stream
s"a_name",
s"b_name"
)
.coalesce(24)
.writeStream
.format("parquet")
.outputMode("append")
.option("path", "gs://my-custom-data")
.option("checkpointLocation", "/my-custom-data/checkpoints")
.trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes
.start
.awaitTermination
}
}
1条答案
按热度按时间fwzugrvs1#
将dynamicallocation设置为false解决了此问题。
为此,您可以在spark submit中设置以下conf:
似乎有一个问题,Yarn或Spark不删除不必要的状态之前,执行器存在。https://issues.apache.org/jira/browse/yarn-7070.
除此之外,加入11gb不断增长的mongo系列是一个糟糕的架构设计。随着集合的增加,需要增加spark执行器。在我的例子中,由于工人之间的大量洗牌,这也导致了巨大的gc时间。