Python multiprocessing never stops running and seems to take even longer

fnvucqvd asked on 2022-10-23 in Python

I'm trying to use multiprocessing to speed up some data labeling I'm doing, but I've noticed it takes even longer (in fact, I never saw the program terminate). The original script takes about 7 hours to run, but this morning when I got to work I found it was still running after I left it going last night.

Task overview

Input:
  1) Pandas DataFrame with a column of text
  2) Dictionary that looks like {word: label}.

(Desired) Output:
  Same DataFrame but this time with the positions of the words marked in the text.

Example:
DataFrame:
----------------------------------------------------
  | text
0 | I live in the United States.
----------------------------------------------------

Dict: {'United States': 'country'}

Output DataFrame:
----------------------------------------------------
  | text                         | labels
0 | I live in the United States. | [14, 26]
----------------------------------------------------

To explain the result a bit: the substring 'United States' sits at positions 14-26 of the text. I basically iterate over the DataFrame, further iterate over the dictionary, and then mark the positions using a regular expression.

What I did

<Original Code>

import re

from tqdm import tqdm

def label_data(df, label_dict):
    pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
    for idx, row in enumerate(pbar):
        text = row['text']
        spans = []
        for word, label in label_dict.items():
            for match in re.finditer(word, text):
                start = match.start()
                end = match.end()
                spans.append([start, end])

        row['labels'] = spans
        df.iloc[idx] = row

    return df

<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np
import pandas as pd

def label_data_parallel(df, label_dict):
    num_cores = mp.cpu_count() - 1
    pool = mp.Pool(num_cores)

    df_chunks = np.array_split(df, num_cores)
    labeled_dfs = pool.starmap(label_data,
                               product(df_chunks, [label_dict] * num_cores))
    df = pd.concat(labeled_dfs, axis=0)

    pool.close()
    pool.join()

    return df

Is there anything wrong with my code? For reference, the DataFrame has about 200,000 rows and the dictionary has about 3,000 key-value pairs.

11dmarpk #1

Have you considered a different algorithm?
Three ideas:
1. Instead of iterating over the DataFrame, search the combined text. Substring search has been optimized and studied for decades, so it should be very fast and is hopefully well implemented in Python's re library. However, to avoid picking up labels that only match because rows got glued together, add a gibberish separator; I used "@@@@@@". The gibberish separator (which, I admit, doesn't look great) could be replaced by a simple check that a match falls on a row boundary, in which case skip it.
2. All the keys being searched for can likewise be glued into one regex pattern, so that all the work is done by the re library in a far more efficient way.
3. That regex pattern can be optimized as a trie, as described in: Speed up millions of regex replacements in Python 3.
E.g. like this:

import nltk
import pandas as pd
import re
import string
import random
from nltk.corpus import words

random.seed(1)

# ------- this is just to create nice test data. Otherwise no need in nltk

nltk.download('words')

all_words = words.words()

data_df = pd.DataFrame(
    [
        ' '.join(random.choices(all_words, k=random.randint(1,20))) for _ in range(200000)
    ], columns = ["text"])

label_keys = {
    random.choice(all_words) for _ in range(3000)    
}

# -------- code starts here

class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""

    def __init__(self):
        self.data = {}

    def add(self, word):
        # Walk the trie character by character, creating nodes on the way;
        # the '' key marks the end of a complete word.
        ref = self.data
        for char in word:
            ref[char] = char in ref and ref[char] or {}
            ref = ref[char]
        ref[''] = 1

    def dump(self):
        return self.data

    def quote(self, char):
        return re.escape(char)

    def _pattern(self, pData):
        data = pData
        if "" in data and len(data.keys()) == 1:
            return None

        alt = []
        cc = []
        q = 0
        for char in sorted(data.keys()):
            if isinstance(data[char], dict):
                try:
                    recurse = self._pattern(data[char])
                    alt.append(self.quote(char) + recurse)
                except:
                    cc.append(self.quote(char))
            else:
                q = 1
        cconly = not len(alt) > 0

        if len(cc) > 0:
            if len(cc) == 1:
                alt.append(cc[0])
            else:
                alt.append('[' + ''.join(cc) + ']')

        if len(alt) == 1:
            result = alt[0]
        else:
            result = "(?:" + "|".join(alt) + ")"

        if q:
            if cconly:
                result += "?"
            else:
                result = "(?:%s)?" % result
        return result

    def pattern(self):
        return self._pattern(self.dump())

trie_pattern = Trie()

for label in label_keys:
    trie_pattern.add(re.escape(label))

reg_pattern = trie_pattern.pattern()
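# For intuition: for the two words "bat" and "bad", this trie-built pattern
# collapses the shared prefix into something like "ba[dt]" instead of the
# plain union "bat|bad", which is what makes matching so much faster.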

list_of_texts = list(data_df.text)

indices = list(map(len, list_of_texts))  # character length of every row's text

all_text = "@@@@@@".join(data_df.text) # @@@@@@ - something of known length you don't expect in the text

all_matches = []
for match_ in re.finditer(reg_pattern, all_text):
    all_matches.append(match_.span())
all_matches.sort(key=lambda x: x[0])

# Map the global match positions back onto individual rows.
label_l = []
start = 0  # global offset of the current row inside all_text
all_matches_pointer = 0
indices_pointer = 0
label_l.append([])  # spans for row 0
while True:
    if all_matches_pointer >= len(all_matches):
        # No matches left: pad the remaining rows with empty span lists.
        for _ in range(len(label_l), len(data_df)):
            label_l.append([])
        break
    match_start = all_matches[all_matches_pointer][0]
    match_end = all_matches[all_matches_pointer][1]
    if match_start >= start + indices[indices_pointer]:
        # The match lies beyond the current row: advance to the next row.
        label_l.append([])
        start += indices[indices_pointer] + 6  # 6 == len("@@@@@@")
        indices_pointer += 1
    else:
        # Convert the global span into a row-relative span.
        label_l[-1] += [(match_start - start, match_end - start)]
        all_matches_pointer += 1

data_df["labels"] = label_l
data_df

The desired result arrives within seconds:

text    labels
0   overempty stirring asyla butchering Sherrymoor  [(5, 6), (19, 21), (42, 43)]
1   premeditator spindliness bilamellate amidosucc...   [(3, 4), (8, 10), (29, 30), (33, 35), (38, 39)...
2   Radek vivicremation rusot noegenetic Shropshir...   [(13, 14), (14, 16), (50, 52), (76, 78), (88, ...
3   uninstructiveness blintze plunging rowiness fi...   [(58, 59), (87, 88), (109, 110), (122, 124), (...
4   memorialize scruffman   [(0, 1), (2, 3), (18, 19)]
... ... ...
199995  emulsor treatiser   [(1, 2), (11, 13)]
199996  squibling anisandrous incorrespondent vague jo...   [(13, 15), (40, 43), (52, 53), (71, 73), (130,...
199997  proallotment bulletheaded uningenuousness plat...   [(0, 5), (8, 9), (44, 46), (62, 65), (75, 77)]
199998  unantiquatedness sulphohalite oversoftness und...   [(6, 10), (32, 35), (65, 67), (68, 71), (118, ...
199999  lenticulothalamic aerometric plastidium panell...   [(14, 15), (22, 23), (31, 33), (38, 39), (46, ...
200000 rows × 2 columns

So I tried it with exactly your parameters (see the code): a DataFrame of 200k rows and 3,000 labels. The algorithm runs in only 3-5 seconds on my M1.
Open questions that depend on your input:
1. What if labels can overlap within one row? Then a loop needs to be added so that each iteration searches for each label separately (and that can be multiprocessed); see the sketch below.
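
A minimal sketch of that idea (the helper names find_label_spans and label_overlapping are made up for illustration; texts would be list(data_df.text) and labels the label keys from above):

import multiprocessing as mp
import re
from functools import partial

def find_label_spans(label, texts):
    # Search a single label in every row; because each label is handled
    # independently, matches of different labels may overlap freely.
    pattern = re.compile(re.escape(label))
    return [[m.span() for m in pattern.finditer(text)] for text in texts]

def label_overlapping(texts, labels):
    # One pool task per label; each worker scans all rows for its label.
    with mp.Pool() as pool:
        per_label = pool.map(partial(find_label_spans, texts=texts), labels)
    # Transpose so that every row collects the spans of all labels, sorted.
    return [sorted(sum(row_spans, [])) for row_spans in zip(*per_label)]

Since texts is serialized to the workers for every label task, with very large inputs it may be cheaper to chunk the rows instead, as the other answer here does.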

5anewei6 #2

Several efficiencies can be implemented:
1. Rather than performing re.finditer for every country, you can create a single optimized regex that searches for any of the countries in label_dict and pass that regex to the worker function. Given that you are searching for 3,000 countries, this will speed up the search considerably.
2. You only need to pass arrays of strings to the worker function, together with the aforementioned compiled regex, instead of arrays of DataFrames.
3. By creating a pool of size mp.cpu_count() - 1, you leave a processor free for the main process. But the main process then calls starmap and does nothing until all the results are returned, at which point the pool processes sit idle. You could instead use the imap method, which can start processing results as soon as any worker function returns one. However, the amount of processing the main process does may not justify dedicating a processor to it. In the code below I used all available processors to build the pool, but you could try leaving one for the main process and see whether that is more efficient.
4. The worker function only needs to return the lists of spans it finds; the main process uses this data to add the new column to the original DataFrame.

def label_data(texts, regex):
    return [
        [[match.start(), match.end()] for match in regex.finditer(text)]
        for text in texts
        ]

def label_data_parallel(df, label_dict):
    import multiprocessing as mp
    import numpy as np
    import re
    from functools import partial

    class Trie():
        """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
        The corresponding Regex should match much faster than a simple Regex union."""

        def __init__(self):
            self.data = {}

        def add(self, word):
            ref = self.data
            for char in word:
                ref[char] = char in ref and ref[char] or {}
                ref = ref[char]
            ref[''] = 1

        def dump(self):
            return self.data

        def quote(self, char):
            return re.escape(char)

        def _pattern(self, pData):
            data = pData
            if "" in data and len(data.keys()) == 1:
                return None

            alt = []
            cc = []
            q = 0
            for char in sorted(data.keys()):
                if isinstance(data[char], dict):
                    try:
                        recurse = self._pattern(data[char])
                        alt.append(self.quote(char) + recurse)
                    except:
                        cc.append(self.quote(char))
                else:
                    q = 1
            cconly = not len(alt) > 0

            if len(cc) > 0:
                if len(cc) == 1:
                    alt.append(cc[0])
                else:
                    alt.append('[' + ''.join(cc) + ']')

            if len(alt) == 1:
                result = alt[0]
            else:
                result = "(?:" + "|".join(alt) + ")"

            if q:
                if cconly:
                    result += "?"
                else:
                    result = "(?:%s)?" % result
            return result

        def pattern(self):
            return self._pattern(self.dump())

    num_cores = mp.cpu_count()

    text_chunks = np.array_split(df['text'].values.tolist(), num_cores)

    trie = Trie()
    for country in label_dict.keys():
        trie.add(country)
    regex = re.compile(trie.pattern())

    pool = mp.Pool(num_cores)

    label_spans = []
    for spans in pool.imap(partial(label_data, regex=regex), text_chunks):
        label_spans.extend(spans)
    pool.close()
    pool.join()

    df['labels'] = label_spans

    return df

def main():
    import pandas as pd

    df = pd.DataFrame({'text': [
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
    ]})

    label_dict = {
        'United States': 'country',
        'France': 'country',
    }

    label_data_parallel(df, label_dict)
    print(df)

if __name__ == '__main__':
    main()

Prints:

text              labels
0                        I live in the United States.          [[14, 27]]
1                        I live in the United States.          [[14, 27]]
2                        I live in the United States.          [[14, 27]]
3                        I live in the United States.          [[14, 27]]
4                        I live in the United States.          [[14, 27]]
5                        I live in the United States.          [[14, 27]]
6                        I live in the United States.          [[14, 27]]
7                        I live in the United States.          [[14, 27]]
8   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
9   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
10  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
11  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
12  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
13  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
14  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
15  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
