我想使用spark执行更新和插入操作,请查找现有表的图像引用
这里我更新id:101 location和inserttime并插入2条记录:
并使用模式覆盖写入目标
df.write.format("jdbc")
.option("url", "jdbc:mysql://localhost/test")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable","temptgtUpdate")
.option("user", "root")
.option("password", "root")
.option("truncate","true")
.mode("overwrite")
.save()
执行上述命令后,我的数据被损坏,插入到数据库表
数据库中的数据
你能告诉我你的意见和解决办法吗
5条答案
按热度按时间toiithl61#
Spark JDBC writer支持以下模式:
由于您使用的是“覆盖”模式,它会根据列长度重新创建表,如果您想要自己的表定义,请先创建表,然后使用“追加”模式
m1m5dgzv2#
我想使用spark执行更新和插入操作
在Spark SQL中没有与SQL
UPDATE
语句等效的语句。Spark SQL中也没有与SQLDELETE WHERE
语句等效的语句。相反,您必须在Spark外部删除需要更新的行,然后使用append
模式将包含新记录和更新记录的Spark框架写入表中(以便保留表中剩余的现有行)。xxe27gdn3#
如果你需要在你的pyspark代码中执行UPSERT / DELETE操作,我建议你使用pymysql库,并执行你的upsert/delete操作。请查看这篇文章以获取更多信息,并参考代码示例:Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array
请根据您的需要修改代码示例。
3b6akqbq4#
我不推荐TRUNCATE,因为它实际上会删除表,并创建新表。在这样做的时候,表可能会丢失之前设置的列级属性.所以在使用TRUNCATE时要小心,并且要确定是否可以删除表/重新创建表。
mzaanser5#
按照以下步骤操作时,Upsert逻辑工作正常
做这部
尽管如此,我还是无法理解为什么当我直接使用 Dataframe 编写时它会失败