pyspark:如何用列指定重新平衡分区提示

8ehkhllq  于 2023-03-28  发布在  Spark
关注(0)|答案(2)|浏览(135)

如何使用pyspark接口指定带列名的REBALANCE分区提示?
作为一个例子,让我们假设我们有

df = spark.range(10)

以下尝试失败:

>>> df.hint("rebalance", "id").explain()
...
pyspark.sql.utils.AnalysisException: REBALANCE Hint parameter should include columns, but id found

如果不是通过名称,即一个简单的字符串,我如何指定列?

  • 使用别名也不起作用
>>> df.alias("df").hint("rebalance", "df.id").explain()
...
pyspark.sql.utils.AnalysisException: REBALANCE Hint parameter should include columns, but df.id found
  • 使用列引用
>>> import pyspark.sql.functions as F
>>> df.hint("rebalance", F.col("id")).explain()
TypeError: all parameters should be in (<class 'str'>, <class 'list'>, <class 'float'>, <class 'int'>), got Column<'id'> of type <class 'pyspark.sql.column.Column'>

也不会

请注意,指定不带列的重新平衡提示可以正常工作,但这不是我想要的:

>>> df.hint("rebalance").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REBALANCE_PARTITIONS_BY_NONE, [id=#551]
   +- Range (0, 10, step=1, splits=4)
kgsdhlau

kgsdhlau1#

我做了一些研究,这似乎是Spark中的一个bug(或多或少)。列名永远不会转换为org.apache.spark.sql.catalyst.expressions.Expression,因为它们在df.repartition中。
即使在Scala中,传递一个字符串也会导致同样的异常:

scala> df.hint("rebalance", "id")

org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found

传递列也是如此(令人惊讶的是):

scala> df.hint("rebalance", $"id")

org.apache.spark.sql.AnalysisException: REBALANCE Hint parameter should include columns, but id found

但是获取列的表达式是有效的:

scala> df.hint("rebalance", $"id".expr)
res10: org.apache.spark.sql.Dataset[Long] = [id: bigint]

我将提出一个关于Spark项目的问题,并将更新这个问题的答案。

qq24tv8q

qq24tv8q2#

下面的工作在Pyspark。

df.hint("rebalance(id)")

相关问题