I have a historical DataFrame, partitioned by deptno:
df1:
+-----+------+---------+----+----------+-------+-------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+----------+-------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.00| 300.00| 30|
| 7521| WARD| SALESMAN|7698|1981-02-22|1250.00| 500.00| 30|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.00|1400.00| 30|
| 7698| BLAKE| MANAGER|7839|1981-05-01|2850.00| null| 30|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.00| 0.00| 30|
| 7900| JAMES| CLERK|7698|1981-12-03| 950.00| null| 30|
| 7369| SMITH| CLERK|7902|1980-12-17| 800.00| null| 20|
| 7566| JONES| MANAGER|7839|1981-04-02|2975.00| null| 20|
| 7788| SCOTT| ANALYST|7566|1982-12-09|3000.00| null| 20|
| 7876| ADAMS| CLERK|7788|1983-01-12|1100.00| null| 20|
| 7782| CLARK| MANAGER|7839|1981-06-09|2450.00| null| 10|
| 7839| KING|PRESIDENT|7782|1981-11-17|5000.00| null| 10|
+-----+------+---------+----+----------+-------+-------+------+
I receive updates in another DataFrame:
df2:
+-----+-----+------+----+----------+--------+----+------+
|empno|ename|   job| mgr|  hiredate|     sal|comm|deptno|
+-----+-----+------+----+----------+--------+----+------+
| 7839| KING|Leader|7782|1981-11-17|15000.00|null|    10|
+-----+-----+------+----+----------+--------+----+------+
Now I want to replace the old record (empno=7839) with the updated one by overwriting only the affected partition (deptno=10), so the result is:
+-----+------+---------+----+----------+--------+-------+------+
|empno| ename|      job| mgr|  hiredate|     sal|   comm|deptno|
+-----+------+---------+----+----------+--------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20| 1600.00| 300.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02| 2975.00|   null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03|  950.00|   null|    30|
| 7839|  KING|   LEADER|7782|1981-11-17|15000.00|   null|    10|
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09| 2450.00|   null|    10|
| 7876| ADAMS|    CLERK|7788|1983-01-12| 1100.00|   null|    20|
| 7844|TURNER| SALESMAN|7698|1981-09-08| 1500.00|   0.00|    30|
| 7788| SCOTT|  ANALYST|7566|1982-12-09| 3000.00|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28| 1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01| 2850.00|   null|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22| 1250.00| 500.00|    30|
+-----+------+---------+----+----------+--------+-------+------+
I have the solution below, but it overwrites all partitions instead of only the modified one:
val df1 = spark.read.orc("data/emp/")

val finalQry = "SELECT * FROM emp"
val df2 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("user", "root")
  .option("dbtable", "(" + finalQry + ") as t")
  .load()

// Prefer the updated value from df2; fall back to the historical value from df1
val projections = df1.schema.fields.map { field =>
  coalesce(df2.col(field.name), df1.col(field.name)).as(field.name)
}
val finalDf = df1.join(df2, df1.col("empno") === df2.col("empno"), "fullouter")
  .select(projections: _*)

finalDf.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")
Since the historical data is huge, I need a solution that finds the partitions containing updated records and overwrites only those.
2 Answers

bt1cpqcv1#
You know which partitions need to be overwritten. If you build a DataFrame that contains only the data for a given deptno, you can overwrite that partition individually by specifying it in the output path.
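A minimal sketch of that idea, assuming the layout data/emp/deptno=&lt;value&gt; produced by partitionBy("deptno") in the question, and reusing the question's df2 and finalDf:

import org.apache.spark.sql.functions.col

// Partitions touched by the update set (usually a small list, so collect is fine)
val changedDepts = df2.select("deptno").distinct().collect().map(_.get(0))

changedDepts.foreach { dept =>
  // Only the rows belonging to this partition
  val merged = finalDf.filter(col("deptno") === dept)
  // We write into the partition directory itself, so drop deptno from the
  // data; it is already encoded in the path (Hive-style deptno=<value>).
  merged.drop("deptno")
    .write.format("orc").mode("overwrite")
    .save(s"data/emp/deptno=$dept")
}

All other partition directories under data/emp/ are left untouched, because each overwrite targets a single partition path rather than the table root.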
lyr7nygr2#
Set the following property:
If this property is not set, all existing partitions are deleted and only the newly written ones are kept. For example, if the existing table has partitions p1 and p2, and you write new data with partitions p2 and p3, the result will contain only p2 and p3; p1 is deleted. If the property is set, the resulting partitions will be p1, p2, and p3.
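The property name is missing from the answer as scraped; the behavior it describes matches Spark's dynamic partition overwrite mode (available since Spark 2.3), which would be configured like this:

// Overwrite only the partitions present in the written data,
// leaving all other existing partitions in place.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

finalDf.write
  .format("orc")
  .mode("overwrite")
  .partitionBy("deptno")
  .save("data/emp/")

With this setting, the write above replaces only deptno=10 (the partition present in finalDf's updated rows), instead of wiping deptno=20 and deptno=30 as the default static mode does.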