我尝试使用pyspark更新dataframe中的几行,该表存在于无服务器SQL池中我确实尝试使用更新命令,但错误是它在无服务器SQL池中不受支持,是否有任何方法可以更新Azure synapse无服务器SQL表中的几行
yeotifhr1#
如果您要更新在无服务器池中使用ADLS数据源创建的表,请按照this无服务器SQL池不支持更新Delta Lake文件。您可以使用无服务器SQL池来查询最新版本的Delta Lake。使用Synapse Analytics中的Apache Spark池更新Delta Lake。使用以下代码从路径读取数据:
df = spark.read.load('abfss://<containerName>@<ADLSName>.dfs.core.windows.net/mycsv2.csv', format='csv',header=True)
使用以下代码根据要求更新数据:
from pyspark.sql.functions import when, col df = df.withColumn('<colName>', when(<condition>, '<updateValue>').otherwise(df.<colName>))
创建ADLS的链接服务。并将 Dataframe 写入ADLS,代码如下:
import pandas as pd pdf = df.toPandas() pdf.to_csv('abfss://<filepath>/mycsv2.csv', storage_options = {'linked_service' : '<linkedserviceName>'}) print("done")
完整编码:
from pyspark.sql.functions import when, col import pandas as pd df = spark.read.load('abfss://rakeshsynapse@rakeshgen2.dfs.core.windows.net/mycsv2.csv', format='csv',header=True) df = df.withColumn('Name', when(col('Id') == 1, 'AE').otherwise(df.Name)) pdf = df.toPandas() pdf.to_csv('abfss://rakeshsynapse/mycsv2.csv', storage_options = {'linked_service' : 'AzureDataLakeStorage1'}) print("done")
将成功更新数据。
1条答案
按热度按时间yeotifhr1#
如果您要更新在无服务器池中使用ADLS数据源创建的表,请按照this无服务器SQL池不支持更新Delta Lake文件。您可以使用无服务器SQL池来查询最新版本的Delta Lake。使用Synapse Analytics中的Apache Spark池更新Delta Lake。
使用以下代码从路径读取数据:
使用以下代码根据要求更新数据:
创建ADLS的链接服务。并将 Dataframe 写入ADLS,代码如下:
完整编码:
将成功更新数据。