spark数据集连接性能

b5lpy0ml  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(332)

我收到一个数据集,我需要 join 把它和另一张table放在一起。因此,我想到的最简单的解决方案是为另一个表创建第二个数据集并执行 joinWith .

def joinFunction(dogs: Dataset[Dog]): Dataset[(Dog, Cat)] = {
      val cats: Dataset[Cat] = spark.table("dev_db.cat").as[Cat]
      dogs.joinWith(cats, ...)
    }

我主要关心的是 spark.table("dev_db.cat") ,因为感觉我们指的是所有 cat 数据为

SELECT * FROM dev_db.cat

然后做一个 join 在后期。或者查询优化器直接执行连接而不引用整个表?有更好的解决办法吗?

xxb16uws

xxb16uws1#

你需要做一个解释,看看是否使用 predicate 下推。然后你就可以判断你的担忧是否正确。
但是,现在一般来说,如果没有使用复杂的数据类型和/或数据类型不匹配不明显,则会发生下推。您也可以通过简单的createorreplacetempview看到这一点。看到了吗https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/4201913720573284/4413065072037724/latest.html

oknrviil

oknrviil2#

以下是一些针对您的案例的建议:
答。如果你有 where , filter , limit , take etc操作尝试在加入两个数据集之前应用它们。spark不能关闭这些类型的过滤器,因此您必须自己尽可能减少目标记录的数量。这里是spark优化器的一个极好的信息源。
b。尝试将数据集放在同一位置,并使用 repartition 功能。重新分区应该基于参与的密钥 join 即:

dogs.repartition(1024, "key_col1", "key_col2")
dogs.join(cats, Seq("key_col1", "key_col2"), "inner")

c。尝试使用 broadcast 对于较小的数据集,如果您确定它可以放入内存(或增加 spark.broadcast.blockSize ). 这对spark程序的性能有一定的提升,因为它将确保同一节点中两个数据集的共存。
如果您不能应用上述任何一项,那么spark无法知道应该排除哪些记录,因此将扫描两个数据集中的所有可用行。

相关问题