我正在使用AWS Glue将多个文件从S3移动到RDS示例。每天我都会在S3中获取一个新文件,其中可能包含新数据,但也可能包含我已经保存的一些更新值的记录。如果我多次运行这个作业,我当然会在数据库中得到重复的记录。如果Glue注意到某个字段发生了更改,它将尝试更新该记录,而不是插入多条记录,每条记录都有一个唯一的ID。这可能吗?
ql3eal8s1#
我采用了Yuriy建议的第二种方法。获取现有数据以及新数据,然后进行一些处理以合并它们并以覆写模式写入。下面的代码将帮助您了解如何解决这个问题。
sc = SparkContext() glueContext = GlueContext(sc) #get your source data src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl) src_df = src_data.toDF() #get your destination data dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl) dst_df = dst_data.toDF() #Now merge two data frames to remove duplicates merged_df = dst_df.union(src_df) #Finally save data to destination with OVERWRITE mode merged_df.write.format('jdbc').options( url = dest_jdbc_url, user = dest_user_name, password = dest_password, dbtable = dest_tbl ).mode("overwrite").save()
ecfsfe2w2#
不幸的是,没有优雅的方式来做这件事与胶水。如果你想写Redshift,你可以使用postactions来实现Redshift合并操作。但是,这对于其他jdbc接收器(afaik)是不可能的。或者,在ETL脚本中,您可以从数据库加载现有数据,以便在保存之前过滤掉现有记录。但是,如果您的DB表很大,那么作业可能需要一段时间来处理它。另一种方法是首先以“覆盖”模式写入暂存表(替换现有暂存数据),然后通过API调用DB以仅将新记录复制到最终表中。
postactions
qhhrdooz3#
我在表中使用了INSERT。...复制钥匙. for UPSERT到Aurora RDS运行mysql引擎。也许这将是一个参考您的用例。我们不能使用JDBC,因为我们目前只支持APPEND、OVERWRITE、ERROR模式。我不确定您使用的RDS数据库引擎,下面是mysql UPSERTS的示例。请看这个参考,我在这里发布了一个使用INSERT INTO TABLE的解决方案。.ON DUPLICATE KEY for mysql:Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array
30byixjq4#
可能会有点慢,但Tharsan的解决方案由于MySQL的限制而引起了写操作的其他问题。我决定在将数据写回数据目录之前过滤掉数据,这反过来又更新了底层数据存储,在我的情况下是MySQL:
source_years = glueContext.create_dynamic_frame.from_catalog( database=database, table_name="source_years", transformation_ctx="source_years", ) source_years = ApplyMapping.apply( frame=source_years, mappings=[ ("YearID", "int", "year_id", "int"), ], transformation_ctx="source_years_transform", ) target_years = glueContext.create_dynamic_frame.from_catalog( database=database, table_name="target_years", transformation_ctx="target_years", ) target_years_list = target_years.toDF().select('year_id').rdd.map(lambda x : x[0]).collect() source_years = source_years.filter( f=lambda x: x['year_id'] not in target_years_list ) glueContext.write_dynamic_frame.from_catalog( frame=source_years, database=database, table_name="target_years", transformation_ctx="target_years", )
4条答案
按热度按时间ql3eal8s1#
我采用了Yuriy建议的第二种方法。获取现有数据以及新数据,然后进行一些处理以合并它们并以覆写模式写入。下面的代码将帮助您了解如何解决这个问题。
ecfsfe2w2#
不幸的是,没有优雅的方式来做这件事与胶水。如果你想写Redshift,你可以使用
postactions
来实现Redshift合并操作。但是,这对于其他jdbc接收器(afaik)是不可能的。或者,在ETL脚本中,您可以从数据库加载现有数据,以便在保存之前过滤掉现有记录。但是,如果您的DB表很大,那么作业可能需要一段时间来处理它。
另一种方法是首先以“覆盖”模式写入暂存表(替换现有暂存数据),然后通过API调用DB以仅将新记录复制到最终表中。
qhhrdooz3#
我在表中使用了INSERT。...复制钥匙. for UPSERT到Aurora RDS运行mysql引擎。也许这将是一个参考您的用例。我们不能使用JDBC,因为我们目前只支持APPEND、OVERWRITE、ERROR模式。
我不确定您使用的RDS数据库引擎,下面是mysql UPSERTS的示例。
请看这个参考,我在这里发布了一个使用INSERT INTO TABLE的解决方案。.ON DUPLICATE KEY for mysql:
Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array
30byixjq4#
可能会有点慢,但Tharsan的解决方案由于MySQL的限制而引起了写操作的其他问题。我决定在将数据写回数据目录之前过滤掉数据,这反过来又更新了底层数据存储,在我的情况下是MySQL: