我使用这个脚本来生成一些测试数据,它似乎工作得很好,直到1 mn记录。超过这一点,脚本慢得像疯了一样。
我尝试使用进程池(Pool / apply_async)来分配负载。瓶颈出现在 fake_workers(字典列表)的创建上:数据量大时这段代码会明显变慢,希望优化这部分以提高性能。
根据tqdm,在高负载(50 mn)下,它变成1次/秒。否则,对于1 mn,它大约是70次/秒。
你能帮忙找出瓶颈吗?
import concurrent.futures
import pandas as pd
import numpy as np
import math
import random
import uuid
from tqdm import tqdm
from datetime import datetime
import time
from faker import Faker
import multiprocessing as mp
fake = Faker()
def generate_data(number_of_records, match_rate=10):
    """Generate a fake-worker dataset in parallel and write it to a CSV file.

    Args:
        number_of_records: total number of rows to generate across all workers.
        match_rate: percentage (0-100) of records that should fall into
            "matching" groups; forwarded to make_workers. Defaults to 10.

    Side effects:
        Writes ``generated_data_<rate>_matchrate_<n>_records_<ts>.csv`` to the
        current directory and prints the head of the resulting DataFrame.
    """
    now = str(datetime.now()).replace(" ", "_")  # timestamp for the output filename
    # One worker process per available CPU.
    pool_size = mp.cpu_count()
    # Split records evenly; fold the integer-division remainder into the last
    # chunk so no records are silently dropped (the old int() division lost
    # up to pool_size - 1 records).
    base = number_of_records // pool_size
    counts = [base] * pool_size
    counts[-1] += number_of_records - base * pool_size
    # Context manager guarantees the pool is terminated/joined even if a
    # worker raises (the original never closed the pool).
    with mp.Pool(pool_size) as pool:
        results = [pool.apply_async(make_workers, args=(n, match_rate)) for n in counts]
        workers = [result.get() for result in results]
    # Combine per-process record lists into a single DataFrame.
    worker_df = pd.concat([pd.DataFrame(w) for w in workers], ignore_index=True)
    worker_df.to_csv(
        f"generated_data_{match_rate}_matchrate_{number_of_records}_records_{now}.csv", index=False)
    print(worker_df.head())
def make_workers(num, match_rate):
    """Build ``num`` fake worker records as a list of dicts.

    Performance note (this was the reported bottleneck): the original code
    called ``np.random.choice(<python list>)`` once per field per record.
    NumPy converts the list into an ndarray on *every* call, which copies the
    entire list — O(n) per draw, O(n^2) overall, which is why throughput
    collapsed from ~70 it/s at 1M records to ~1 it/s at 50M.  Stdlib
    ``random.choice`` samples a list in O(1) without copying it.

    Args:
        num: number of records this process should produce.
        match_rate: percentage (0-100) used to size the "unique" ID pools.

    Returns:
        list[dict]: one dict per generated worker record.
    """
    start_time = time.time()
    unique_number = math.floor(num * ((100 - match_rate) / 100))
    # Pre-generated attribute pools (each already sized/shuffled internally).
    first_name_list = generate_first_names(num, match_rate, 300)
    last_name_list = generate_last_names(num, match_rate, 300)
    email_list = generate_emails(num, match_rate, 300)
    phone_list = generate_phones(num, match_rate, 300)
    address_list = generate_street_addresses(num, match_rate, 300)
    city_list = [fake.city() for _ in range(100)]
    state_list = [fake.state() for _ in range(50)]
    postal_list = [fake.postcode() for _ in range(100)]
    # Sequential ID pools offset by fixed base values.
    rewards_id_list = [x + 6006640000000000 for x in range(unique_number)]
    name_id_list = [x + 740305232 for x in range(unique_number)]
    employee_property_number_list = [x + 1000 for x in range(unique_number)]
    program_id_list = [x + 25 for x in range(unique_number)]
    reward_parent_id_list = [x + 6006640000000000 for x in range(unique_number)]
    phone_type_list = ["HOME", "OFFICE"]
    print(" end of list creation --- %s seconds ---" % (time.time() - start_time))
    start_time = time.time()
    # Bind the sampler to a local name: fastest lookup inside the hot loop.
    choice = random.choice
    fake_workers = [{
        "loyalty_id": uuid.uuid4().hex,
        "rewards_id": choice(rewards_id_list),
        "name_id": choice(name_id_list),
        "first_name": choice(first_name_list),
        "middle_initial": choice('ABCD'),
        "last_name": choice(last_name_list),
        "program_id": choice(program_id_list),
        "emp_property_nbr": choice(employee_property_number_list),
        "reward_parent_id": choice(reward_parent_id_list),
        "loyalty_program_id": 1,
        "loyalty_program_desc": "BWR",
        "enrollment_dt": fake.date_time_between(start_date='-30y', end_date='now'),
        "zip_code": choice(postal_list),
        "country": "UNITED STATES",
        "country_code": "US",
        "address1": choice(address_list),
        "address2": "",
        "address3": "",
        "address4": "",
        "city": choice(city_list),
        "state_code": fake.state_abbr(False),
        "state_name": choice(state_list),
        "email_address": choice(email_list),
        "phone_nbr": choice(phone_list),
        "phone_type": choice(phone_type_list)
    } for _ in tqdm(range(num))]
    print(" end of json --- %s seconds ---" % (time.time() - start_time))
    return fake_workers
def generate_first_names(num, match_rate, group_size):
    """Return a shuffled list of first names where ~match_rate% of entries
    repeat in groups of ``group_size`` and the rest are unique.

    Args:
        num: target number of names (result may exceed this slightly
            because both pool sizes use ceil rounding).
        match_rate: percentage (0-100) of names that belong to a matching group.
        group_size: number of entries sharing each matching name.

    Returns:
        list[str]: shuffled matching + unique names.
    """
    num_matching_seeds = math.ceil((num * (match_rate / 100)) / group_size)
    num_unique = math.ceil((num * (100 - match_rate) / 100))
    # Seed names for the matching groups; a short uuid suffix makes each
    # seed near-unique even when faker repeats a name.
    seed_names = [
        fake.first_name() + str(uuid.uuid4())[:5]
        for _ in tqdm(range(num_matching_seeds), desc="Generating unique names")
    ]
    # Expand each seed group_size times.  A comprehension avoids the
    # per-item pbar.update() overhead of the original nested loop — this
    # step is pure list building and needs no progress bar.
    matching_first_name_list = [name for name in seed_names for _ in range(group_size)]
    print(f"total matching list-{len(matching_first_name_list)}")
    # Remaining fully-unique names.
    unique_first_name_list = [
        fake.first_name() + str(uuid.uuid4())[:5]
        for _ in tqdm(range(num_unique), desc=f"Generating remaining {num_unique} unique names")
    ]
    print(f"total unique list-{len(unique_first_name_list)}")
    final_first_name_list = matching_first_name_list + unique_first_name_list
    print(f"final list-{len(final_first_name_list[:num])}")
    # The concatenation above is a fresh list, so shuffling in place is safe;
    # the original's .copy() was an unnecessary O(n) allocation.
    random.shuffle(final_first_name_list)
    return final_first_name_list
def generate_last_names(num, match_rate, group_size):
    """Return a shuffled list of last names where ~match_rate% of entries
    repeat in groups of ``group_size`` and the rest are unique.

    Args:
        num: target number of names (result may exceed this slightly
            because both pool sizes use ceil rounding).
        match_rate: percentage (0-100) of names that belong to a matching group.
        group_size: number of entries sharing each matching name.

    Returns:
        list[str]: shuffled matching + unique names.
    """
    num_matching_seeds = math.ceil((num * (match_rate / 100)) / group_size)
    num_unique = math.ceil((num * (100 - match_rate) / 100))
    # Seed names for the matching groups; uuid suffix keeps seeds distinct.
    seed_names = [
        fake.last_name() + str(uuid.uuid4())[:5]
        for _ in tqdm(range(num_matching_seeds), desc="Generating unique names")
    ]
    # Expand each seed group_size times without per-item progress-bar cost.
    matching_last_name_list = [name for name in seed_names for _ in range(group_size)]
    print(f"total matching list-{len(matching_last_name_list)}")
    # Remaining fully-unique names.
    unique_last_name_list = [
        fake.last_name() + str(uuid.uuid4())[:5]
        for _ in tqdm(range(num_unique), desc=f"Generating remaining {num_unique} unique names")
    ]
    print(f"total unique list-{len(unique_last_name_list)}")
    final_last_name_list = matching_last_name_list + unique_last_name_list
    print(f"final list-{len(final_last_name_list[:num])}")
    # Fresh list from concatenation — shuffle in place, no copy needed.
    random.shuffle(final_last_name_list)
    return final_last_name_list
def generate_emails(num, match_rate, group_size):
    """Return a shuffled list of emails where ~match_rate% of entries
    repeat in groups of ``group_size`` and the rest are unique.

    Args:
        num: target number of emails (result may exceed this slightly
            because both pool sizes use ceil rounding).
        match_rate: percentage (0-100) of emails that belong to a matching group.
        group_size: number of entries sharing each matching email.

    Returns:
        list[str]: shuffled matching + unique emails.
    """
    num_matching_seeds = math.ceil((num * (match_rate / 100)) / group_size)
    num_unique = math.ceil((num * (100 - match_rate) / 100))
    # Seed emails for the matching groups; uuid prefix keeps seeds distinct.
    seed_emails = [
        str(uuid.uuid4())[:3] + fake.email()
        for _ in tqdm(range(num_matching_seeds), desc="Generating unique emails")
    ]
    # Expand each seed group_size times without per-item progress-bar cost.
    matching_email_list = [email for email in seed_emails for _ in range(group_size)]
    print(f"total matching list-{len(matching_email_list)}")
    # Remaining fully-unique emails.
    unique_email_list = [
        str(uuid.uuid4())[:3] + fake.email()
        for _ in tqdm(range(num_unique), desc=f"Generating remaining {num_unique} unique emails")
    ]
    print(f"total unique list-{len(unique_email_list)}")
    final_email_list = matching_email_list + unique_email_list
    print(f"final list-{len(final_email_list[:num])}")
    # Fresh list from concatenation — shuffle in place, no copy needed.
    random.shuffle(final_email_list)
    return final_email_list
def generate_phones(num, match_rate, group_size):
    """Return a shuffled list of phone numbers where ~match_rate% of entries
    repeat in groups of ``group_size`` and the rest are unique.

    Unlike the name/email generators, no uuid suffix is added, so faker may
    occasionally repeat a number across seeds.

    Args:
        num: target number of phone numbers (result may exceed this slightly
            because both pool sizes use ceil rounding).
        match_rate: percentage (0-100) of numbers in a matching group.
        group_size: number of entries sharing each matching number.

    Returns:
        list[str]: shuffled matching + unique phone numbers.
    """
    num_matching_seeds = math.ceil((num * (match_rate / 100)) / group_size)
    num_unique = math.ceil((num * (100 - match_rate) / 100))
    # Seed numbers for the matching groups.
    seed_phones = [
        fake.phone_number()
        for _ in tqdm(range(num_matching_seeds), desc="Generating unique phones")
    ]
    # Expand each seed group_size times without per-item progress-bar cost.
    matching_phone_list = [phone for phone in seed_phones for _ in range(group_size)]
    print(f"total matching list-{len(matching_phone_list)}")
    # Remaining fully-unique numbers.
    unique_phone_list = [
        fake.phone_number()
        for _ in tqdm(range(num_unique), desc=f"Generating remaining {num_unique} unique phones")
    ]
    print(f"total unique list-{len(unique_phone_list)}")
    final_phone_list = matching_phone_list + unique_phone_list
    print(f"final list-{len(final_phone_list[:num])}")
    # Fresh list from concatenation — shuffle in place, no copy needed.
    random.shuffle(final_phone_list)
    return final_phone_list
def generate_street_addresses(num, match_rate, group_size):
    """Return a shuffled list of street addresses where ~match_rate% of
    entries repeat in groups of ``group_size`` and the rest are unique.

    Args:
        num: target number of addresses (result may exceed this slightly
            because both pool sizes use ceil rounding).
        match_rate: percentage (0-100) of addresses in a matching group.
        group_size: number of entries sharing each matching address.

    Returns:
        list[str]: shuffled matching + unique street addresses.
    """
    num_matching_seeds = math.ceil((num * (match_rate / 100)) / group_size)
    num_unique = math.ceil((num * (100 - match_rate) / 100))
    # Seed addresses for the matching groups.
    seed_addresses = [
        fake.street_address()
        for _ in tqdm(range(num_matching_seeds), desc="Generating unique street_addresses")
    ]
    # Expand each seed group_size times without per-item progress-bar cost.
    matching_street_address_list = [addr for addr in seed_addresses for _ in range(group_size)]
    print(f"total matching list-{len(matching_street_address_list)}")
    # Remaining fully-unique addresses.
    unique_street_address_list = [
        fake.street_address()
        for _ in tqdm(range(num_unique), desc=f"Generating remaining {num_unique} unique street_addresses")
    ]
    print(f"total unique list-{len(unique_street_address_list)}")
    final_street_address_list = matching_street_address_list + unique_street_address_list
    print(f"final list-{len(final_street_address_list[:num])}")
    # Fresh list from concatenation — shuffle in place, no copy needed.
    random.shuffle(final_street_address_list)
    return final_street_address_list
我尝试过使用多进程(multiprocessing),但在高负载下它并没有解决问题。
1条答案
按热度按时间qv7cva1a1#
这段代码的时间复杂度是 O(N²),但其实可以在 O(N) 的时间内完成。
下面是代码的超级简化版本:
这段代码创建了一个包含N个元素的列表,然后使用
np.random.choice()
对该列表进行N次采样。您可能会认为创建一个包含10000个元素的列表会比创建一个包含1000个元素的列表慢10倍,但事实并非如此。下面是这段代码的输出:
当
np.random.choice()
从一个列表中抽取一个随机元素时,它首先将列表转换为一个numpy数组。这需要复制整个列表。相比之下,Python函数
random.choice()
可以从列表中采样,而无需首先复制该列表。此代码的输出:
通过在代码中将
np.random.choice
替换为random.choice
,我发现速度提高了10倍以上。注意:如果不想改用
random.choice()
,也可以在循环开始之前把列表一次性转换为 numpy 数组,这样每次采样就不再需要复制整个列表,整体复杂度就从 O(N²) 降到了 O(N)。