PySpark / MongoDB DataFrame to a nested collection

2ledvvac posted on 2023-04-05 in Spark

I have a pandas DataFrame in the following format. The data is already pre-aggregated.

+---------------------------+----------+---------+-------------+-------------+
|InterfaceName              |StartDate |StartHour|DocumentCount|TotalRowCount|
+---------------------------+----------+---------+-------------+-------------+
|Interface_A                |2023-04-01|0        |5            |4384         |
|Interface_A                |2023-04-01|1        |58           |57168        |
|Interface_B                |2023-04-01|1        |1            |136          |
|Interface_C                |2023-04-01|1        |1            |131          |
|Interface_A                |2023-04-02|0        |58           |57168        |
|Interface_B                |2023-04-02|0        |1            |131          |
|Interface_C                |2023-04-02|0        |1            |136          |
|Interface_A                |2023-04-02|1        |2            |1657         |
|Interface_B                |2023-04-02|1        |2            |1539         |
|Interface_C                |2023-04-02|1        |2            |1657         |
+---------------------------+----------+---------+-------------+-------------+

Using PySpark, how can I transform the DataFrame so that its schema looks like the one below, and then write it to a structured collection in MongoDB?

root
 |-- StartDate: date (nullable = true)
 |-- StartHour: integer (nullable = true)
 |-- InterfaceSummary: struct (nullable = false)
 |    |-- InterfaceName: string (nullable = true)
 |    |-- DocumentCount: string (nullable = true)
 |    |-- TotalRowCount: string (nullable = true)

Thanks in advance,


mdfafbf1 1#

See the implementation below.
(I created the Spark DataFrame directly from the input data you shared.) To explicitly create a Spark DataFrame from a pandas DataFrame instead, you can use:
df = spark.createDataFrame(pdf)
where pdf is your pandas DataFrame.
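For example, a minimal sketch of that conversion; the pandas DataFrame pdf below is hypothetical and only illustrates the call, and the active SparkSession is assumed to be available as spark:

import pandas as pd

# Hypothetical pandas DataFrame with the same columns as the pre-aggregated data
pdf = pd.DataFrame({
    "InterfaceName": ["Interface_A", "Interface_A", "Interface_B"],
    "StartDate": ["2023-04-01", "2023-04-01", "2023-04-01"],
    "StartHour": [0, 1, 1],
    "DocumentCount": [5, 58, 1],
    "TotalRowCount": [4384, 57168, 136],
})

# Spark infers the schema from the pandas dtypes
df = spark.createDataFrame(pdf)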

Input data:

from pyspark.sql.types import *

schema = StructType([
    StructField("InterfaceName", StringType(), True),
    StructField("StartDate", StringType(), True),
    StructField("StartHour", IntegerType(), True),
    StructField("DocumentCount", IntegerType(), True),
    StructField("TotalRowCount", IntegerType(), True)
])

data = [
    ("Interface_A", "2023-04-01", 0, 5, 4384),
    ("Interface_A", "2023-04-01", 1, 58, 57168),
    ("Interface_B", "2023-04-01", 1, 1, 136),
    ("Interface_C", "2023-04-01", 1, 1, 131),
    ("Interface_A", "2023-04-02", 0, 58, 57168),
    ("Interface_B", "2023-04-02", 0, 1, 131),
    ("Interface_C", "2023-04-02", 0, 1, 136),
    ("Interface_A", "2023-04-02", 1, 2, 1657),
    ("Interface_B", "2023-04-02", 1, 2, 1539),
    ("Interface_C", "2023-04-02", 1, 2, 1657)
]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+-------------+----------+---------+-------------+-------------+
|InterfaceName|StartDate |StartHour|DocumentCount|TotalRowCount|
+-------------+----------+---------+-------------+-------------+
|Interface_A  |2023-04-01|0        |5            |4384         |
|Interface_A  |2023-04-01|1        |58           |57168        |
|Interface_B  |2023-04-01|1        |1            |136          |
|Interface_C  |2023-04-01|1        |1            |131          |
|Interface_A  |2023-04-02|0        |58           |57168        |
|Interface_B  |2023-04-02|0        |1            |131          |
|Interface_C  |2023-04-02|0        |1            |136          |
|Interface_A  |2023-04-02|1        |2            |1657         |
|Interface_B  |2023-04-02|1        |2            |1539         |
|Interface_C  |2023-04-02|1        |2            |1657         |
+-------------+----------+---------+-------------+-------------+

Transforming the schema:

from pyspark.sql.functions import *

# Cast StartDate/StartHour to their target types and pack the remaining
# columns into a struct column named InterfaceSummary
df1 = df.select(
    col("StartDate").cast("Date"),
    col("StartHour").cast("Integer"),
    struct(
        col("InterfaceName"),
        col("DocumentCount").cast("String").alias("DocumentCount"),
        col("TotalRowCount").cast("String").alias("TotalRowCount")
    ).alias("InterfaceSummary")
)
df1.show(truncate=False)
df1.printSchema()

+----------+---------+------------------------+
|StartDate |StartHour|InterfaceSummary        |
+----------+---------+------------------------+
|2023-04-01|0        |{Interface_A, 5, 4384}  |
|2023-04-01|1        |{Interface_A, 58, 57168}|
|2023-04-01|1        |{Interface_B, 1, 136}   |
|2023-04-01|1        |{Interface_C, 1, 131}   |
|2023-04-02|0        |{Interface_A, 58, 57168}|
|2023-04-02|0        |{Interface_B, 1, 131}   |
|2023-04-02|0        |{Interface_C, 1, 136}   |
|2023-04-02|1        |{Interface_A, 2, 1657}  |
|2023-04-02|1        |{Interface_B, 2, 1539}  |
|2023-04-02|1        |{Interface_C, 2, 1657}  |
+----------+---------+------------------------+

root
 |-- StartDate: date (nullable = true)
 |-- StartHour: integer (nullable = true)
 |-- InterfaceSummary: struct (nullable = false)
 |    |-- InterfaceName: string (nullable = true)
 |    |-- DocumentCount: string (nullable = true)
 |    |-- TotalRowCount: string (nullable = true)
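If you additionally want a single document per StartDate/StartHour with every interface nested inside it, a groupBy with collect_list would produce that shape. This is only a sketch building on df1 above; the column name InterfaceSummaries is my own choice, not part of the original answer:

from pyspark.sql.functions import collect_list

# One row per StartDate/StartHour, with all interface summaries
# collected into an array of structs
df2 = df1.groupBy("StartDate", "StartHour") \
         .agg(collect_list("InterfaceSummary").alias("InterfaceSummaries"))
df2.printSchema()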

Once you have the transformed DataFrame, you can write it to the target MongoDB collection as shown below.

mongo_uri = "mongodb://<username>:<password>@<host>:<port>/<dbname>.<collectionname>"
database_name = "<dbname>"
collection_name = "<collectionname>"

# Write the transformed DataFrame (df1), not the original df
df1.write.format("mongo") \
   .option("uri", mongo_uri) \
   .option("database", database_name) \
   .option("collection", collection_name) \
   .mode("append") \
   .save()
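Note that the "mongo" format requires the MongoDB Spark connector on the classpath. A sketch of how the session could be configured for connector 3.x follows; the package coordinates and version here are assumptions, so adjust them to your cluster:

from pyspark.sql import SparkSession

# Assumes the MongoDB Spark connector 3.x (Scala 2.12 build); change the
# version to match your Spark/Scala setup
spark = SparkSession.builder \
    .appName("write-to-mongodb") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2") \
    .getOrCreate()

With the newer 10.x connector the format name is "mongodb" rather than "mongo".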
