A few notes up front:
Note that this only runs locally if you have Spark installed and initialize it with the commands below; otherwise, reproduce the issue on a Databricks cluster, which initializes the Spark context automatically.
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext

# Build a small pandas DataFrame and convert it to a Spark DataFrame.
pandas_df = pd.DataFrame({'id': ['867', '430', '658', '157', '521', '867', '430', '867'],
                          'Probability': [0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12],
                          'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
                          'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32',
                                        '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32',
                                        '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(pandas_df)
Now I want to group the Spark DataFrame by id, count the values in the RAG column, and split those counts out into separate columns, like so:
+---+--------+--------+--------+-------------------+
| id|G(count)|A(count)|R(count)|     Timestamp(max)|
+---+--------+--------+--------+-------------------+
|867|       2|       1|       0|2020-07-01 17-49-32|
|430|       1|       0|       1|2020-07-01 17-49-32|
|658|       0|       1|       0|2020-07-01 17-49-32|
|157|       0|       0|       1|2020-07-01 17-49-32|
|521|       1|       0|       0|2020-07-01 17-49-32|
+---+--------+--------+--------+-------------------+
Then, from the Spark DataFrame above, create a list of dictionaries like this:
map_dictionary = {"R": 0.6, "A": 0.3, "G": 0.1}
final_list = [{"id": "867", "RAG": "G", "Timestamp": "2020-07-01 17-49-32"},  # for id 867, the G column has 2 counts, more than the A and R values in the same row
              {"id": "430", "RAG": "R", "Timestamp": "2020-07-01 17-49-32"},  # G and R each occur once, but R has the greater weight in map_dictionary, ...
             ]  # the length of the list is 5, since the grouped Spark df above has five unique ids
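To make the tie-breaking rule above concrete, here is a minimal sketch in plain Python (the helper pick_rag and its counts argument are hypothetical illustrations, not part of the original post):

def pick_rag(counts, weights=map_dictionary):
    # counts maps each RAG value to its per-id count, e.g. {"G": 1, "A": 0, "R": 1}.
    # The highest count wins; ties are broken by the higher weight.
    return max(counts, key=lambda k: (counts[k], weights[k]))

pick_rag({"G": 2, "A": 1, "R": 0})  # 'G'  (highest count)
pick_rag({"G": 1, "A": 0, "R": 1})  # 'R'  (count tie, but R has the greater weight)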
4 Answers
lnxxn5zx1#
You can group and pivot them.
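For example, a minimal sketch (assuming the question's spark_dataframe; the pivot values are pinned to ["G", "A", "R"] so the column order is stable):

from pyspark.sql import functions as F

# One column per RAG value, holding its count per id; missing combinations become 0.
pivoted = (spark_dataframe
           .groupBy("id")
           .pivot("RAG", ["G", "A", "R"])
           .count()
           .fillna(0))

# Latest timestamp per id (these strings sort chronologically within a day).
max_ts = spark_dataframe.groupBy("id").agg(F.max("Timestamp").alias("Timestamp"))

result = pivoted.join(max_ts, on="id")
result.show()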
Result:
+---+---+---+---+-------------------+
| id|  G|  A|  R|          Timestamp|
+---+---+---+---+-------------------+
|867|  2|  1|  0|2020-07-01 17-49-32|
|430|  1|  0|  1|2020-07-01 17-49-32|
|658|  0|  1|  0|2020-07-01 17-49-32|
|157|  0|  0|  1|2020-07-01 17-49-32|
|521|  1|  0|  0|2020-07-01 17-49-32|
+---+---+---+---+-------------------+
k2fxgqgv2#
6qfn3psc3#
ogq8wdun4#
Finally, you can drop the irrelevant columns.
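For instance, a minimal sketch (assuming the question's spark_dataframe; Probability is a column that neither desired output uses):

# drop() returns a new DataFrame without the named columns.
trimmed = spark_dataframe.drop("Probability")
trimmed.show()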