我正在尝试使用多重处理来加速我正在做的一些数据标记,但我注意到这需要更长的时间(实际上我从未看到程序终止)。最初的脚本运行大约需要7个小时,但今天早上我上班时发现,昨天晚上我让它运行后,它仍在运行。
任务概述
Input:
1) Pandas DataFrame with a column of text
2) Dictionary that looks like {word: label}.
(Desired) Output:
Same DataFrame but this time with the positions of the words marked in the text.
Example:
DataFrame:
----------------------------------------------------
| text
0 | I live in the United States.
----------------------------------------------------
Dict: {'United States': 'country'}
Output DataFrame:
----------------------------------------------------
| text | labels
0 | I live in the United States. | [14, 26]
----------------------------------------------------
为了稍微解释结果,子串'United States'
位于文本中的位置14-26。我基本上是遍历DataFrame,进一步遍历字典,然后使用正则表达式标记位置。
我做了什么
<Original Code>
def label_data(df, dict):
pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
for idx, row in enumerate(pbar): text = row['text'] spans = []
for word, label in label_dict.items():
for match in re.finditer(word, text):
start = match.start()
end = match.end()
spans.append([start, end])
row['labels'] = spans
df.iloc[idx] = row
return df
<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np
def label_data_parallel(df, dict):
num_cores = mp.cpu_count() - 1 pool = mp.Pool(num_cores)
df_chunks = np.array_split(df, num_cores)
labeled_dfs = pool.starmap(label_data, \
product(df_chunks, [dict] * num_cores))
df = pd.concat(labeled_dfs, axis=0)
pool.close()
pool.join()
return df
我的代码有什么问题吗?此外,DataFrame有大约200000行,字典有大约3000个键值对。
2条答案
按热度按时间11dmarpk1#
你有没有想过另一种算法?
三个想法:
1.不在 Dataframe 上迭代,而是在组合文本中搜索。这种搜索已经被优化和研究了几十年,因此应该非常快,希望在python重新库中得到很好的实现。但是,不要添加因为线条粘在一起而出现的标签,请添加一个giberrish分隔符。我用了“@@@@@”。乱七八糟的分隔符(我承认看起来不太好)可以用一个简单的检查来替换,即匹配发生在行边界上,所以跳过它。
1.所有要搜索的键也可以粘贴到一个正则表达式模式中,然后所有工作都可以通过重新库以更有效的方式完成
1.正则表达式模式可以作为trie进行优化,如下所示:在Python 3中加速数百万个正则表达式替换
E、 g这样:
几秒钟内即可获得所需结果:
所以我专门尝试了你的参数(见代码)。200k行和3000个标签的 Dataframe 。算法在我的m1上只运行3-5秒
尚未解决的问题取决于您的投入:
1.如果标签在一行内重叠怎么办?然后需要添加一个循环,以便每次迭代分别搜索每个标签(并且可以进行多处理)
5anewei62#
可以实现以下几种效率:
1.您可以创建一个优化的单个正则表达式,搜索
label_dict
中的任何国家,而不是对每个国家执行re.finditer
,并将该正则表达式传递给工作函数。考虑到您正在搜索3000个国家,这将大大提高搜索速度。1.您只需要将字符串数组而不是 Dataframe 数组与前面提到的编译正则表达式一起传递给worker函数。
1.通过创建一个
mp.cpu_count() - 1
的池大小,将主进程留给处理器使用。然后调用starmap
,直到返回所有结果,此时池进程处于空闲状态。相反,您可以使用方法imap
,它可以在工作函数返回某个值时立即开始处理结果。但是,主进程所做的处理量可能不足以保证为其指定一个处理器。在下面的代码中,我使用了所有可用的处理器来构建池。但你可以试着把一个留给主进程,看看这是否更有效。1.worker函数只需要返回它找到的跨度列表。主进程将使用此数据向原始 Dataframe 添加新列。
打印: