Which PySpark broadcast function should I use in my case? (pyspark.sql.functions.broadcast vs. SparkContext.broadcast)

cotxawn7  asked on 2023-01-25 in Spark

In my case, I have a small DataFrame small_df and a large DataFrame big_df, and I want to perform a join between them -

big_df.join(small_df, 'id', 'left_semi')

To make the process more efficient, I want to broadcast small_df, but I see there are two options -

1. big_df.join(pyspark.sql.functions.broadcast(small_df), 'id', 'left_semi')

2. big_df.join(sc.broadcast(small_df).value, 'id', 'left_semi')

What is the correct approach, and what is the difference between the two?
Also, if I understand correctly, to use sc.broadcast I need to call collect first, right? Because otherwise I get an exception about using the SparkContext outside the driver.


pcww981p #1

From the PySpark docs:

pyspark.sql.functions.broadcast(df)[source]

    Marks a DataFrame as small enough for use in broadcast joins.

Docs for SparkContext.broadcast:

class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None, sock_file=None)[source]

    A broadcast variable created with SparkContext.broadcast(). Access its value through value.

So the first one is more of a hint to Catalyst that this DataFrame should be joined with a broadcast join, even when the algorithm would normally prefer a sort-merge join.
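A minimal sketch of the hint in action (toy DataFrames built with spark.range stand in for your big_df and small_df); the physical plan printed by explain() should contain a BroadcastHashJoin:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

big_df = spark.range(1_000_000)   # toy stand-in for the large side
small_df = spark.range(100)       # toy stand-in for the small side

# broadcast() only marks the DataFrame; Catalyst decides the physical plan
joined = big_df.join(broadcast(small_df), 'id', 'left_semi')
joined.explain()                  # plan should show BroadcastHashJoin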
If you want to learn more about join hints, here is a sample comment from the Spark source code that describes this and the other join hints:

// If it is an equi-join, we first look at the join hints w.r.t. the following order:
//   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
//      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
//   2. sort merge hint: pick sort merge join if join keys are sortable.
//   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
//      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
//      build side.
//   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
//   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
//      is supported. If both sides are small, choose the smaller side (based on stats)
//      to broadcast.
//   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
//      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
//   3. Pick sort merge join if the join keys are sortable.
//   4. Pick cartesian product if join type is inner like.
//   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
//      other choice.

The second option creates a broadcast variable, which is why you have to collect it first (it needs to be a plain value, not a DataFrame). Interestingly, when you use a broadcast join, your DataFrame is also collected to the driver, because that is how broadcasting currently works in Spark.
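A sketch of that collect-then-broadcast pattern, reusing the toy DataFrames from the sketch above:

# A Broadcast variable wraps a plain Python value, not a DataFrame,
# so the small side must be collected to the driver first.
small_ids = {row['id'] for row in small_df.select('id').collect()}
bcast_ids = spark.sparkContext.broadcast(small_ids)

# Each executor reads its local copy through .value, e.g. in an RDD filter
# that keeps the same rows a left_semi join would keep:
filtered = big_df.rdd.filter(lambda row: row['id'] in bcast_ids.value)
print(filtered.count())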
In your case, you should use pyspark.sql.functions.broadcast(df). Use pyspark.Broadcast when you want a copy of some variable available on every executor for other processing (for example, some small metadata on each executor), not during a join; for a join, the hint is the better choice. An example follows.
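For instance, a broadcast variable is the right tool for shipping a small lookup table to every executor for use inside a UDF (a hypothetical country-code mapping; the names here are made up for illustration):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# small metadata replicated once per executor, not once per task
country_names = spark.sparkContext.broadcast({'DE': 'Germany', 'FR': 'France'})

@udf(StringType())
def to_country_name(code):
    # reads the executor-local copy of the lookup table; no shuffle involved
    return country_names.value.get(code, 'unknown')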
