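Python: append the records of a Spark df to a list of dictionaries, then use a function to filter and aggregate the records sharing an id into a single dictionary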

Posted by bbmckpt7 on 2021-05-27 in Spark

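Note that you can run this locally only if Spark is installed; the commands below then create the session. Otherwise, reproduce the problem on a Databricks cluster, which initializes the Spark context automatically.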

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()

sc = spark.sparkContext

The Spark DataFrame

import pandas as pd

# Build the sample data in pandas first, then convert it to a Spark DataFrame.
spark_dataframe = pd.DataFrame({
    'id': ['867', '430', '658', '157', '521', '867', '430', '867'],
    'Probability': [0.12, 0.72, 0.32, 0.83, 0.12, 0.49, 0.14, 0.12],
    'RAG': ['G', 'R', 'A', 'R', 'G', 'A', 'G', 'G'],
    'Timestamp': ['2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 17-49-32', '2020-07-01 16-45-32', '2020-07-01 16-45-32', '2020-07-01 15-45-32']})
spark_dataframe = spark.createDataFrame(spark_dataframe)

My list of dictionaries

my_list_of_dictionaries=[{'id': '867',
  'Timestamp': '2020-07-02 07-27-58',
  'RAG': 'G',
  'Probability': 0.13},
 {'id': '430',
  'Timestamp': '2020-07-02 07-27-58',
  'RAG': 'G',
  'Probability': 0.12},
 {'id': '658',
  'Timestamp': '2020-07-02 07-27-58',
  'RAG': 'G',
  'Probability': 0.12},
 {'id': '157',
  'Timestamp': '2020-07-02 07-27-58',
  'RAG': 'G',
  'Probability': 0.12}]

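I want every row of the Spark DataFrame appended to the list of dictionaries as a dictionary of the same shape. Where a row's id matches an id already in the list, the records should be grouped together, so the result is a list of lists.
My final list should look like this: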

list_of_list_dictionaries=[ [{'id': '867',
    'Timestamp': '2020-07-02 07-27-58',
    'RAG': 'G',
    'Probability': 0.13},
   {'id': '867',
    'Timestamp': '2020-07-01 17-49-32',
    'RAG': 'G',
    'Probability': 0.12},
   {'id': '867',
    'Timestamp': '2020-07-01 16-45-32',
    'RAG': 'A',
    'Probability': 0.49},
   {'id': '867',
    'Timestamp': '2020-07-01 15-45-32',
    'RAG': 'G',
    'Probability': 0.12}], #4 occurrences of id '867' since it has 3 rows in my spark df and 1 dictionary already in my list of dictionaries.
   [{'id': '430',
     'Timestamp': '2020-07-02 07-27-58',
     'RAG': 'G',
     'Probability': 0.12},
    {'id': '430',
     'Timestamp': '2020-07-01 17-49-32',
     'RAG': 'R',
     'Probability': 0.72},
    {'id': '430',
     'Timestamp': '2020-07-01 16-45-32',
     'RAG': 'G',
     'Probability': 0.14}], #3 occurrences of id '430' since it has 2 rows in my spark df and 1 dictionary already in my list of dictionaries.
   [{'id': '658',
     'Timestamp': '2020-07-02 07-27-58',
     'RAG': 'G',
     'Probability': 0.12},
    {'id': '658',
     'Timestamp': '2020-07-01 17-49-32',
     'RAG': 'A',
     'Probability': 0.32}],
    [{'id': '157',
      'Timestamp': '2020-07-02 07-27-58',
      'RAG': 'G',
      'Probability': 0.12},
     {'id': '157',
      'Timestamp': '2020-07-01 17-49-32',
      'RAG': 'R',
      'Probability': 0.83}],
    [{'id': '521',
      'Timestamp': '2020-07-01 17-49-32',
      'RAG': 'G',
      'Probability': 0.12}] ] # 1st occurrence of id '521' since its only 1 time in my spark df only.
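
One possible way to build a nested list of this shape is sketched below in plain Python. It assumes the Spark rows are small enough to bring to the driver with `collect()` and turn into dictionaries with `Row.asDict()`. The helper name `group_by_id` and the shortened sample data are made up for illustration, and hand-written rows stand in for the collected DataFrame so the snippet runs without Spark.

```python
def group_by_id(existing, new_rows):
    """Group dictionaries by their 'id', keeping ids in first-seen order."""
    groups = {}  # plain dicts preserve insertion order (Python 3.7+)
    for record in list(existing) + list(new_rows):
        groups.setdefault(record['id'], []).append(record)
    return list(groups.values())


# With Spark available, the rows could be collected to the driver first:
#   new_rows = [row.asDict() for row in spark_dataframe.collect()]
# Here a few hand-written rows stand in for the collected DataFrame.
existing = [
    {'id': '867', 'Timestamp': '2020-07-02 07-27-58', 'RAG': 'G', 'Probability': 0.13},
    {'id': '430', 'Timestamp': '2020-07-02 07-27-58', 'RAG': 'G', 'Probability': 0.12},
]
new_rows = [
    {'id': '867', 'Timestamp': '2020-07-01 17-49-32', 'RAG': 'G', 'Probability': 0.12},
    {'id': '521', 'Timestamp': '2020-07-01 17-49-32', 'RAG': 'G', 'Probability': 0.12},
    {'id': '867', 'Timestamp': '2020-07-01 16-45-32', 'RAG': 'A', 'Probability': 0.49},
]

grouped = group_by_id(existing, new_rows)
for group in grouped:
    print(group[0]['id'], len(group))
```

Because the existing dictionaries are processed first, each group starts with the record that was already in the list, followed by the Spark rows in the order they were collected. An id that occurs only in the Spark rows, such as '521', ends up as a group of its own at the end.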

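So my first question (per the title) is how to create this list of lists of dictionaries.
My second question, also per the title, is how to write the function I want to apply to this list so that each inner list is reduced to a single element. Before presenting the function: I want the final list to have the same shape as the initial list of dictionaries while keeping every id, both those from the Spark df and those from the list of dictionaries (including id '521', which appears only in the Spark df).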

final_list=[   {'id': '867',
     'Timestamp': '2020-07-01 15-45-32',
     'RAG': 'G'},
    {'id': '430',
     'Timestamp': '2020-07-01 16-45-32',
     'RAG': 'G'},
    {'id': '658',
     'Timestamp': '2020-07-01 17-49-32',
     'RAG': 'A'},
    {'id': '157',
     'Timestamp': '2020-07-02 07-27-58',
     'RAG': 'R'},
    {'id': '521',
     'Timestamp': '2020-07-02 07-27-58',
     'RAG': 'G'}] # 1st occurrence of id '521' since its only 1 time in my spark df only.

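As you can see, the list of dictionaries has gone through filtering. The filtering takes place in a custom function, which looks like this: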

from collections import Counter
from datetime import datetime

final_list = []
mapping_dictionary = {'R': 0.6, 'A': 0.3, 'G': 0.1}

for group in list_of_list_dictionaries:
    group_id = group[0]['id']  # every dictionary in a group shares the same id
    if len(group) >= 2:  # like ids '867', '430', '658', '157', with 4, 3, 2, 2 occurrences
        # Count the RAG values per group. The first id, '867', has 3 'G' values and 1 'A'.
        counts = Counter(d['RAG'] for d in group)
        g, a, r = counts['G'], counts['A'], counts['R']
        if g > a and g > r:  # 'G' has the most occurrences for this id
            rag = 'G'
        elif a > g and a > r:  # 'A' has the most occurrences for this id
            rag = 'A' if a < 3 else 'R'  # three or more 'A' values are reported as 'R'
        elif r > g and r > a:  # 'R' has the most occurrences for this id
            rag = 'R'
        else:
            # Tie, like id '157' with 1 'G' and 1 'R': select the value with the
            # highest importance in mapping_dictionary. Assumption: only the values
            # tied for the highest count are compared.
            top = max(counts.values())
            tied = [value for value, n in counts.items() if n == top]
            rag = max(tied, key=mapping_dictionary.get)
    else:  # len(group) is 1: keep the RAG value of the only record
        rag = group[0]['RAG']
    final_list.append({'id': group_id, 'RAG': rag, 'Timestamp': datetime.utcnow()})
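
To check the selection rule separately from the loop, it can be isolated as a small pure function that takes only the RAG values of one group. The sketch below is one reading of the rule described in the comments above, not a definitive implementation. The names `pick_rag` and `IMPORTANCE` are made up for illustration, and it assumes that a tie is resolved only among the values sharing the highest count and that every group is non-empty.

```python
from collections import Counter

# Same weights as mapping_dictionary in the question.
IMPORTANCE = {'R': 0.6, 'A': 0.3, 'G': 0.1}


def pick_rag(rag_values):
    """Reduce the RAG values of one id to a single RAG value."""
    counts = Counter(rag_values)
    top = max(counts.values())  # raises ValueError on an empty group
    leaders = [value for value, n in counts.items() if n == top]
    if len(leaders) > 1:
        # Tie (assumed rule): take the most important of the tied values.
        return max(leaders, key=IMPORTANCE.get)
    winner = leaders[0]
    if winner == 'A' and top >= 3:
        # Three or more 'A' values are reported as 'R'.
        return 'R'
    return winner


# RAG values per id, taken from list_of_list_dictionaries in the question.
sample = {
    '867': ['G', 'G', 'A', 'G'],
    '430': ['G', 'R', 'G'],
    '658': ['G', 'A'],
    '157': ['G', 'R'],
    '521': ['G'],
}
results = {group_id: pick_rag(values) for group_id, values in sample.items()}
print(results)
```

A group of a single record falls out of the same logic, since its only value is trivially the most frequent one, so no separate branch is needed for it.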

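Thank you in advance for your attention and for any comments on these two questions. I am open to discussion and would welcome suggestions for improving how the filtering function works.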
