类Spark操作优化器

2w2cym1i  于 2022-11-16  发布在  Apache
关注(0)|答案(1)|浏览(125)

让我们假设我有以下的Sparkdf's:

df_with_wild_card = spark.createDataFrame(
   [("random_key", "%")],
   ["key", "value"]
)

df_with_long_value_column = spark.createDataFrame(
   [("value1"),
   ("value2"),
   ("value3"),
   ("value4") ... ],
   ["value"]
)

我想把这两个df连接起来

df_with_long_value_column .alias("df_with_long_value_column").join(
            f.broadcast(df_with_wild_card.alias("df_with_wild_card")),
            f.expr("df_with_long_value_column.value like df_with_wild_card.value"),
            "inner",
        )

很明显,因为df_with_wild_card.value只是一个值“%",所以df_with_long_value_column.value中的每个值都应该在连接操作的结果df上。
我问题是:spark实际上是检查df_with_long_value_column.value中的每个值,如果它被“%”捕获,或者spark识别出唯一的值是“%”,它会自动将所有值取到结果df中。

6yt4nkrj

6yt4nkrj1#

让我们看一下执行计划

因此,看起来您的连接被视为与其他连接一样。它是广播的,因为数据集小于10mb,它的nestedLoops是因为它的非equi连接。
是的,第一个数据集中的元素将根据第二个数据集中的值逐个进行检查
我可以想象,在逻辑计划优化期间,这可能在Catalyst中以某种方式进行了优化,但我找不到这样的优化器规则
在这里你可以看到默认规则批量:Spark源代码
在此批处理中,您可以找到一个连接到类似运算符的规则:就像Simplification一样,你可以查看它的代码,但我认为它与你的情况无关

相关问题