I am running a Spark Structured Streaming job that reads from an object-storage bucket, applies some transformations and filters, then performs a groupBy with aggregations, and writes the output to object storage in CSV format.
maxFilesPerTrigger = 200
lookback_window = "24 hours"
sliding_interval = "1 hour"
watermark = "1 hour"
output_mode = "update"
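Presumably the lookback_window and sliding_interval parameters feed a sliding window() aggregation later in the job. A minimal sketch of how they are typically wired together (the event_time column name is an assumption, not from the original code):

```python
from pyspark.sql.functions import window, col

# Hypothetical wiring of the parameters above. With a 24-hour window and a
# 1-hour slide, each incoming event belongs to 24 overlapping windows, so the
# streaming state store keeps roughly 24 rows per event-key until the
# watermark lets windows expire - this multiplies executor memory pressure.
windowed = (df
    .withWatermark("event_time", watermark)              # "1 hour"
    .groupBy(window(col("event_time"),
                    lookback_window,                     # "24 hours"
                    sliding_interval))                   # "1 hour"
    .agg(...))                                           # aggregations elided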
df = spark \
    .readStream \
    .format(file_type) \
    .schema(schema) \
    .option("latestFirst", latest_first) \
    .option("maxFileAge", maxFileAge) \
    .option("header", header) \
    .option("timestampFormat", timestampFormat) \
    .option("maxFilesPerTrigger", maxFilesPerTrigger) \
    .load(f"{path}/*.{file_type}")
df = df.filter(...)              # filter condition elided in the original
df = df.withColumn(...)          # repeated for some 20-odd derived feature columns
df = df.withWatermark(...)       # watermark = "1 hour" on the event-time column
df = df.groupBy(x).agg(...)      # 22 features in the aggregation
def foreach_function(agg_df, batch_no):
    global write_minio_path
    t0 = time.time()
    logger.info(f"Writing batch - {batch_no} to {write_minio_path}")
    agg_df.write.mode("append").parquet(write_minio_path)
    logger.info(f"Batch - {batch_no} time taken :: {time.time() - t0}")
logger.info("Calling Foreach Batch Function")
query = df.writeStream \
    .foreachBatch(foreach_function) \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode(output_mode) \
    .start()
query.awaitTermination()  # awaitTermination() returns None, so don't assign its result to query
The code above runs out of memory. There are 3 executors with 30 GB of memory and 5 GB of memory overhead each; the driver has 10 GB of memory and 1 GB of overhead.
23/09/20 14:09:43 ERROR TaskSchedulerImpl: Lost executor 4 on IP: The executor with id 4 exited with exit code 52(JVM OOM).
The API gave the following container statuses:
container name: spark-kubernetes-executor
container image: spark:3.3.0
container state: terminated
container started at: 2023-09-20T14:01:46Z
container finished at: 2023-09-20T14:09:41Z
exit code: 52
termination reason: Error
23/09/20 14:09:49 ERROR FileFormatWriter: Aborting job 757c5b44-f7c9-436a-874b-295370b1ab97. org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 87 (start at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException
We are using aws-sdk-java-1.11.0 and hadoop-aws-3.3.0. Is there any known thread leak in these jars that could cause the OOM?
We examined an executor heap dump, and the streaming state was the largest object in memory. How can I calculate the size of the state that has to be held in executor memory?
Please help me debug this.
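One way to measure (rather than hand-estimate) the state size: every StreamingQueryProgress that Spark emits contains a stateOperators section with per-operator metrics, including numRowsTotal and memoryUsedBytes. A small helper that sums them from query.lastProgress (a sketch; the dict shape follows Spark's documented progress JSON):

```python
def total_state_memory(progress: dict) -> tuple:
    """Sum state rows and bytes across all stateful operators in a
    StreamingQueryProgress dict (query.lastProgress in PySpark)."""
    ops = progress.get("stateOperators", [])
    rows = sum(op.get("numRowsTotal", 0) for op in ops)
    used = sum(op.get("memoryUsedBytes", 0) for op in ops)
    return rows, used

# Example with a payload of the documented shape (numbers are made up):
sample = {
    "stateOperators": [
        {"operatorName": "stateStoreSave",
         "numRowsTotal": 1_200_000,
         "memoryUsedBytes": 3_500_000_000},
    ]
}
rows, used = total_state_memory(sample)  # 1200000 rows, 3.5e9 bytes
```

As a rough upper bound for hand-estimation: rows in state ≈ distinct grouping keys × number of windows each event falls into (window duration / slide, here 24), with each row holding the grouping key plus the 22 aggregation buffers, until the watermark lets windows expire.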
1 Answer
From my experience, maxFilesPerTrigger is less predictable than maxBytesPerTrigger.
1. Even with maxFilesPerTrigger, 200 files is too many for 3 executors. I aim for a micro-batch size (whether file- or byte-based) below half of sum(executor_memory); in your case that would be <= 45 GB (3 × 30 GB / 2), and 200 files may well exceed it.
An unrelated note: why use foreachBatch at all? In your case it could be simplified to something like
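The answer trails off here; presumably it was going to show the direct file-sink form, roughly like the sketch below. One caveat worth noting: Spark's file sink supports only append output mode, so dropping foreachBatch also means switching outputMode from "update" to "append" (results for a window are then emitted only once the watermark finalizes it). This is a sketch against the variables defined in the question, not the answerer's exact code:

```python
# Direct parquet file sink instead of foreachBatch; requires append mode.
query = (df.writeStream
    .format("parquet")
    .option("path", write_minio_path)
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")          # file sink supports append only
    .start())
query.awaitTermination()
```

The practical trade-off: foreachBatch allows update mode and arbitrary batch-side logic, while the direct sink is simpler and lets Spark manage exactly-once file commits itself.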