PySpark Pandas DataFrame: how to update specific rows in a Hive table

omqzjyyz · posted 2023-04-05 in Spark

I want to update a single column in a Hive table.

from pyspark.sql import SparkSession

# Spark session with Hive support so spark.sql() can read the Hive table
spark = SparkSession.builder.appName("job1").enableHiveSupport().getOrCreate()

# Select only the rows I want to modify from the partitioned table
test1 = spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and
      part2='avgewg' and
      col2='filter_condition'
""")

import pandas as pd

# Arrow speeds up the Spark-to-pandas conversion
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pandasDF = test1.toPandas()

for index, row in pandasDF.iterrows():
    newpr = []
    pr = pandasDF.iloc[index]['col1']   # list of Row structs
    if pr:
        for p in pr:
            newp = p.asDict()
            newp['attribute1'] = p.attribute1 + '_suffix'
            newpr.append(newp)
        print(newpr[0])
        # write the first updated struct back into the pandas DataFrame
        pandasDF.iloc[index]['attribute1'][0] = newpr[0]
        print(pandasDF.iloc[index]['attribute1'][0])
        break

With the snippet above I can see that

print(pandasDF.iloc[index]['attribute1'][0])

shows the updated value. Now, how can I update the underlying Hive table? I looked at some resources online and came up with the following:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

This works, but note that my select query has

col2='filter_condition'

so basically I don't want to overwrite the entire partition, only the rows I selected (the quick count check below illustrates the difference).
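To make that concrete, here is a hypothetical check, using the table and column names from above, of how many rows in the target partition fall outside my filter and would therefore be lost if the partition were overwritten with test1 alone:

# Rows in the target partition that the filtered select does NOT return.
# If this count is non-zero, overwriting the partition with test1 alone
# would silently drop those rows.
left_out = spark.sql("""
select count(*) as cnt from myPartitionedTable
where part1='adqwf' and part2='avgewg'
  and col2 <> 'filter_condition'
""").collect()[0]['cnt']
print("rows that would be dropped by a partition overwrite:", left_out)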
Is the above approach feasible?

wqlqzqxt

I *think* I understand this right, but I can't test it, so let me know how it works out.
You need to modify your code to update the pd.DataFrame with the new values, and then convert the updated pd.DataFrame back into a Spark DataFrame with SparkSession's createDataFrame() method:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Define the schema of your DataFrame; declare the nested column with its
# real array<struct<...>> type instead of casting a string column afterwards
schema = StructType([
    StructField("col1", StringType()),
    StructField("attribute1", StringType()),
    StructField("products", ArrayType(StructType([
        StructField("attribute1", StringType()),
        StructField("attribute2", StringType())
    ]))),
    StructField("part1", StringType()),
    StructField("part2", StringType())
])

# Create a new Spark DataFrame with the updated values
updated_df = spark.createDataFrame(pandasDF, schema=schema)

# Enable dynamic partition overwrite mode
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Write the updated rows back to the Hive table, partitioned by part1/part2
updated_df.write.mode("overwrite").partitionBy("part1", "part2").format("hive").saveAsTable("myPartitionedTable")
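One caveat: mode("overwrite") with saveAsTable rewrites the table as a whole, and the dynamic partition overwrite setting takes effect for insertInto / INSERT OVERWRITE style writes, so the rows that do not match col2='filter_condition' would still not be preserved. Below is a minimal sketch of a partition-preserving rewrite, assuming the table and column names from the question and that updated_df has the same columns as the table; the variable names untouched and full_partition are illustrative only:

# Sketch only: plain (non-ACID) Hive tables have no row-level UPDATE, so the
# usual workaround is to read back the rows of the affected partition that
# were NOT changed, union them with the updated rows, and overwrite just
# that partition.

untouched = spark.sql("""
select * from myPartitionedTable
where part1='adqwf' and part2='avgewg'
  and col2 <> 'filter_condition'
""")

# updated_df holds the modified rows; untouched keeps the table's column
# order, which matters because insertInto resolves columns by position
full_partition = untouched.unionByName(updated_df)

# With dynamic mode, only the partitions present in full_partition are
# overwritten; every other partition of the table is left alone
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
full_partition.write.mode("overwrite").insertInto("myPartitionedTable")

If col2 can be NULL, the <> filter would also drop those rows, so that case needs separate handling.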
