我有一个如下格式的Pandas数据框。数据已经预先聚合。
+---------------------------+----------+---------+-------------+-------------+
|InterfaceName |StartDate |StartHour|DocumentCount|TotalRowCount|
+---------------------------+----------+---------+-------------+-------------+
|Interface_A |2023-04-01|0 |5 |4384 |
|Interface_A |2023-04-01|1 |58 |57168 |
|Interface_B |2023-04-01|1 |1 |136 |
|Interface_C |2023-04-01|1 |1 |131 |
|Interface_A |2023-04-02|0 |58 |57168 |
|Interface_B |2023-04-02|0 |1 |131 |
|Interface_C |2023-04-02|0 |1 |136 |
|Interface_A |2023-04-02|1 |2 |1657 |
|Interface_B |2023-04-02|1 |2 |1539 |
|Interface_C |2023-04-02|1 |2 |1657 |
+---------------------------+----------+---------+-------------+-------------+
使用PySpark,我如何转换 Dataframe ,使模式显示如下,然后写入MongoDb中的结构化集合?
root
|-- StartDate: date (nullable = true)
|-- StartHour: integer (nullable = true)
| |-- InterfaceSummary: struct (nullable = false)
| | |-- InterfaceName: string (nullable = true)
| | |-- DocumentCount: string (nullable = true)
| | |-- TotalRowCount: string (nullable = true)
先谢谢你,
班
1条答案
按热度按时间mdfafbf11#
参见下面的实现-
(我已经直接使用你共享的输入数据创建了spark数据框)。但是为了从
pandas
数据框显式创建spark数据框,你可以使用以下代码-df = spark.createDataFrame(pdf)
在这里,
pdf
将是您的pandas
Dataframe 。)
输入数据-
转换架构-
创建转换后的dataframe后,您可以将其写入目标mongodb集合,如下所示-