如何在pyspark中更新数据框中的行,表在Azure Synapse的无服务器SQL池中

h7wcgrx3  于 2023-06-28  发布在  Spark
关注(0)|答案(1)|浏览(141)

我尝试使用pyspark更新dataframe中的几行,该表存在于无服务器SQL池中
我确实尝试使用更新命令,但错误是它在无服务器SQL池中不受支持,是否有任何方法可以更新Azure synapse无服务器SQL表中的几行

yeotifhr

yeotifhr1#

如果您要更新在无服务器池中使用ADLS数据源创建的表,请按照this无服务器SQL池不支持更新Delta Lake文件。您可以使用无服务器SQL池来查询最新版本的Delta Lake。使用Synapse Analytics中的Apache Spark池更新Delta Lake。
使用以下代码从路径读取数据:

df = spark.read.load('abfss://<containerName>@<ADLSName>.dfs.core.windows.net/mycsv2.csv', format='csv',header=True)

使用以下代码根据要求更新数据:

from pyspark.sql.functions import when, col
df = df.withColumn('<colName>', when(<condition>, '<updateValue>').otherwise(df.<colName>))

创建ADLS的链接服务。并将 Dataframe 写入ADLS,代码如下:

import pandas as pd
    pdf = df.toPandas()   
    pdf.to_csv('abfss://<filepath>/mycsv2.csv', storage_options = {'linked_service' : '<linkedserviceName>'})
    print("done")

完整编码:

from pyspark.sql.functions import when, col
import pandas as pd

df = spark.read.load('abfss://rakeshsynapse@rakeshgen2.dfs.core.windows.net/mycsv2.csv', format='csv',header=True)
df = df.withColumn('Name', when(col('Id') == 1, 'AE').otherwise(df.Name))
pdf = df.toPandas()
pdf.to_csv('abfss://rakeshsynapse/mycsv2.csv', storage_options = {'linked_service' : 'AzureDataLakeStorage1'})
print("done")

将成功更新数据。

相关问题