我想更新配置单元表中的单个列。
from datetime import timedelta, date
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext
from pyspark.sql import SparkSession
import os
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("job1").enableHiveSupport().getOrCreate()
test1=spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and
part2='avgewg' and
col2='filter_condition'
""")
import pandas as pd
import json
import urllib.parse
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = test1.toPandas()
smvr={}
for index, row in pandasDF.iterrows():
newpr = []
pr = pandasDF.iloc[index]['col1']
if pr:
for p in pr:
newp = p.asDict()
newp['attribute1']=p.attribute1+'_suffix'
newpr.append(newp)
print(newpr[0])
pandasDF.iloc[index]['attribute1'][0]=newpr[0]
print(pandasDF.iloc[index]['attribute1'][0])
break
所以通过上面的片段,我可以看到print(pandasDF.iloc[index]['attribute1'][0])
有更新的值。现在,我怎样才能更新底层的配置单元表呢?我在网上查阅了一些资源,并提出了以下建议
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
这可以工作,但注意,在我的选择查询我有
col2='filter_condition'
所以基本上我不想覆盖整个分区,但只有我选择的行。
上述方法是否可行?
1条答案
按热度按时间wqlqzqxt1#
我 * 认为 * 我理解这个权利,但我无法测试它.让我知道这是如何工作的.
您需要修改代码以使用新值更新
pd.DataFrame
,然后使用SparkSession
的createDataFrame()
方法将更新后的pd.DataFrame
转换回Spark DataFrame
。