Python多处理的速度大大降低,超过了限制

7bsow1i6  于 2023-04-19  发布在  Python
关注(0)|答案(1)|浏览(122)

我使用这个脚本来生成一些测试数据,它似乎工作得很好,直到1 mn记录。超过这一点,脚本慢得像疯了一样。
我尝试使用池异步来分配负载。在fake_workers创建(其中的对象列表)中,代码在高容量时会变慢。希望优化这部分以提高性能。
根据tqdm,在高负载(50 mn)下,它变成1次/秒。否则,对于1 mn,它大约是70次/秒。
你能帮忙找出瓶颈吗?

import concurrent.futures
import pandas as pd
import numpy as np
import math
import random
import uuid
from tqdm import tqdm
from datetime import datetime
import time
from faker import Faker
import multiprocessing as mp

fake = Faker()

def generate_data(number_of_records, match_rate = 10):
    now = str(datetime.now()).replace(" ", "_")  # current date and time

    # determine pool size based on number of available CPUs
    pool_size = mp.cpu_count()

    # divide number of records evenly among processes
    num_per_process = int(number_of_records / pool_size)

    # create worker processes
    pool = mp.Pool(pool_size)

    # generate workers in parallel
    results = [pool.apply_async(make_workers, args=(num_per_process, match_rate)) for _ in range(pool_size)]
    workers = [result.get() for result in results]

    # combine results into a single DataFrame
    worker_df = pd.concat([pd.DataFrame(worker) for worker in workers], ignore_index=True)
    worker_df.to_csv(
        f"generated_data_{match_rate}_matchrate_{number_of_records}_records_{now}.csv", index=False)

    print(worker_df.head())

def make_workers(num, match_rate):
    start_time = time.time()
    unique_number = math.floor(num*((100-match_rate)/100))

    first_name_list = generate_first_names(num, match_rate, 300)
    last_name_list = generate_last_names(num, match_rate, 300)
    email_list = generate_emails(num, match_rate, 300)
    phone_list = generate_phones(num, match_rate, 300)
    address_list = generate_street_addresses(num, match_rate, 300)
    city_list = [fake.city() for _ in range(100)]
    state_list = [fake.state() for _ in range(50)]
    postal_list = [fake.postcode() for _ in range(100)]
    rewards_id_list = [x + 6006640000000000 for x in range(unique_number)]
    name_id_list = [x + 740305232 for x in range(unique_number)]
    employee_property_number_list = [x + 1000 for x in range(unique_number)]
    program_id_list = [x + 25 for x in range(unique_number)]
    reward_parent_id_list = [x + 6006640000000000 for x in range(unique_number)]
    phone_type_list = ["HOME", "OFFICE"]

    print(" end of list creation --- %s seconds ---" % (time.time() - start_time))

    start_time = time.time()

    fake_workers = [{
        "loyalty_id": uuid.uuid4().hex,
        "rewards_id": np.random.choice(rewards_id_list),
        "name_id": np.random.choice(name_id_list),
        "first_name": np.random.choice(first_name_list),
        "middle_initial": random.choice('ABCD'),
        "last_name": np.random.choice(last_name_list),
        "program_id": np.random.choice(program_id_list),
        "emp_property_nbr": np.random.choice(employee_property_number_list),
        "reward_parent_id": np.random.choice(reward_parent_id_list),
        "loyalty_program_id": 1,
        "loyalty_program_desc": "BWR",
        "enrollment_dt": fake.date_time_between(start_date='-30y', end_date='now'),
        "zip_code": np.random.choice(postal_list),
        "country": "UNITED STATES",
        "country_code": "US",
        "address1": np.random.choice(address_list),
        "address2": "",
        "address3": "",
        "address4": "",
        "city": np.random.choice(city_list),
        "state_code": fake.state_abbr(False),
        "state_name": np.random.choice(state_list),
        "email_address": np.random.choice(email_list),
        "phone_nbr": np.random.choice(phone_list),
        "phone_type": np.random.choice(phone_type_list)
    } for _ in tqdm(range(num))]

    print(" end of json --- %s seconds ---" % (time.time() - start_time))
    return fake_workers

def generate_first_names(num, match_rate, group_size):
    num_iterations_matching_records = math.ceil((num*(match_rate/100))/group_size)
    num_iterations_unique_records = math.ceil((num*(100-match_rate)/100))
    # lists to randomly assign to workers
    ## unique matching records 
    first_name_list = []
    with tqdm(total=num_iterations_matching_records, desc="Generating unique names") as pbar:
        for x in range(num_iterations_matching_records):
            first_name_list.append(fake.first_name() + str(uuid.uuid4())[:5])
            pbar.update(1)
    ## generate 40% of records 
    matching_first_name_list = []
    with tqdm(total=len(first_name_list)*group_size, desc=f"Generating matching records with match group size {group_size}") as pbar:
        for x in first_name_list:
            for _ in range(group_size):
                matching_first_name_list.append(x)
                pbar.update(1)
    print(f"total matching list-{len(matching_first_name_list)}")
    ## generate remaining unique records
    unique_first_name_list = []
    with tqdm(total=num_iterations_unique_records, desc=f"Generating remaining {num_iterations_unique_records} unique names") as pbar:
        for x in range(num_iterations_unique_records):
            unique_first_name_list.append(fake.first_name() + str(uuid.uuid4())[:5])
            pbar.update(1)
    print(f"total unique list-{len(unique_first_name_list)}")
    final_first_name_list = matching_first_name_list + unique_first_name_list
    print(f"final list-{len(final_first_name_list[:num])}")
    shuffled_final_list = final_first_name_list.copy()
    random.shuffle(shuffled_final_list)
    return shuffled_final_list

def generate_last_names(num, match_rate, group_size):
    num_iterations_matching_records = math.ceil((num*(match_rate/100))/group_size)
    num_iterations_unique_records = math.ceil((num*(100-match_rate)/100))
    # lists to randomly assign to workers
    ## unique matching records 
    last_name_list = []
    with tqdm(total=num_iterations_matching_records, desc="Generating unique names") as pbar:
        for x in range(num_iterations_matching_records):
            last_name_list.append(fake.last_name() + str(uuid.uuid4())[:5])
            pbar.update(1)
    ## generate 40% of records 
    matching_last_name_list = []
    with tqdm(total=len(last_name_list)*group_size, desc=f"Generating matching records with match group size {group_size}") as pbar:
        for x in last_name_list:
            for _ in range(group_size):
                matching_last_name_list.append(x)
                pbar.update(1)
    print(f"total matching list-{len(matching_last_name_list)}")
    ## generate remaining unique records
    unique_last_name_list = []
    with tqdm(total=num_iterations_unique_records, desc=f"Generating remaining {num_iterations_unique_records} unique names") as pbar:
        for x in range(num_iterations_unique_records):
            unique_last_name_list.append(fake.last_name() + str(uuid.uuid4())[:5])
            pbar.update(1)
    print(f"total unique list-{len(unique_last_name_list)}")
    final_last_name_list = matching_last_name_list + unique_last_name_list
    print(f"final list-{len(final_last_name_list[:num])}")
    shuffled_final_list = final_last_name_list.copy()
    random.shuffle(shuffled_final_list)
    return shuffled_final_list

def generate_emails(num, match_rate, group_size):
    num_iterations_matching_records = math.ceil((num*(match_rate/100))/group_size)
    num_iterations_unique_records = math.ceil((num*(100-match_rate)/100))
    # lists to randomly assign to workers
    ## unique matching records 
    email_list = []
    with tqdm(total=num_iterations_matching_records, desc="Generating unique emails") as pbar:
        for x in range(num_iterations_matching_records):
            email_list.append(str(uuid.uuid4())[:3]+ fake.email())
            pbar.update(1)
    ## generate 40% of records 
    matching_email_list = []
    with tqdm(total=len(email_list)*group_size, desc=f"Generating matching records with match group size {group_size}") as pbar:
        for x in email_list:
            for _ in range(group_size):
                matching_email_list.append(x)
                pbar.update(1)
    print(f"total matching list-{len(matching_email_list)}")
    ## generate remaining unique records
    unique_email_list = []
    with tqdm(total=num_iterations_unique_records, desc=f"Generating remaining {num_iterations_unique_records} unique emails") as pbar:
        for x in range(num_iterations_unique_records):
            unique_email_list.append(str(uuid.uuid4())[:3]+fake.email())
            pbar.update(1)
    print(f"total unique list-{len(unique_email_list)}")
    final_email_list = matching_email_list + unique_email_list
    print(f"final list-{len(final_email_list[:num])}")
    shuffled_final_list = final_email_list.copy()
    random.shuffle(shuffled_final_list)
    return shuffled_final_list

def generate_phones(num, match_rate, group_size):
    num_iterations_matching_records = math.ceil((num*(match_rate/100))/group_size)
    num_iterations_unique_records = math.ceil((num*(100-match_rate)/100))
    # lists to randomly assign to workers
    ## unique matching records 
    phone_list = []
    with tqdm(total=num_iterations_matching_records, desc="Generating unique phones") as pbar:
        for x in range(num_iterations_matching_records):
            phone_list.append(fake.phone_number())
            pbar.update(1)
    ## generate 40% of records 
    matching_phone_list = []
    with tqdm(total=len(phone_list)*group_size, desc=f"Generating matching records with match group size {group_size}") as pbar:
        for x in phone_list:
            for _ in range(group_size):
                matching_phone_list.append(x)
                pbar.update(1)
    print(f"total matching list-{len(matching_phone_list)}")
    ## generate remaining unique records
    unique_phone_list = []
    with tqdm(total=num_iterations_unique_records, desc=f"Generating remaining {num_iterations_unique_records} unique phones") as pbar:
        for x in range(num_iterations_unique_records):
            unique_phone_list.append(fake.phone_number())
            pbar.update(1)
    print(f"total unique list-{len(unique_phone_list)}")
    final_phone_list = matching_phone_list + unique_phone_list
    print(f"final list-{len(final_phone_list[:num])}")
    shuffled_final_list = final_phone_list.copy()
    random.shuffle(shuffled_final_list)
    return shuffled_final_list

def generate_street_addresses(num, match_rate, group_size):
    num_iterations_matching_records = math.ceil((num*(match_rate/100))/group_size)
    num_iterations_unique_records = math.ceil((num*(100-match_rate)/100))
    # lists to randomly assign to workers
    ## unique matching records 
    street_address_list = []
    with tqdm(total=num_iterations_matching_records, desc="Generating unique street_addresses") as pbar:
        for x in range(num_iterations_matching_records):
            street_address_list.append(fake.street_address())
            pbar.update(1)
    ## generate 40% of records 
    matching_street_address_list = []
    with tqdm(total=len(street_address_list)*group_size, desc=f"Generating matching records with match group size {group_size}") as pbar:
        for x in street_address_list:
            for _ in range(group_size):
                matching_street_address_list.append(x)
                pbar.update(1)
    print(f"total matching list-{len(matching_street_address_list)}")
    ## generate remaining unique records
    unique_street_address_list = []
    with tqdm(total=num_iterations_unique_records, desc=f"Generating remaining {num_iterations_unique_records} unique street_addresses") as pbar:
        for x in range(num_iterations_unique_records):
            unique_street_address_list.append(fake.street_address())
            pbar.update(1)
    print(f"total unique list-{len(unique_street_address_list)}")
    final_street_address_list = matching_street_address_list + unique_street_address_list
    print(f"final list-{len(final_street_address_list[:num])}")

    shuffled_final_list = final_street_address_list.copy()
    random.shuffle(shuffled_final_list)
    return shuffled_final_list

我尝试使用多处理,但它不工作在高负荷

qv7cva1a

qv7cva1a1#

时间复杂度为O(N^2),所以你可以在O(N)的时间内完成它。
下面是代码的超级简化版本:

for size in range(1000, 10001, 1000):
    l = [i for i in range(size)]
    out = []
    begin = time.time()
    for i in range(size):
        out.append(np.random.choice(l))
    end = time.time()
    print(f"size: {size} duration: {end - begin:.2f}")

这段代码创建了一个包含N个元素的列表,然后使用np.random.choice()对该列表进行N次采样。您可能会认为创建一个包含10000个元素的列表会比创建一个包含1000个元素的列表慢10倍,但事实并非如此。
下面是这段代码的输出:

size: 1000 duration: 0.11
size: 2000 duration: 0.43
size: 3000 duration: 0.94
size: 4000 duration: 1.64
size: 5000 duration: 2.54
size: 6000 duration: 3.63
size: 7000 duration: 4.95
size: 8000 duration: 6.41
size: 9000 duration: 8.08
size: 10000 duration: 9.95

np.random.choice()从一个列表中抽取一个随机元素时,它首先将列表转换为一个numpy数组。这需要复制整个列表。
相比之下,Python函数random.choice()可以从列表中采样,而无需首先复制该列表。

for size in range(1000, 10001, 1000):
    l = [i for i in range(size)]
    out = []
    begin = time.time()
    for i in range(size):
        out.append(random.choice(l))
    end = time.time()
    print(f"size: {size} duration: {end - begin:.2f}")

此代码的输出:

size: 1000 duration: 0.00
size: 2000 duration: 0.00
size: 3000 duration: 0.00
size: 4000 duration: 0.00
size: 5000 duration: 0.00
size: 6000 duration: 0.00
size: 7000 duration: 0.01
size: 8000 duration: 0.01
size: 9000 duration: 0.01
size: 10000 duration: 0.01

通过在代码中将np.random.choice替换为random.choice,我发现速度提高了10倍以上。

注意:如果不使用random.choice(),也可以在循环前进行一次数组转换,例如:

l = np.array(l)
    for i in range(size):
        out.append(np.random.choice(l))

时间复杂度为O(N^2)。

相关问题