当我学习spark时,我遇到了使用 partitionColumn
, numPartitions
, lowerBound
, upperBound
.
dataFrame = spark.read.format('jdbc').option('url', 'url).
option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver').\
option('user', user).\
option('password', password).\
option('dbtable', tablename).\
option('partitionColumn', partitionColumn).\
option('numPartitions', 4). \
option('lowerBound', 1).\
option('upperBound', 400000).\
load()
上面的代码在作业启动时节省了大量的读取时间。幸运或不幸的是,在我的上一个项目之前,我从中读取数据的源数据库的每个表都有一个id列,该列具有自动增量值。
在我当前的项目中,我没有像在以前的项目中那样选择列id。虽然在我当前源数据库的所有表中都有整数列,但是数据太大,分区工作不正常。我在分区中看到了巨大的数据倾斜。随着作业的进行,只有一个分区包含数据。
我有一个42gb的大表,其中包含以下列数据类型:
CREATE TABLE [databasename].[dbo].[tablename](
[col1] [bigint] NOT NULL,
[col2] [varchar](50) NOT NULL,
[col3] [varchar](50) NULL,
[col4] [varchar](50) NULL,
[col5] [varchar](20) NOT NULL,
[col6] [varchar](100) NULL,
[col7] [varchar](100) NOT NULL,
[col8] [varchar](50) NOT NULL
.....
[col33] [tinyint] NULL,
[col34] [float] NULL,
[col35] [float] NULL,
[col36] [datetime] NULL
)
我考虑分三步实现分区:
计算datetime列的最小值和最大值: col36
然后将它们转换为历元值(long),得到lowerbound(min datetime)和upperbound(max datetime)
在 dbtable
选项,而不是提供表名,提供一个查询(select allcolumns,cast(datetimecolumn to epochvalue)as dateepoch from table),该查询将有一个额外的列 dateEpoch
.
指定此 dateEpoch
列作为我的分区列。
下面是我的想法:
dataFrame = spark.read.format('jdbc').option('url', 'url).
option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver').\
option('user', user).\
option('password', password).\
option('dbtable', 'select allColumns, cast(datetimeColumn to Epoch) as dateEpoch from sourceTable').\
option('partitionColumn', dateEpoch).\
option('numPartitions', 5). \
option('lowerBound', minDateEpoch).\
option('upperBound', maxDateEpoch).\
load()
我选择了datetime列,因为数据包含百万秒,当转换为epoch时,我可以从中获得更多的唯一性。
这种方法行得通吗?如果没有,在这种情况下我应该怎么做?当我加载一个大表(42gb)时,如果我的源代码中没有可用作数据的列,有人能告诉我如何避免分区中的数据倾斜吗 partitionColumn
?
暂无答案!
目前还没有任何答案,快来回答吧!