PySpark: dynamically add a row to the final DataFrame

cetgtptt · posted 2022-11-21 · in Spark
Follow (0) | Answers (1) | Views (216)

I have a final DataFrame with the following columns:

  • Product_ID: string
  • Product_COD: string
  • Product_NAM: string
  • Product_VER: integer
  • ProductLine_NAM: string
  • Language_COD: string
  • ProductType_NAM: string
  • Load_DAT: integer
  • LoadEnd_DAT: integer
  • edmChange_DTT: timestamp

I want to add a new row to this DataFrame with the ID (Product_ID) set to -1, "Unknown" in the string columns, and null in the columns of every other data type, for example:

I created the following code:

from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

id_column = "Product_ID"

# create a new single-row DataFrame holding only the -1 id
df_lessOne = spark.createDataFrame(["-1"], "string").toDF(id_column)

# append it; the columns missing from df_lessOne are filled with nulls
appended_df = finalDf.unionByName(df_lessOne, allowMissingColumns=True)

appended_df_filter = appended_df.filter(id_column + " = '-1'")

# select only the string columns
columns = [name for name, dtype in appended_df_filter.dtypes if dtype.startswith('string')]

# replace the string columns with "Unknown"
for c_na in columns:
    appended_df_filter = appended_df_filter.withColumn(c_na, lit('Unknown'))

appended_df = appended_df.filter(id_column + " <> '-1'")

dfs = [appended_df, appended_df_filter]

# add the final -1 row back to the final dataframe
finalDf = reduce(DataFrame.unionAll, dfs)

display(finalDf)

But unfortunately it doesn't quite work.
I tried to build it dynamically because I want to reuse it on other DataFrames; I would only need to change id_column afterwards.
Can anyone help me achieve this?
Thank you.

qco9c6ql1#

from datetime import datetime

from pyspark.sql.types import *
import pyspark.sql.functions as F

data2 = [
    ("xp3980","2103","Product_1",1,"PdLine_23","XX1","PNT_1",2,36636,datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S')),
    ("gi9387","2411","Product_2",1,"PdLine_44","YT89","PNT_6",2,35847,datetime.strptime('2021-07-21 7:00:00', '%Y-%m-%d %H:%M:%S'))
]

schema = StructType([
    StructField("Product_ID", StringType(), True),
    StructField("Product_COD", StringType(), True),
    StructField("Product_NAM", StringType(), True),
    StructField("Product_VER", IntegerType(), True),
    StructField("ProductLine_NAM", StringType(), True),
    StructField("Language_COD", StringType(), True),
    StructField("ProductType_NAM", StringType(), True),
    StructField("Load_DAT", IntegerType(), True),
    StructField("LoadEnd_DAT", IntegerType(), True),
    StructField("edmChange_DTT", TimestampType(), True)
])

my_df = spark.createDataFrame(data=data2, schema=schema)

# single-row DataFrame holding the "-1" id (as a string, matching the schema)
df_res = spark.createDataFrame([("-1",)]).toDF("Product_ID")

# fill the remaining columns by type: "Unknown" for strings, null otherwise
for c in my_df.schema:
    if c.name == 'Product_ID':
        continue
    if isinstance(c.dataType, StringType):
        df_res = df_res.withColumn(c.name, F.lit('Unknown'))
    else:
        # lit(None) is untyped; cast it so the union keeps the schema
        df_res = df_res.withColumn(c.name, F.lit(None).cast(c.dataType))

my_df.union(df_res).show()

# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |Product_ID|Product_COD|Product_NAM|Product_VER|ProductLine_NAM|Language_COD|ProductType_NAM|Load_DAT|LoadEnd_DAT|      edmChange_DTT|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
# |    xp3980|       2103|  Product_1|          1|      PdLine_23|         XX1|          PNT_1|       2|      36636|2020-08-20 10:00:00|
# |    gi9387|       2411|  Product_2|          1|      PdLine_44|        YT89|          PNT_6|       2|      35847|2021-07-21 07:00:00|
# |        -1|    Unknown|    Unknown|       null|        Unknown|     Unknown|        Unknown|    null|       null|               null|
# +----------+-----------+-----------+-----------+---------------+------------+---------------+--------+-----------+-------------------+
