pyspark 如何将合并两个write_dynamic_frame.from_options合并到一个事务中?

rvpgvaaj  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(116)

我正在使用Glue和pyspark。我正在像这样写入两个表:

glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(df1, glueContext, "df1"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )
        

        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(df2, glueContext, "df2"),
            connection_type="custom.spark",
            connection_options=combinedConf
        )

字符串
我想把它们合并合并到一个事务中,这样要么两个表都写,要么两个表都不写。我想到了Transactions,但是我怎么把transaction id传递给glueContext.write_dynamic_frame.from_options函数,我检查了文档,但是没有找到。
有没有办法强制这两行代码一起执行或者根本不执行?谢谢
编辑:我这样添加的,但得到了

TypeError: from_options() got an unexpected keyword argument 'transactionId'


tx_id = context.start_transaction(read_only=False)context.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(df1,gluecontext,“df1”),connection_type=“custom.spark”,connection_options=config1,transactionId=tx_id)

context.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df2, gluecontext, "df2"),
connection_type="custom.spark",
connection_options=config2,
transactionId=tx_id
)
    


try:
    
    context.commit_transaction(tx_id)
    print("done writing after transaction")
except Exception:
    context.cancel_transaction(tx_id)
    print("cancel trans")
    raise Exception("the transaction cancelled")

kupeojn6

kupeojn61#

在additional_options中添加transactionId

相关问题