Spark:将每个任务强加给单独的执行器

1l5u6lss  于 2022-10-07  发布在  Spark
关注(0)|答案(1)|浏览(106)

假设我们有一个20行的SparkDataFrame。我在执行一些昂贵的计算的每一行上应用了一个pyspark UDF。

def expensive_python_function(df, a, b) -> pd.DataFrame:
    return ...

def create_udf(a: Broadcast, b: Broadcast, func: Broadcast) -> Callable:
    def my_udf(df: pd.DataFrame) -> pd.DataFrame:
        result = func.value(df, a.value, b.value)
        result["timestamp"] = datetime.datetime.now()
        return result
    return my_udf

broadcast_func = sparkContext.broadcast(expensive_python_function)
broadcast_a = sparkContext.broadcast(a)
broadcast_b = sparkContext.broadcast(b)

result = sdf.groupby(*groups).applyInPandas(
      create_udf(broadcast_a, broadcast_b, broadcast_func), 
      schema=schema
)

result.show()

为了说明这一点,Groupby中的每个唯一组都将产生一行的 Dataframe 。

变量a和b由每个执行者使用,并且对所有执行者都是相同的。我正在使用broadcast_a.value访问my_udf中的变量。

问题

此操作产生2个分区,因此产生2个任务。这两个任务在单个(相同)执行器上执行。显然这不是我想要的,我想让每个任务在一个独立的执行器上并行运行。

我尝试了什么

我将 Dataframe 重新分区为20个分区,并将其持久存储在内存中。

sdf = sdf.repartition(20).persist()

result = sdf.groupby(*groups).applyInPandas(
      create_udf(broadcast_a, broadcast_b, broadcast_func), 
      schema=schema
)

result.show()

这确实给了我20个分区和20个要完成的任务。然而,在10个执行者中,只有1个仍然处于活动状态。

我试过:

  • 将spak.ecutor.cores显式设置为1
  • 将stark.sql.Shuffle.Partitions设置为20

我还注意到每个执行器都包含RDD块,这也让我困惑吗?

问题

在我看来,Spark驱动程序正在为我决定,所有作业都可以在一个执行器上运行,从大数据的Angular 来看,这是有意义的。我意识到Spark并不是针对我的用例而设计的,我正在测试与使用像pythonmultiprocessing这样的东西相比,我是否可以实现加速,以及可以实现什么样的加速。

是否可以强制每个任务在单独的执行器上运行,而不管数据的大小或任务的性质?

我使用的是Python3.9和Spark 3.2.1

rqqzpn5f

rqqzpn5f1#

因此,解决方案在于不使用DataFrame API。与RDD合作似乎给了你更多的控制权。

params = [(1,2), (3,4), (5,6)]

@dataclass
class Task:
   func: Callable
   a: int
   b: int

def run_task(task: Task):
    return task.func(task.a, task.b)

data = spark.parallelize(
   [Task(expensive_python_function, a, b) for a, b in params],
   len(params)]
)

result = data.map(run_task)

它将返回一个RDD,因此您需要转换为DataFrame。或者使用collect()收集以获得结果。

当然,我也设置了spark.default.parallelism = str(len(params))spark.executor.instances = str(len(params))。我认为并行度设置不应该是必需的,因为基本上您在spark.parallelize中也通过了该设置。

希望它能帮助一些人!

相关问题