pyspark 3.4.0按行减少耗时太长

bfnvny8b  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(139)

下面的代码说明了这个问题。

import pyspark.sql.functions as F
import time
from datetime import datetime, timedelta
from functools import reduce
from operator import add
from pyspark.sql import SparkSession

master = "local"
executor_memory = "4g"
driver_memory = "4g"

spark = SparkSession.builder.config("spark.master", master)\
    .config("spark.executor.memory", executor_memory)\
    .config("spark.driver.memory", driver_memory)\
    .getOrCreate()

NCOLS = 30

col_names = [f"{i}" for i in range(NCOLS)]
col_vals = [F.lit(f"{i}").alias(f"{i}") for i in range(NCOLS)]
df = spark.createDataFrame([('id',)], schema='id STRING')
df_data = df.select(["*"] + col_vals)

st=time.time()
df_final = df_data.withColumn("row_avg", reduce(add, [F.col(x) for x in col_names]) / NCOLS)
df_final.count()
print(f"processing time is {(time.time()-st)/60}")

这段代码在pyspark 3.4.0和3.4.1上大约需要4 mts(而在3.2.4到3.3.3以及3.5.0上运行得很快。有人能建议这是一个缺陷还是行为上的一些变化吗?如果NCOLS小于25,代码运行得很快。当NCOLS超过25时,处理时间会变得更糟。
我已经检查了pyspark 3.2.4,3.3.3上的行为,代码运行良好,这表明pyspark 3.4.0中的行为可能会有一些变化。也尝试使用sum()重写,仍然看到相同的行为。

ztyzrc3y

ztyzrc3y1#

我可以肯定的是,在PySpark 3.4中,这个脚本需要大约4分钟,而在PySpark 3.5中只需几秒钟
这个看起来像个虫子
我用PySpark 3.4运行了这个脚本。

spark-submit yourScript.py

看起来是CodeGenerator花了4分钟(从07:34:50到07:38:52)

2023-10-18 07:34:50 INFO  ContextHandler:921 - Started o.s.j.s.ServletContextHandler@ec69003{/static/sql,null,AVAILABLE,@Spark}
2023-10-18 07:38:52 INFO  CodeGenerator:60 - Code generated in 767.824343 ms

我将日志级别增加到“trace”,以了解在这段时间内发生了什么,这里是丢失的3分钟(从07:44:34到07:47:43):
2023-10-18 07:44:34 TRACE HeartbeatReceiver:68 - Checking for hosts with no recent heartbeats in HeartbeatReceiver.在HeartbeatReceiver中检查最近没有心跳的主机。
2023-10-18 07:47:43 TRACE BaseSessionStateBuilder$$anon$1:68 -在1次迭代后达到批量UpdateNullability的固定点。

相关问题