如何在pyspark中运行insert、update或delete查询?

vi4fp9gy  于 2021-05-16  发布在  Spark
关注(0)|答案(2)|浏览(888)

我需要像这样运行sql表达式:

df.write.option('query',"INSERT INTO dbo.Cities(Name,Population) VALUES('Rome',5000)").save()
j2cgzkjk

j2cgzkjk1#

请注意,sparksql不支持在insert期间指定列。必须按正确顺序指定每列中的所有值。

spark.sql("create schema if not exists dbo")
spark.sql("create table if not exists dbo.Cities (Name string, Population int)")
spark.sql("INSERT INTO dbo.Cities VALUES('Rome',5000)")
spark.sql("SELECT * FROM dbo.Cities").show()
+----+----------+
|Name|Population|
+----+----------+
|Rome|      5000|
+----+----------+
mnemlml8

mnemlml82#

使用 deltalake 为了这个案子。
创建delta表,然后我们可以对表执行更新/删除/插入。
如果要插入到表中,则可以使用 spark.sql 向表中插入数据。

spark.sql("INSERT INTO dbo.Cities(Name,Population) VALUES('Rome',5000)")

但对于更新/删除,您需要使用 delta lake (或) insert overwrite/deleting partitions 来自spark sql的语句。

相关问题