假设我们有一个20行的SparkDataFrame。我在执行一些昂贵的计算的每一行上应用了一个pyspark UDF。
def expensive_python_function(df, a, b) -> pd.DataFrame:
return ...
def create_udf(a: Broadcast, b: Broadcast, func: Broadcast) -> Callable:
def my_udf(df: pd.DataFrame) -> pd.DataFrame:
result = func.value(df, a.value, b.value)
result["timestamp"] = datetime.datetime.now()
return result
return my_udf
broadcast_func = sparkContext.broadcast(expensive_python_function)
broadcast_a = sparkContext.broadcast(a)
broadcast_b = sparkContext.broadcast(b)
result = sdf.groupby(*groups).applyInPandas(
create_udf(broadcast_a, broadcast_b, broadcast_func),
schema=schema
)
result.show()
为了说明这一点,Groupby中的每个唯一组都将产生一行的 Dataframe 。
变量a和b由每个执行者使用,并且对所有执行者都是相同的。我正在使用broadcast_a.value
访问my_udf
中的变量。
问题
此操作产生2个分区,因此产生2个任务。这两个任务在单个(相同)执行器上执行。显然这不是我想要的,我想让每个任务在一个独立的执行器上并行运行。
我尝试了什么
我将 Dataframe 重新分区为20个分区,并将其持久存储在内存中。
sdf = sdf.repartition(20).persist()
result = sdf.groupby(*groups).applyInPandas(
create_udf(broadcast_a, broadcast_b, broadcast_func),
schema=schema
)
result.show()
这确实给了我20个分区和20个要完成的任务。然而,在10个执行者中,只有1个仍然处于活动状态。
我试过:
- 将spak.ecutor.cores显式设置为1
- 将stark.sql.Shuffle.Partitions设置为20
我还注意到每个执行器都包含RDD块,这也让我困惑吗?
问题
在我看来,Spark驱动程序正在为我决定,所有作业都可以在一个执行器上运行,从大数据的Angular 来看,这是有意义的。我意识到Spark并不是针对我的用例而设计的,我正在测试与使用像pythonmultiprocessing
这样的东西相比,我是否可以实现加速,以及可以实现什么样的加速。
是否可以强制每个任务在单独的执行器上运行,而不管数据的大小或任务的性质?
我使用的是Python3.9和Spark 3.2.1
1条答案
按热度按时间rqqzpn5f1#
因此,解决方案在于不使用DataFrame API。与RDD合作似乎给了你更多的控制权。
它将返回一个RDD,因此您需要转换为DataFrame。或者使用
collect()
收集以获得结果。当然,我也设置了
spark.default.parallelism = str(len(params))
和spark.executor.instances = str(len(params))
。我认为并行度设置不应该是必需的,因为基本上您在spark.parallelize
中也通过了该设置。希望它能帮助一些人!