加入mongo时spark结构化流媒体中的巨大hadoop appdata

0aydgbwb  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(348)

我想加入一些来自基于json的kafka源代码的事件,其中包含mongodb集合中相关数据的url字段。然后聚合它们,包括额外的mongodb数据,并将数据输出到gcs接收器。
当我运行结构化流式spark应用程序时,spark集群开始无限地填充磁盘空间。我已经将水印配置为0秒(因为我只聚合当前处理批中的事件),所以最大状态应该是1或2个批。但我有一个可用磁盘空间图(在关闭应用程序时保持稳定):

几乎所有填充硬盘的数据都位于: /hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id 如果我禁用mongodb连接,可用磁盘将随着时间的推移保持稳定,但我需要连接的数据。
我想加入的mongodb集合大约有11gb大,我输入的kafka主题大约有3k条记录/秒。
我的代码如下所示:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("My awesome joined app")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")

      // Mongo config
      .set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")

    val session = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val topic = "events"
    val kafkaStream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", topic)
      .option("startingOffsets" , "latest")
      .option("maxOffsetsPerTrigger", 10000000)
      .option("failOnDataLoss", false)
      .option("kafka.max.partition.fetch.bytes", 10485760)
      .option("kafka.receive.buffer.bytes", 16000000)
      .load()

    val urlsDf = MongoSpark.load(session).toDF

    import session.implicits._
    stream
      .selectExpr("CAST (value AS STRING)", " CAST (timestamp AS STRING)").as[(String, String)]
      .withColumn("name", json_tuple('value, "name"))
      .withColumn("url", json_tuple('value, "url"))

      .withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
      .withColumn("b_name", when(col("name") === "b", 1).otherwise(0))

      .withColumn("date", json_tuple('value, "date"))

      // We don't care about reinjecting old data, fake watermarking
      .withColumn("server_time", current_timestamp)
      .withWatermark("server_time", "0 seconds")

      .groupBy($"url", $"server_time")
      .agg(
        sum(s"a_name") as "a_name",
        sum(s"b_name") as "b_name"
      )

      // Join with entities
      .join(urlsDf, $"url" === $"_id", "left_outer")

      .select(
        "url",
        "some_value_from_mongo", // from the joined stream
        s"a_name",
        s"b_name"
      )

      .coalesce(24)
      .writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "gs://my-custom-data")
      .option("checkpointLocation", "/my-custom-data/checkpoints")
      .trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes

      .start
      .awaitTermination
  }
}
fwzugrvs

fwzugrvs1#

将dynamicallocation设置为false解决了此问题。
为此,您可以在spark submit中设置以下conf:

--conf "spark.dynamicAllocation.enabled=false" \

似乎有一个问题,Yarn或Spark不删除不必要的状态之前,执行器存在。https://issues.apache.org/jira/browse/yarn-7070.
除此之外,加入11gb不断增长的mongo系列是一个糟糕的架构设计。随着集合的增加,需要增加spark执行器。在我的例子中,由于工人之间的大量洗牌,这也导致了巨大的gc时间。

相关问题