pyspark 如何从列表向表中插入值

xe55xuns  于 2023-05-21  发布在  Spark
关注(0)|答案(1)|浏览(106)

我有一个列表和一个表,如下所示,我需要做的是遍历表中项目名称列中的值,并找到列表中可用但表中缺少的项目名称(如果有的话)。然后,我需要将缺少的item_name插入到表中,其中item value列为空值,timestamp列的时间戳与其他时间戳相同。
list_of_tags = [“item_1”,“item_2”,“item_3”,“item_4”,“item_5”,“item_1_a”,“item_1_b”,“item_1_c”,“item_1_d”,“item_1_e”]

|item_name | item_value | timestamp |
|:-------  |:------:| ----------------------------:|
| item_1   | 23.2   | 2023-05-08T20:00:00.000+0000 |
| item_2   | 45.2   | 2023-05-08T20:00:00.000+0000 |
| item_3   | 34.3   | 2023-05-08T20:00:00.000+0000 |
| item_4   | 56.3   | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2   | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2   | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3   | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3   | 2023-05-08T20:00:00.000+0000 |

我想要结果是

|item_name | item_value  | timestamp                    |
|:------   |:------------:| ----------------------------:|
| item_1   | 23.2         | 2023-05-08T20:00:00.000+0000 |
| item_2   | 45.2         | 2023-05-08T20:00:00.000+0000 |
| item_3   | 34.3         | 2023-05-08T20:00:00.000+0000 |
| item_4   | 56.3         | 2023-05-08T20:00:00.000+0000 |
| item_4   | 56.3         | 2023-05-08T20:00:00.000+0000 |
| item_5   | null         | 2023-05-08T20:00:00.000+0000 |
| item_1_a | 23.2         | 2023-05-08T20:00:00.000+0000 |
| item_2_b | 45.2         | 2023-05-08T20:00:00.000+0000 |
| item_3_c | 34.3         | 2023-05-08T20:00:00.000+0000 |
| item_4_d | 56.3         | 2023-05-08T20:00:00.000+0000 |
| item_5_e | null         | 2023-05-08T20:00:00.000+0000 |

如何使用Pyspark实现此操作?
任何帮助都非常感谢

lstz6jyr

lstz6jyr1#

使用unionByName函数获取原始DataFrame。这里是你可以做的

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_timestamp, col
from pyspark.sql.types import StringType, DoubleType, TimestampType, StructType, StructField

从上面的库中,您必须导入所需的数据类型和函数。

从给定表创建DataFrame

spark = SparkSession.builder.getOrCreate() 

data = [
    ("item_1", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4", 56.3, "2023-05-08T20:00:00.000+0000"),
    ("item_5", None, "2023-05-08T20:00:00.000+0000"),  
    ("item_1_a", 23.2, "2023-05-08T20:00:00.000+0000"),
    ("item_2_b", 45.2, "2023-05-08T20:00:00.000+0000"),
    ("item_3_c", 34.3, "2023-05-08T20:00:00.000+0000"),
    ("item_4_d", 56.3, "2023-05-08T20:00:00.000+0000"),
    ("item_5_e", None, "2023-05-08T20:00:00.000+0000")  
]

df = spark.createDataFrame(data, ["item_name", "item_value", "timestamp"])

现在将timestamp列转换为TimestampType()

df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

从标签列表中创建DataFrame

list_of_tags = [
    "item_1", "item_2", "item_3", "item_4", "item_5",
    "item_1_a", "item_2_b", "item_3_c", "item_4_d", "item_5_e"
]

schema = StructType([
    StructField("item_name", StringType(), nullable=False),
    StructField("item_value", DoubleType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=False)
])

existing_items = df.select("item_name").distinct().collect()
existing_timestamp = df.select("timestamp").first()[0]

missing_items = [item for item in list_of_tags if item not in [row.item_name for row in existing_items]]

missing_items_df = spark.createDataFrame([(item, None, existing_timestamp) for item in missing_items], schema)

在原始DataFrame中添加缺少的项名称,并替换空值

updated_df = df.unionByName(missing_items_df)
updated_df = updated_df.fillna({"item_value": "null"})

返回更新后的DataFrame

updated_df.display(truncate=False)

相关问题