PySpark:插入或更新一个新的框架

os8fio9y  于 11个月前  发布在  Spark
关注(0)|答案(3)|浏览(112)

我有两个帧,DF 1和DF 2。DF 1是主帧,DF 2是增量帧。来自DF 2的数据应该插入DF 1或用于更新DF 1的数据。
假设DF 1是以下格式:
| ID_no|开始日期|量|天|
| --|--|--|--|
| 1 |2016-01-01 2016-01-01| 4650 | 22 |
| 2 |2016-01-02 2016-01-02| 3130 | 45 |
| 1 |2016-01-03 2016-01-03| 4456 | 22 |
| 2 |2016-01-15 2016-01-15 2016-01-15| 1234 | 45 |
DF 2包含以下内容:
| ID_no|开始日期|量|天|
| --|--|--|--|
| 1 |2016-01-01 2016-01-01| 8650 | 52 |
| 2 |2016-01-02 2016-01-02| 7130 | 65 |
| 1 |2016-01-06 2016-01-06| 3456 | 20 |
| 2 |2016-01-20 2016-01-20| 2345 | 19 |
| 3 |2016-02-02 2016 -02-02| 1345 | 19 |
我需要合并的两个字符串,这样,如果“id_no”和“start date”的DF 2匹配DF 1,它应该被替换在DF 1,如果不匹配,它应该被插入到DF 1。“id_no”不是唯一的。
预期结果:
| ID_no|开始日期|量|天|
| --|--|--|--|
| 1 |2016-01-01 2016-01-01| 8650 | 52 |
| 2 |2016-01-02 2016-01-02| 7130 | 65 |
| 1 |2016-01-03 2016-01-03| 4456 | 22 |
| 2 |2016-01-15 2016-01-15 2016-01-15| 1234 | 45 |
| 1 |2016-01-06 2016-01-06| 3456 | 20 |
| 2 |2016-01-20 2016-01-20| 2345 | 19 |
| 3 |2016-02-02 2016 -02-02| 1345 | 19 |

s6fujrry

s6fujrry1#

您可以在id_nostart_date上连接两个 Dataframe ,然后coalesce连接amountdays列,df2的列先连接:

import pyspark.sql.functions as f

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    f.coalesce('b.amount', 'a.amount').alias('amount'), 
    f.coalesce('b.days', 'a.days').alias('days')
).show()

+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

字符串
如果你有更多的列:

cols = ['amount', 'days']

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

w8ntj3qf

w8ntj3qf2#

union和后续的agg将工作。

from pyspark.sql import functions as F

grp_by = {'id_no', 'start_date'}
df = df1.union(df2)
df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])

df.show()
# +-----+----------+------+----+
# |id_no|start_date|amount|days|
# +-----+----------+------+----+
# |    2|2016-01-02|  3130|  45|
# |    1|2016-01-01|  4650|  22|
# |    1|2016-01-03|  4456|  22|
# |    2|2016-01-15|  1234|  45|
# |    3|2016-02-02|  1345|  19|
# |    1|2016-01-06|  3456|  20|
# |    2|2016-01-20|  2345|  19|
# +-----+----------+------+----+

字符串

acruukt9

acruukt93#

如果你正在使用Databricks Delta Lake表,你可以使用SQL的MERGE INTO
将基于源表的一组更新、插入和删除操作合并到目标Delta表中。
仅Delta Lake表支持此语句。
您只需要创建一个new_id,它是id_nostart_date的联接。

MERGE INTO df1
USING df2
ON df1.new_id = df2.new_id
WHEN MATCHED THEN
  UPDATE SET df1.amount = df2.amount, df1.days = df2.days
WHEN NOT MATCHED
  THEN INSERT *

字符串

相关问题