我有两个字典dict1和dict2,键值对分别是dictionary和dataframes。
dict1 = {"A" : df1,"B" : df2,"C" : df3}
dict2 = {"A" : df4,"B" : df5,"C" : df6}
我想将df1['Last_Name']
的每一行与df4['Last_Name]
进行比较,并使用Levenstien距离最高的字段创建一个新字段df1['Match']
。类似地,df2与df5以及df3与df6。
现在我想要这3个比较并行,我尝试了多处理和concurrent.futures。但不知何故,它不起作用。
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for key1, df1 in dict1.items():
for key2, df2 in dict2.items():
futures.append(executor.submit(add_flag, df1, df2))
for future in concurrent.futures.as_completed(futures):
result = future.result()
key1 = next(iter(filter(lambda x: x in result.columns, dict1.keys())))
dict1[key1] = result
2条答案
按热度按时间zfciruhq1#
所以,这里有几件事:
concurrent.futures.ThreadPoolExecutor
用于I/O;您希望ProcessPoolExecutor
执行绑定到CPU的操作。1.您的分析要求并行完成,可以使用
zip
完成,而不是嵌套循环。注意,
add_flag()
需要返回一个 Dataframe ,其列名与dict1
中的某个键匹配。niwlg2el2#
基本上,你需要做的是定义一个函数,它接收2个 Dataframe 作为参数并进行比较,然后使用future,多线程或任何你喜欢的方式调用这个函数,就像这样:比较Dfs函数(我使用fuzzywuzzy来计算Levenstien距离,但你可以用你的方式来做:
然后你调用函数,例如你可以
import concurrent.futures
并运行以下命令:它将迭代dicts并以异步方式调用函数。