下面的代码说明了这个问题。
import pyspark.sql.functions as F
import time
from datetime import datetime, timedelta
from functools import reduce
from operator import add
from pyspark.sql import SparkSession
master = "local"
executor_memory = "4g"
driver_memory = "4g"
spark = SparkSession.builder.config("spark.master", master)\
.config("spark.executor.memory", executor_memory)\
.config("spark.driver.memory", driver_memory)\
.getOrCreate()
NCOLS = 30
col_names = [f"{i}" for i in range(NCOLS)]
col_vals = [F.lit(f"{i}").alias(f"{i}") for i in range(NCOLS)]
df = spark.createDataFrame([('id',)], schema='id STRING')
df_data = df.select(["*"] + col_vals)
st=time.time()
df_final = df_data.withColumn("row_avg", reduce(add, [F.col(x) for x in col_names]) / NCOLS)
df_final.count()
print(f"processing time is {(time.time()-st)/60}")
这段代码在pyspark 3.4.0和3.4.1上大约需要4 mts(而在3.2.4到3.3.3以及3.5.0上运行得很快。有人能建议这是一个缺陷还是行为上的一些变化吗?如果NCOLS小于25,代码运行得很快。当NCOLS超过25时,处理时间会变得更糟。
我已经检查了pyspark 3.2.4,3.3.3上的行为,代码运行良好,这表明pyspark 3.4.0中的行为可能会有一些变化。也尝试使用sum()重写,仍然看到相同的行为。
1条答案
按热度按时间ztyzrc3y1#
我可以肯定的是,在PySpark 3.4中,这个脚本需要大约4分钟,而在PySpark 3.5中只需几秒钟
这个看起来像个虫子
我用PySpark 3.4运行了这个脚本。
看起来是
CodeGenerator
花了4分钟(从07:34:50到07:38:52)我将日志级别增加到“trace”,以了解在这段时间内发生了什么,这里是丢失的3分钟(从07:44:34到07:47:43):
2023-10-18 07:44:34 TRACE HeartbeatReceiver:68 - Checking for hosts with no recent heartbeats in HeartbeatReceiver.在HeartbeatReceiver中检查最近没有心跳的主机。
2023-10-18 07:47:43 TRACE BaseSessionStateBuilder$$anon$1:68 -在1次迭代后达到批量UpdateNullability的固定点。