结构化流媒体

y4ekin9u  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(492)

我在k8s操作符上部署了一个结构化的流式处理作业,它简单地读取kafka,反序列化,添加2列,并将结果存储在datalake中(尝试delta和parquet),几天后执行器增加了内存,最终我得到了oom。输入记录的kbs非常低。p、 我使用完全相同的代码,但是cassandra作为一个Flume,运行了将近一个月,没有任何问题。有什么想法吗?
在此处输入图像描述
在此处输入图像描述
我的代码

spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", MetisStreamsConfig.bootstrapServers)
    .option("subscribe", MetisStreamsConfig.topics.head)
    .option("startingOffsets", startingOffsets)
    .option("maxOffsetsPerTrigger", MetisStreamsConfig.maxOffsetsPerTrigger)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    .withColumn("payload", from_json($"value", schema))

    // selection + filtering
    .select("payload.*")
    .select($"vesselQuantity.qid" as "qid", $"vesselQuantity.vesselId" as "vessel_id", explode($"measurements"))
    .select($"qid", $"vessel_id", $"col.*")
    .filter($"timestamp".isNotNull)
    .filter($"qid".isNotNull and !($"qid"===""))
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("mapping", MappingUDF($"qid"))
  writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      log.info(s"Storing batch with id: `$batchId`")
      val calendarInstance = Calendar.getInstance()

      val year = calendarInstance.get(Calendar.YEAR)
      val month = calendarInstance.get(Calendar.MONTH) + 1
      val day = calendarInstance.get(Calendar.DAY_OF_MONTH)
      batchDF.write
        .mode("append")
        .parquet(streamOutputDir + s"/$year/$month/$day")
    }
    .option("checkpointLocation", checkpointDir)
    .start()

我改为foreachbatch,因为使用delta或parquet和partitionby cause问题更快

6tqwzwtp

6tqwzwtp1#

spark 3.1.0中解决了一个bug。
看到了吗https://github.com/apache/spark/pull/28904
克服问题的其他方法&调试的功劳:
https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read
即使您使用foreachbatch。。。

dfddblmv

dfddblmv2#

对于一些结构化的流式spark2.4.4应用程序,我也遇到了同样的问题,这些应用程序使用 partitionBy .
似乎与容器中的jvm内存分配有关,如这里详细解释的:https://merikan.com/2019/04/jvm-in-a-container/
我的解决方案(但取决于您的jvm版本)是在 yaml 我的spark应用程序的定义:

spec:
    javaOptions: >-
        -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap

这样,我的streamin应用程序运行正常,内存量正常(驱动程序1gb,执行程序2gb)
编辑:虽然第一个问题似乎已经解决了(控制器杀死pods以消耗内存),但非堆内存大小增长缓慢仍然是一个问题;几个小时后,司机/执行人被杀。。。

相关问题