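Note that you can run this locally only if Spark is installed, in which case the following commands create the session. Otherwise, reproduce the issue on a Databricks cluster, which initializes the Spark context automatically.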
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
The Spark DataFrame:
import pandas as pd
spark_dataframe = pd.DataFrame(
{'id' : ['867', '430', '658', '157', '521', '867', '430', '867'],
'Probability':[0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12],
'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32', '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(spark_dataframe)
My list of dictionaries:
my_list_of_dictionaries=[{'id': '867',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.13},
{'id': '430',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12},
{'id': '658',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12},
{'id': '157',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12}]
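I want each row of the Spark DataFrame to be appended to the list of dictionaries as a similar dictionary. Where a row's id matches an id already in the list, the matching dictionaries should be grouped together, so the result is a list of lists.
My final list of dictionaries should look like this: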
list_of_list_dictionaries=[ [{'id': '867',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.13},
{'id': '867',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'G',
'Probability': 0.12},
{'id': '867',
'Timestamp': '2020-07-01 16-45-32',
'RAG': 'A',
'Probability': 0.49},
{'id': '867',
'Timestamp': '2020-07-01 15-45-32',
'RAG': 'G',
'Probability': 0.12}], #4 occurrences of id '867' since it has 3 rows in my spark df and 1 dictionary already in my list of dictionaries.
[{'id': '430',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12},
{'id': '430',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'R',
'Probability': 0.72},
{'id': '430',
'Timestamp': '2020-07-01 16-45-32',
'RAG': 'G',
'Probability': 0.14}], #3 occurrences of id '430' since it has 2 rows in my spark df and 1 dictionary already in my list of dictionaries.
[{'id': '658',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12},
{'id': '658',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'A',
'Probability': 0.32}],
[{'id': '157',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G',
'Probability': 0.12},
{'id': '157',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'R',
'Probability': 0.83}],
[{'id': '521',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'G',
'Probability': 0.12}] ] # 1st occurrence of id '521' since its only 1 time in my spark df only.
So my first question (the one in the title) is how to create this list of lists of dictionaries.
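One possible way to build this grouping, as a minimal sketch: turn the DataFrame rows into dictionaries and bucket everything by id in plain Python. The helper name `group_by_id` is hypothetical. The hard-coded `df_rows` list below is a shortened stand-in for `[row.asDict() for row in spark_dataframe.collect()]`, which assumes the DataFrame is small enough to collect to the driver.

```python
from collections import defaultdict


def group_by_id(existing, new_rows):
    """Group dictionaries by their 'id', keeping ids in first-seen order.

    Dictionaries from `existing` come first inside each group, followed by
    the ones from `new_rows` in their original order.
    """
    groups = defaultdict(list)  # dicts keep insertion order (Python 3.7+)
    for record in list(existing) + list(new_rows):
        groups[record['id']].append(record)
    return list(groups.values())


# Shortened stand-in for: [row.asDict() for row in spark_dataframe.collect()]
df_rows = [
    {'id': '867', 'Probability': 0.12, 'RAG': 'G', 'Timestamp': '2020-07-01 17-49-32'},
    {'id': '521', 'Probability': 0.12, 'RAG': 'G', 'Timestamp': '2020-07-01 17-49-32'},
    {'id': '867', 'Probability': 0.49, 'RAG': 'A', 'Timestamp': '2020-07-01 16-45-32'},
]

# Shortened version of my_list_of_dictionaries
existing = [
    {'id': '867', 'Timestamp': '2020-07-02 07-27-58', 'RAG': 'G', 'Probability': 0.13},
    {'id': '430', 'Timestamp': '2020-07-02 07-27-58', 'RAG': 'G', 'Probability': 0.12},
]

grouped = group_by_id(existing, df_rows)
```

With the full data, the call would be `group_by_id(my_list_of_dictionaries, df_rows)`. Ids that appear only in the DataFrame, such as '521', end up in a group of their own after the ids from the existing list.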
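My second question, which follows from the title, is how to write the function I want to apply to each inner list so that every list is reduced to a single element. Before introducing the function: I want my final list to have the same shape as the original list of dictionaries while keeping all ids, both from the Spark df and from the list of dictionaries (so id '521' is included).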
final_list=[ {'id': '867',
'Timestamp': '2020-07-01 15-45-32',
'RAG': 'G'},
{'id': '430',
'Timestamp': '2020-07-01 16-45-32',
'RAG': 'G'},
{'id': '658',
'Timestamp': '2020-07-01 17-49-32',
'RAG': 'A'},
{'id': '157',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'R'},
{'id': '521',
'Timestamp': '2020-07-02 07-27-58',
'RAG': 'G'}] # 1st occurrence of id '521' since its only 1 time in my spark df only.
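As you can see, the list of lists of dictionaries has gone through a filtering step. The filtering is expressed in a custom function, which looks like this: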
from collections import Counter
from datetime import datetime

final_list = []
mapping_dictionary = {'R': 0.6, 'A': 0.3, 'G': 0.1}
for i in list_of_list_dictionaries:
    group_id = i[0]['id']  # every dictionary in (i) shares the same id
    if len(i) >= 2:  # like the ids '867', '430', '658', '157', with 4, 3, 2, 2 occurrences
        # Count the RAG values per (i). The first id, '867', has 3 'G' values and 1 'A'.
        counts = Counter(d['RAG'] for d in i)
        g, a, r = counts['G'], counts['A'], counts['R']
        if g > a and g > r:  # 'G' has the most occurrences
            rag = 'G'
        elif a > g and a > r and a < 3:  # 'A' has the most occurrences, fewer than 3
            rag = 'A'
        elif a > g and a > r and a >= 3:  # 'A' has the most occurrences, 3 or more
            rag = 'R'
        elif r > g and r > a:  # 'R' has the most occurrences
            rag = 'R'
        else:
            # Tie, like id '157' with 1 'G' and 1 'R': select the tied value with
            # the highest importance in mapping_dictionary. Treating every tie
            # this way (not only 1-vs-1) is an assumption.
            top = max(counts.values())
            rag = max((k for k, v in counts.items() if v == top),
                      key=mapping_dictionary.get)
    else:  # len(i) is 1
        rag = i[0]['RAG']  # the RAG value of (i), since it has only 1 occurrence
    final_list.append({'id': group_id, 'RAG': rag, 'Timestamp': datetime.utcnow()})
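Thank you in advance for your attention and for any comments on these two questions. I'm open to any discussion, and I'd welcome suggestions for improving how the filtering function works.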