pyspark AWS粘合和更新重复数据

xjreopfe  于 2023-04-29  发布在  Spark
关注(0)|答案(4)|浏览(112)

我正在使用AWS Glue将多个文件从S3移动到RDS示例。每天我都会在S3中获取一个新文件,其中可能包含新数据,但也可能包含我已经保存的一些更新值的记录。如果我多次运行这个作业,我当然会在数据库中得到重复的记录。如果Glue注意到某个字段发生了更改,它将尝试更新该记录,而不是插入多条记录,每条记录都有一个唯一的ID。这可能吗?

ql3eal8s

ql3eal8s1#

我采用了Yuriy建议的第二种方法。获取现有数据以及新数据,然后进行一些处理以合并它们并以覆写模式写入。下面的代码将帮助您了解如何解决这个问题。

sc = SparkContext()
glueContext = GlueContext(sc)

#get your source data 
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df =  src_data.toDF()

#get your destination data 
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df =  dst_data.toDF()

#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)

#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options(   url = dest_jdbc_url, 
                                          user = dest_user_name,
                                          password = dest_password,
                                          dbtable = dest_tbl ).mode("overwrite").save()
ecfsfe2w

ecfsfe2w2#

不幸的是,没有优雅的方式来做这件事与胶水。如果你想写Redshift,你可以使用postactions来实现Redshift合并操作。但是,这对于其他jdbc接收器(afaik)是不可能的。
或者,在ETL脚本中,您可以从数据库加载现有数据,以便在保存之前过滤掉现有记录。但是,如果您的DB表很大,那么作业可能需要一段时间来处理它。
另一种方法是首先以“覆盖”模式写入暂存表(替换现有暂存数据),然后通过API调用DB以仅将新记录复制到最终表中。

qhhrdooz

qhhrdooz3#

我在表中使用了INSERT。...复制钥匙. for UPSERT到Aurora RDS运行mysql引擎。也许这将是一个参考您的用例。我们不能使用JDBC,因为我们目前只支持APPEND、OVERWRITE、ERROR模式。
我不确定您使用的RDS数据库引擎,下面是mysql UPSERTS的示例。
请看这个参考,我在这里发布了一个使用INSERT INTO TABLE的解决方案。.ON DUPLICATE KEY for mysql:
Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array

30byixjq

30byixjq4#

可能会有点慢,但Tharsan的解决方案由于MySQL的限制而引起了写操作的其他问题。我决定在将数据写回数据目录之前过滤掉数据,这反过来又更新了底层数据存储,在我的情况下是MySQL:

source_years = glueContext.create_dynamic_frame.from_catalog(
    database=database,
    table_name="source_years",
    transformation_ctx="source_years",
)

source_years = ApplyMapping.apply(
    frame=source_years,
    mappings=[
        ("YearID", "int", "year_id", "int"),
    ],
    transformation_ctx="source_years_transform",
)

target_years = glueContext.create_dynamic_frame.from_catalog(
    database=database,
    table_name="target_years",
    transformation_ctx="target_years",
)

target_years_list = target_years.toDF().select('year_id').rdd.map(lambda x : x[0]).collect()

source_years = source_years.filter(
    f=lambda x: x['year_id'] not in target_years_list
)

glueContext.write_dynamic_frame.from_catalog(
    frame=source_years,
    database=database,
    table_name="target_years",
    transformation_ctx="target_years",
)

相关问题