我正在使用Glue和pyspark。我正在像这样写入两个表:
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df1, glueContext, "df1"),
connection_type="custom.spark",
connection_options=combinedConf
)
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df2, glueContext, "df2"),
connection_type="custom.spark",
connection_options=combinedConf
)
字符串
我想把它们合并合并到一个事务中,这样要么两个表都写,要么两个表都不写。我想到了Transactions
,但是我怎么把transaction id
传递给glueContext.write_dynamic_frame.from_options
函数,我检查了文档,但是没有找到。
有没有办法强制这两行代码一起执行或者根本不执行?谢谢
编辑:我这样添加的,但得到了
TypeError: from_options() got an unexpected keyword argument 'transactionId'
型
tx_id = context.start_transaction(read_only=False)context.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(df1,gluecontext,“df1”),connection_type=“custom.spark”,connection_options=config1,transactionId=tx_id)
context.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df2, gluecontext, "df2"),
connection_type="custom.spark",
connection_options=config2,
transactionId=tx_id
)
try:
context.commit_transaction(tx_id)
print("done writing after transaction")
except Exception:
context.cancel_transaction(tx_id)
print("cancel trans")
raise Exception("the transaction cancelled")
型
1条答案
按热度按时间kupeojn61#
在additional_options中添加transactionId