在非主键上连接多个Spark Dataframe

aiqt4smr  于 2023-02-19  发布在  Apache
关注(0)|答案(2)|浏览(168)

我正在尝试将1个父级Dataframe与2个子级Dataframe连接。
这是我父母DF的样子
| 人员ID|名字|姓氏|
| - ------|- ------|- ------|
| 1个|美国广播公司|XYZ|
儿童DF 1
| 名字|名字匹配的个人ID|
| - ------|- ------|
| 美国广播公司|[一、十、二十]|
儿童DF2
| 姓氏|姓氏匹配的个人ID|
| - ------|- ------|
| XYZ|[一、四十、七十]|
我想通过FirstName列连接父DF和子DF 1,结果应该通过LastName列与子DF 2连接。
| 人员ID|名字|姓氏|名字个人ID|姓氏个人ID|
| - ------|- ------|- ------|- ------|- ------|
| 1个|美国广播公司|XYZ|[一、十、二十]|[一、四十、七十]|
什么是实现这种结果的好的连接策略?简单的内部连接会导致大量的shuffle写入,并且经常在运行很长时间后作业失败。

一些估计:
  • 父DF中的记录数:三千五百万
  • 子DF1中的记录数:一百五十万
  • 子DF2中的记录数:一百五十万

我会有更多这样的子DF(至少15个),我会将它们与父DF中的不同列连接起来

    • 附加信息:**我的父DF的源是我扫描并创建RDD并写入序列文件的Hbase表。此外,我读取序列文件并创建 Dataframe (以便仅执行一次Hbase扫描)。
ih99xse1

ih99xse11#

对于我来说,这两个较小的数据集是广播连接的好候选对象,您可以尝试使用它并检查结果。Broadcast join
请记住,广播应该谨慎使用,并且只在相对较小的数据集上使用。您的广播数据集将在驱动程序上收集(因此它需要适合其内存),然后传播到其他执行器,因此大小在这里非常重要
如果这里没有广播选项,您可以检查是否有data skew,如果您使用的是Spark 3.X,您可以打开AQE并检查是否有帮助

vwhgwdsa

vwhgwdsa2#

很多时候,我们不能对连接进行超过一定程度的调整,我们必须通过连接来达到预期的结果。
我们可以尝试做的是,根据数据的大小/分区和Spark应用程序可用的执行器内核总数来校准spark.sql.shuffle.partitions配置。默认值是200,在大多数情况下,这被证明是太小了。理想情况下,我建议您将其设置为执行器内核的倍数。例如,如果您有100个执行器,每个执行器有4个内核,请从800开始,并根据Spark UI中的结果进行校准。
希望这有帮助!

相关问题