当源中没有支持并行读取的列(不能使用partitioncolumn)时,将一个巨大的表读入spark的最佳方法是什么?

0kjbasz6  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(257)

当我学习spark时,我遇到了使用 partitionColumn , numPartitions , lowerBound , upperBound .

dataFrame = spark.read.format('jdbc').option('url', 'url).
         option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver').\
            option('user', user).\
            option('password', password).\
            option('dbtable', tablename).\
            option('partitionColumn', partitionColumn).\
            option('numPartitions', 4). \
            option('lowerBound', 1).\
            option('upperBound', 400000).\
            load()

上面的代码在作业启动时节省了大量的读取时间。幸运或不幸的是,在我的上一个项目之前,我从中读取数据的源数据库的每个表都有一个id列,该列具有自动增量值。
在我当前的项目中,我没有像在以前的项目中那样选择列id。虽然在我当前源数据库的所有表中都有整数列,但是数据太大,分区工作不正常。我在分区中看到了巨大的数据倾斜。随着作业的进行,只有一个分区包含数据。
我有一个42gb的大表,其中包含以下列数据类型:

CREATE TABLE [databasename].[dbo].[tablename](
    [col1] [bigint] NOT NULL,
    [col2] [varchar](50) NOT NULL,
    [col3] [varchar](50) NULL,
    [col4] [varchar](50) NULL,
    [col5] [varchar](20) NOT NULL,
    [col6] [varchar](100) NULL,
    [col7] [varchar](100) NOT NULL,
    [col8] [varchar](50) NOT NULL
    .....
    [col33] [tinyint] NULL,
    [col34] [float] NULL,
    [col35] [float] NULL,
    [col36] [datetime] NULL
)

我考虑分三步实现分区:
计算datetime列的最小值和最大值: col36 然后将它们转换为历元值(long),得到lowerbound(min datetime)和upperbound(max datetime)
dbtable 选项,而不是提供表名,提供一个查询(select allcolumns,cast(datetimecolumn to epochvalue)as dateepoch from table),该查询将有一个额外的列 dateEpoch .
指定此 dateEpoch 列作为我的分区列。
下面是我的想法:

dataFrame = spark.read.format('jdbc').option('url', 'url).
         option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver').\
            option('user', user).\
            option('password', password).\
            option('dbtable', 'select allColumns, cast(datetimeColumn to Epoch) as dateEpoch from sourceTable').\
            option('partitionColumn', dateEpoch).\
            option('numPartitions', 5). \
            option('lowerBound', minDateEpoch).\
            option('upperBound', maxDateEpoch).\
            load()

我选择了datetime列,因为数据包含百万秒,当转换为epoch时,我可以从中获得更多的唯一性。
这种方法行得通吗?如果没有,在这种情况下我应该怎么做?当我加载一个大表(42gb)时,如果我的源代码中没有可用作数据的列,有人能告诉我如何避免分区中的数据倾斜吗 partitionColumn ?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题