我收到一个数据集,我需要 join
把它和另一张table放在一起。因此,我想到的最简单的解决方案是为另一个表创建第二个数据集并执行 joinWith
.
def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] = {
val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
dogs.joinWith(cats, ...)
}
我主要关心的是 spark.table("dev_db.cat")
,因为感觉我们指的是所有 cat
数据为
SELECT * FROM dev_db.cat
然后做一个 join
在后期。或者查询优化器直接执行连接而不引用整个表?有更好的解决办法吗?
2条答案
按热度按时间xxb16uws1#
你需要做一个解释,看看是否使用 predicate 下推。然后你就可以判断你的担忧是否正确。
但是,现在一般来说,如果没有使用复杂的数据类型和/或数据类型不匹配不明显,则会发生下推。您也可以通过简单的createorreplacetempview看到这一点。看到了吗https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html
oknrviil2#
以下是一些针对您的案例的建议:
答。如果你有
where
,filter
,limit
,take
etc操作尝试在加入两个数据集之前应用它们。spark不能关闭这些类型的过滤器,因此您必须自己尽可能减少目标记录的数量。这里是spark优化器的一个极好的信息源。b。尝试将数据集放在同一位置,并使用
repartition
功能。重新分区应该基于参与的密钥join
即:c。尝试使用
broadcast
对于较小的数据集,如果您确定它可以放入内存(或增加spark.broadcast.blockSize
). 这对spark程序的性能有一定的提升,因为它将确保同一节点中两个数据集的共存。如果您不能应用上述任何一项,那么spark无法知道应该排除哪些记录,因此将扫描两个数据集中的所有可用行。