PySpark Spark Structured Streaming job in Kubernetes fails with executor out-of-memory

ruarlubt posted 12 months ago in Spark

I'm running a Spark Structured Streaming job that reads from an object storage bucket, applies some transformations and filters, and then runs a groupBy with aggregations. The results are written back to object storage in CSV format.

maxFilesPerTrigger = 200
lookback_window = "24 hours"
sliding_interval = "1 hour"
watermark = "1 hour"
output_mode = "update"

df = spark\
    .readStream\
    .format(file_type)\
    .schema(schema)\
    .option("latestFirst", latest_first)\
    .option("maxFileAge", maxFileAge)\
    .option("header", header)\
    .option("timestampFormat", timestampFormat)\
    .option("maxFilesPerTrigger", maxFilesPerTrigger)\
    .load(f"{path}/*.{file_type}")

df = df.filter(...)                    # filtering
df = df.withColumn(...)                # ~20 derived feature columns
df = df.withWatermark(...)             # watermark on the event-time column
df = df.groupBy(...).agg(...)          # 22 features in the aggregation
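
For context, here is a minimal sketch of how the lookback_window, sliding_interval and watermark values above are typically combined into a sliding-window aggregation; the event-time column event_time, the key column x and the single count are placeholders for the real schema and the 22 aggregated features:

from pyspark.sql import functions as F

# Placeholder column names; the real job aggregates 22 features, not one count.
windowed = (
    df.withWatermark("event_time", watermark)          # tolerate 1 hour of late data
    .groupBy(
        F.window("event_time", lookback_window, sliding_interval),  # 24 h window, 1 h slide
        "x",
    )
    .agg(F.count("*").alias("events"))
)

With a 24-hour window sliding every hour, each incoming row belongs to 24 open windows, so the per-key state multiplies accordingly; that is usually the first thing to look at when the state store dominates executor memory.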

def foreach_function(agg_df, batch_no):
    global write_minio_path

    t0 = time.time()
    logger.info(f"Writing batch -{batch_no} to {write_minio_path}")
    agg_df.write.mode("append").parquet(write_minio_path)
    logger.info(f"Batch - {batch_no} Time taken  ::{time.time() - t0}")

logger.info("Calling Foreach Batch Function")
query = df.writeStream \
    .foreachBatch(foreach_function) \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode(output_mode) \
    .start()
query.awaitTermination()

The code above runs out of memory with 3 executors, each with 30 GB of executor memory and 5 GB of memory overhead. The driver has 10 GB of memory and 1 GB of overhead.
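
For reference, a minimal sketch of how that memory layout is typically expressed when building the session (the app name is a placeholder; on Kubernetes the same values can also be passed as --conf flags to spark-submit):

from pyspark.sql import SparkSession

# Memory settings as described above; only the app name is made up here.
spark = (
    SparkSession.builder
    .appName("streaming-aggregation")
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "30g")
    .config("spark.executor.memoryOverhead", "5g")
    .config("spark.driver.memory", "10g")
    .config("spark.driver.memoryOverhead", "1g")
    .getOrCreate()
)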

23/09/20 14:09:43 ERROR TaskSchedulerImpl: Lost executor 4 on IP: The executor with id 4 exited with exit code 52(JVM OOM).


The API gave the following container statuses:

         container name: spark-kubernetes-executor
         container image: spark:3.3.0
         container state: terminated
         container started at: 2023-09-20T14:01:46Z
         container finished at: 2023-09-20T14:09:41Z
         exit code: 52
         termination reason: Error

23/09/20 14:09:49 ERROR FileFormatWriter: Aborting job 757c5b44-f7c9-436a-874b-295370b1ab97. org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 87 (start at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException

We are using aws-sdk-java-1.11.0 and hadoop-aws-3.3.0. Could there be a thread leak in these jars that would cause the OOM?
We inspected an executor heap dump and the streaming state was the largest object in memory. How can we estimate the size of the state that has to be held in executor memory?
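
For reference, the stateOperators section of the streaming query's progress metrics reports how many rows and bytes the state store is holding, which gives a rough runtime answer; a minimal sketch, assuming the query handle started above (run it from a separate monitoring thread, since awaitTermination() blocks):

# Rough runtime view of the state-store size from the last micro-batch's metrics.
progress = query.lastProgress  # dict, or None if no batch has completed yet
if progress is not None:
    for op in progress.get("stateOperators", []):
        logger.info(
            f"state rows: {op.get('numRowsTotal')}, "
            f"state bytes: {op.get('memoryUsedBytes')}"
        )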
Please help me debug this.

taor4pac (answer 1)

In my experience:

  1. maxFilesPerTrigger is less predictable than maxBytesPerTrigger.
  2. Even with maxFilesPerTrigger, 200 files is likely too much for 3 executors. I aim for a micro-batch size (measured in files or bytes) of less than half of sum(executor_memory); in your case that would be <= 45 GB (3 x 30 GB / 2). If those 200 files add up to more than that, problems are likely.
An unrelated note: why use foreachBatch at all? In your case it can be simplified to something like the following (note that the file sink only supports the append output mode):
df.writeStream.start(
    path=write_minio_path,
    outputMode="append",
    format="parquet",
    checkpointLocation=checkpoint_path,
).awaitTermination()
