让我们假设我有以下的Sparkdf's:
df_with_wild_card = spark.createDataFrame(
[("random_key", "%")],
["key", "value"]
)
df_with_long_value_column = spark.createDataFrame(
[("value1"),
("value2"),
("value3"),
("value4") ... ],
["value"]
)
我想把这两个df连接起来
df_with_long_value_column .alias("df_with_long_value_column").join(
f.broadcast(df_with_wild_card.alias("df_with_wild_card")),
f.expr("df_with_long_value_column.value like df_with_wild_card.value"),
"inner",
)
很明显,因为df_with_wild_card.value只是一个值“%",所以df_with_long_value_column.value中的每个值都应该在连接操作的结果df上。
我问题是:spark实际上是检查df_with_long_value_column.value中的每个值,如果它被“%”捕获,或者spark识别出唯一的值是“%”,它会自动将所有值取到结果df中。
1条答案
按热度按时间6yt4nkrj1#
让我们看一下执行计划
因此,看起来您的连接被视为与其他连接一样。它是广播的,因为数据集小于10mb,它的nestedLoops是因为它的非equi连接。
是的,第一个数据集中的元素将根据第二个数据集中的值逐个进行检查
我可以想象,在逻辑计划优化期间,这可能在Catalyst中以某种方式进行了优化,但我找不到这样的优化器规则
在这里你可以看到默认规则批量:Spark源代码
在此批处理中,您可以找到一个连接到类似运算符的规则:就像Simplification一样,你可以查看它的代码,但我认为它与你的情况无关