如何对partitionby和其他键连接的Dataframe进行分区?

m0rkklqb  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(236)

我有很多数据框,其中包含客户数据和不同法律实体的历史记录,简化如下:

Val myData = spark.createDataframe(Seq(
(1, 1, “a lot of Data”, 2010-01-01-10.00.00”),
(1, 1, “a lot of Data”, 2010-01-20-10.31.00”),
(1, 1, “a lot of Data”, 2019-06-16-12.00.00”),
(2, 5, “a lot of Data”, 2010-01-01-10.00.00”),
(2,6, “a lot of Data”, 2010-01-01-10.00.00”),
(3, 7, “a lot of Data”, 2010-01-01-10.00.00”)))
.toDF(“legalentity”, “customernumber”,”anydata”,”changetimestamp”)

这些Dataframe存储为Parquet文件,并具有外部配置单元表。更改时间戳被转换为>valid from<,>valid to<by视图,如下所示

CREATE VIEW myview 
AS SELECT 
Legalentity, customernumber, anydata,
Changetimestamp as valid_from,
Coalesce(lead(changetimestamp) over (PARTITION by legalentity, customernumber ORDER BY changetimestamp ASC), “9999-12-31-00.00.00”) as valid_to

(这是简化了的,里面需要一些时间戳转换)
稍后,dataframes/hive表之间有很多连接。这些Dataframe以这种方式存储:

myDf
.orderBy(col(“legalentity”), col(“customernumber”))
.write
.format(“parquet_format”)
.mode(SaveMode.Append)
.partitionBy(“legalentity”)
.save(outputpath)

出于法律原因,不同法律实体的数据必须存储在不同的hdfs路径中,这是由partitionby子句完成的,该子句为每个法律实体创建一个单独的文件夹。有大有小的法人实体,拥有大量的客户,也有少数客户。洗牌分区的数量是所有合法实体的平均数,这很好。
问题:
没有更多的列可以对Dataframe进行分区:如果我们想通过添加一个包含更多列的重新分区作为partitionby子句来加速所有的Dataframe,比如:

myDf
.orderBy(col(“legalentity”), col(“customernumber”))
.repartition(col(“legalentity”), col(“customernumber”))
.write
.format(“parquet_format”)
.mode(SaveMode.Append)
.partitionBy(“legalentity”)
.save(outputpath)

shuffel分区的数量在每个legal entity文件夹中使用。
导致分区=>合法实体<*>随机分区数<
分区太多,有大小Dataframe/表。所有的Dataframe都得到相同数量的随机分区,因此小Dataframe的分区大小为3MB或更小。如果我们对每个表使用不同数量的分区,使文件大小接近128MB,那么一切都会变慢。
我们每天都会得到新的数据,我们只是附加这些数据,但因此我们不使用无序分区的数量,而是重新分区(1)。有时我们不得不重新加载所有的数据来压缩所有的分区,但是我们的进程并没有因为新的每日数据而减慢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题