Spark: update historical data by overwriting specific partitions

zsohkypk asked on 2023-04-12 in Apache

I have a historical DataFrame that is partitioned by deptno.
df1:

+-----+------+---------+----+----------+-------+-------+------+
|empno| ename|      job| mgr|  hiredate|    sal|   comm|deptno|
+-----+------+---------+----+----------+-------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.00| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250.00| 500.00|    30|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.00|   null|    30|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.00|   0.00|    30|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950.00|   null|    30|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.00|   null|    20|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975.00|   null|    20|
| 7788| SCOTT|  ANALYST|7566|1982-12-09|3000.00|   null|    20|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450.00|   null|    10|
| 7839|  KING|PRESIDENT|7782|1981-11-17|5000.00|   null|    10|
+-----+------+---------+----+----------+-------+-------+------+

I receive updates in another DataFrame.
df2:

+-----+-----+---------+----+----------+--------+----+------+
|empno|ename|      job| mgr|  hiredate|     sal|comm|deptno|
+-----+-----+---------+----+----------+--------+----+------+
| 7839| KING|   Leader|7782|1981-11-17|15000.00|null|    10|
+-----+-----+---------+----+----------+--------+----+------+

Now I want to update the old record (empno=7839) with the new (updated) record by overwriting only the affected partition (deptno=10). Expected result:

+-----+------+---------+----+----------+--------+-------+------+
|empno| ename|      job| mgr|  hiredate|     sal|   comm|deptno|
+-----+------+---------+----+----------+--------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20| 1600.00| 300.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02| 2975.00|   null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03|  950.00|   null|    30|
| 7839|  KING|   Leader|7782|1981-11-17|15000.00|   null|    10|
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09| 2450.00|   null|    10|
| 7876| ADAMS|    CLERK|7788|1983-01-12| 1100.00|   null|    20|
| 7844|TURNER| SALESMAN|7698|1981-09-08| 1500.00|   0.00|    30|
| 7788| SCOTT|  ANALYST|7566|1982-12-09| 3000.00|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28| 1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01| 2850.00|   null|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22| 1250.00| 500.00|    30|
+-----+------+---------+----+----------+--------+-------+------+

I have the solution below, but it overwrites all partitions instead of only the modified ones.

import org.apache.spark.sql.functions.coalesce

// Historical data, partitioned by deptno
val df1 = spark.read.orc("data/emp/")

// Updates pulled from MySQL
val finalQry = "SELECT * FROM emp"
val df2 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("user", "root")
  .option("dbtable", "(" + finalQry + ") as t")
  .load()

// Prefer the updated value when one exists, otherwise keep the historical one
val projections = df1.schema.fields.map { field =>
  coalesce(df2.col(field.name), df1.col(field.name)).as(field.name)
}

val finalDf = df1
  .join(df2, df1.col("empno") === df2.col("empno"), "fullouter")
  .select(projections: _*)

// Problem: this rewrites every partition, not just the modified ones
finalDf.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")

Since the volume of historical data is huge, I need a solution that finds the partitions containing updated records and overwrites only those.


bt1cpqcv1#

You know which partitions need to be overwritten. If you build a DataFrame holding only the data for a given deptno, you can overwrite each affected partition individually by specifying it in the path:

partition10Df.write.format("orc").mode("overwrite").save("data/emp/deptno=10")
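A minimal sketch of that idea, assuming the finalDf and df2 from the question; affectedDepts and the loop are illustrative additions, not part of the original answer:

import org.apache.spark.sql.functions.col

// Illustrative sketch: rewrite only the partitions that appear in df2
val affectedDepts = df2.select("deptno").distinct().collect().map(_.get(0))

affectedDepts.foreach { dept =>
  finalDf
    .filter(col("deptno") === dept)
    .drop("deptno") // the partition value is already encoded in the directory name
    .write.format("orc").mode("overwrite")
    .save(s"data/emp/deptno=$dept")
}

Note that deptno is dropped before the write: when saving directly into a partition directory, readers of the base path data/emp/ re-derive deptno from the directory name, so keeping the column would duplicate it.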

lyr7nygr2#

Set the following property:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

If this property is not set, all existing partitions are deleted and only the new ones are written. For example: if the existing table has partitions p1 and p2, and new data arrives with partitions p2 and p3, the resulting location will contain only p2 and p3; p1 is deleted. With this property set, the resulting partitions are p1, p2, and p3.
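Combined with the write from the question, a minimal sketch; the left_semi join that restricts finalDf to the updated partitions is an illustrative addition:

// In dynamic mode, only the partitions present in the written DataFrame are replaced
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Keep only the rows belonging to partitions that actually received updates
val updatedPartitions = finalDf.join(df2.select("deptno").distinct(), Seq("deptno"), "left_semi")

updatedPartitions.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")

With this setting, only deptno=10 is rewritten; the deptno=20 and deptno=30 partitions are left untouched.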
