Spark JDBC: filtering out records outside the partition boundaries

Asked by iezvtpos on 2021-07-14 · Spark

I'm trying to optimize a daily job that pulls three months of data from a MySQL table into Parquet on HDFS. The current job uses mysqldump in a rather creative way, but since there's a Spark/HDFS ecosystem available, I figured I should use that instead.
Background
I defined the database read like this:


# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period
df = session.read \
  .format("jdbc") \
  .option("url", url) \
  .option("driver", driver) \
  .option("dbtable", "table1") \
  .option("user", username) \
  .option("password", password) \
  .option("partitionColumn","time_col") \
  .option("upperBound", end_time) \
  .option("lowerBound", start_time) \
  .option("numPartitions", partitions) \
  .load()

This works great, except that the first and last partitions contain a billion records I don't want at all.
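For context on why this happens: lowerBound and upperBound only control how the partition ranges are sliced, they do not filter rows, and Spark leaves the first and last partitions open-ended so that no rows are lost. Below is a rough sketch of the predicate generation, simplified for illustration only (the real logic lives in Spark's JDBCRelation.columnPartition):

# Illustrative sketch of how the Spark JDBC reader turns partitionColumn,
# lowerBound, upperBound and numPartitions into per-partition WHERE clauses.
# Simplified; this is not Spark's actual source code.
def partition_predicates(column, lower, upper, num_partitions):
    stride = (upper - lower) // num_partitions  # simplified stride calculation
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lo + stride
        if i == 0:
            # first partition has no lower limit, so it also pulls every row
            # with column < lowerBound (plus NULLs)
            predicates.append("{col} < {hi} OR {col} IS NULL".format(col=column, hi=hi))
        elif i == num_partitions - 1:
            # last partition has no upper limit, so it also pulls every row
            # with column >= upperBound
            predicates.append("{col} >= {lo}".format(col=column, lo=lo))
        else:
            predicates.append("{col} >= {lo} AND {col} < {hi}".format(col=column, lo=lo, hi=hi))
    return predicates

for p in partition_predicates("time_col", 1585780000, 1585866400, 4):
    print(p)

That open-endedness is what lets the first and last partitions accumulate the unwanted rows.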
To filter out the vast majority of the table, I updated dbtable like so:

.option("dbtable", "(select * from table1 WHERE time_col >= {} and time_col < {}) as table2".format(start_time, end_time))

This approach worked. When end_time - start_time is small the job runs fine, but it doesn't scale to three months.
That's because every partition's query now contains a derived table:

EXPLAIN SELECT * FROM (SELECT * From table1 WHERE time_col >=1585780000 AND time_col < 1585866400 ) as table2 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
| id | select_type | table      | type  | possible_keys | key      | key_len | ref  | rows     | Extra       |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
|  1 | PRIMARY     | <derived2> | ALL   | NULL          | NULL     | NULL    | NULL | 25048354 | Using where |
|  2 | DERIVED     | table1     | range | time_col      | time_col | 4       | NULL | 25048354 | Using where |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+

By contrast, here is the query when I just use dbtable = "table1"; much simpler and faster:

explain SELECT * From table1 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
| id | select_type | table  | type  | possible_keys | key      | key_len | ref  | rows    | Extra       |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
|  1 | SIMPLE      | table1 | range | time_col      | time_col | 4       | NULL | 1097631 | Using where |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+

Question
Is there any way to filter out the data outside of upperBound and lowerBound while still keeping the simpler query? For example, could I prevent the first and last partitions from being run at all, or override dbtable at run time to replace the subquery with table1?
Constraints
I only have read access to the tables on MySQL 5.7 and cannot create views or indexes.
I'm developing against Spark 3.1, but I believe production runs on Spark 2.
Yes, I've considered Spark Structured Streaming and other streaming options, but that's not the direction we're taking right now.

Answer (lfapxunr):

I found that adding a where() call avoids the subquery. Example:


# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period
df = session.read \
  .format("jdbc") \
  .option("url", url) \
  .option("driver", driver) \
  .option("dbtable", "table1") \
  .option("user", username) \
  .option("password", password) \
  .option("partitionColumn","time_col") \
  .option("upperBound", end_time) \
  .option("lowerBound", start_time) \
  .option("numPartitions", partitions) \
  .load()

# This filters out everything outside the boundaries
# without creating a subquery
df = df.where('time_col >= {} AND time_col < {}'.format(start_time, end_time))

Spark is able to combine this clause with the clauses generated by the partitioning logic, so there is no subquery and performance is better.
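A quick way to verify this (a sketch, assuming the df built in the snippet above): the physical plan should show the range filter among the JDBC scan's pushed predicates rather than a wrapped subquery.

# Inspect the physical plan; the filter should appear as pushed-down
# predicates on the JDBC scan instead of a derived table.
df.explain()

You can also watch the statements arriving on the MySQL side (for example via the general query log, if you have access to it) and confirm they look like the simple range scan in the second EXPLAIN above.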
