pandas: How can I perform parallel processing across DataFrames?

Posted by fumotvh3 on 2023-05-27 in Other

I have two dictionaries, dict1 and dict2, whose keys are strings and whose values are DataFrames.

dict1 = {"A" : df1,"B" : df2,"C" : df3}
dict2 = {"A" : df4,"B" : df5,"C" : df6}

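I want to compare each row of df1['Last_Name'] against df4['Last_Name'] and create a new column, df1['Match'], holding the value from df4 with the best Levenshtein score. The same goes for df2 with df5 and df3 with df6.
I want these three comparisons to run in parallel. I tried multiprocessing and concurrent.futures, but somehow it doesn't work.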

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        for key2, df2 in dict2.items():
            futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result

zfciruhq1#

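So, a few things here:

  1. concurrent.futures.ThreadPoolExecutor is meant for I/O-bound work; for CPU-bound work you want ProcessPoolExecutor.
  2. Your comparisons are pairwise (df1 with df4, df2 with df5, df3 with df6), so pair the DataFrames up, for example with zip or a lookup by key, instead of using nested loops. The nested loops submit nine tasks instead of three.

Note that add_flag() needs to return a DataFrame with a column whose name matches one of the keys in dict1.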
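import concurrent.futures

# On platforms that spawn worker processes (e.g. Windows), run this under
# `if __name__ == "__main__":` and define add_flag at module level.
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = []
    for key1, df1 in dict1.items():
        df2 = dict2[key1]  # pair each DataFrame with its counterpart by key
        futures.append(executor.submit(add_flag, df1, df2))

    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        # Recover the key from the result's column names (see the note above)
        key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
        dict1[key1] = result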
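
The lookup through result.columns can be avoided by remembering which key each future belongs to. The following is only a sketch of that idea, not part of the original answer: run_pairwise and its parameters are hypothetical names, and the values can be any picklable objects such as DataFrames. The executor class is a parameter so the same code can be tried with threads or processes.

```python
import concurrent.futures
from typing import Any, Callable, Dict


def run_pairwise(
    func: Callable[[Any, Any], Any],
    left: Dict[str, Any],
    right: Dict[str, Any],
    executor_cls: type = concurrent.futures.ProcessPoolExecutor,
) -> Dict[str, Any]:
    """Call func(left[key], right[key]) for every key in left, in parallel.

    Hypothetical helper: with a process pool, func must be defined at module
    level and both arguments and results must be picklable.
    """
    results: Dict[str, Any] = {}
    with executor_cls() as executor:
        # Remember which key each future belongs to, because as_completed()
        # yields futures in completion order, not submission order.
        future_to_key = {
            executor.submit(func, value, right[key]): key
            for key, value in left.items()
        }
        for future in concurrent.futures.as_completed(future_to_key):
            results[future_to_key[future]] = future.result()
    return results
```

With the question's data this would presumably be called as dict1 = run_pairwise(add_flag, dict1, dict2), placed under an if __name__ == "__main__": guard when a process pool is used.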

niwlg2el2#

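Basically, you need to define a function that takes two DataFrames as arguments and compares them, then call that function using futures, multithreading, or whatever you prefer. Here is the comparison function (I used fuzzywuzzy to score the Levenshtein similarity, but you can do it your own way):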

from fuzzywuzzy import fuzz

def add_flag(df1, df2):
    for index, row in df1.iterrows():
        best_match = None
        best_score = 0  # fuzz.ratio is a similarity from 0 to 100; higher is closer
        for _, row2 in df2.iterrows():
            score = fuzz.ratio(row['Last_Name'], row2['Last_Name'])
            if score > best_score:
                best_match = row2
                best_score = score

        if best_match is not None:
            # Store the closest last name found in df2
            df1.loc[index, 'Match'] = best_match['Last_Name']

    return df1
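
A raw Levenshtein distance gets smaller as two strings get closer, whereas fuzz.ratio gets larger, so it can help to see the distance spelled out. Below is a minimal pure-Python sketch, not the original answer's method: levenshtein and closest_name are hypothetical helper names, and the best match is taken to be the candidate with the smallest distance.

```python
from typing import Optional, Sequence


def levenshtein(a: str, b: str) -> int:
    """Return the edit distance between a and b (insert, delete, substitute)."""
    if len(a) < len(b):
        a, b = b, a  # keep the rows of the table as short as possible
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (or match)
            ))
        previous = current
    return previous[-1]


def closest_name(name: str, candidates: Sequence[str]) -> Optional[str]:
    """Return the candidate with the smallest edit distance, or None if empty."""
    return min(candidates, key=lambda c: levenshtein(name, c), default=None)
```

Inside add_flag, such a helper could stand in for the inner loop, for example by passing row['Last_Name'] and the list df2['Last_Name'].tolist(); that wiring is an assumption, not something the answer specifies.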

Then call the function. For example, you can import concurrent.futures and run the following:

dict1 = {"A": df1, "B": df2, "C": df3}
dict2 = {"A": df4, "B": df5, "C": df6}

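import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map each future to its key: as_completed() yields futures in completion
    # order, so zipping it with dict1.items() could assign results to the wrong key.
    future_to_key = {}
    for key1, df1 in dict1.items():
        df2 = dict2[key1]
        future_to_key[executor.submit(add_flag, df1, df2)] = key1

    for future in concurrent.futures.as_completed(future_to_key):
        result = future.result()
        dict1[future_to_key[future]] = result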

This iterates over the dictionaries and calls the function asynchronously.
