I've written some Python code to carry out a routine task for my team. In short, we use two CSVs to manage and update user accounts in a system. ABC.csv contains the latest data, while XYZ.csv holds the account data taken from the system itself.
The purpose of the code is to compare the data between the two CSVs. When it finds that XYZ.csv has data that differs from ABC.csv, it generates updated CSVs for us to upload into the system.
The code works fine when I use 4-5 test records. However, we normally deal with over 20,000 records, and when I try to run the code against those datasets it just keeps running without producing any errors or results. I even left it running for over an hour and still got nothing: no errors, no output.
I tried using Modin to speed up the processing, but it also seems to run indefinitely without producing any results or errors.
My questions are:
- Is this normal? Is my dataset simply too large, so that it needs several hours to run?
- Does my code need optimising, or do I need a different library that works faster?
- The data is very asymmetric between the two CSVs. ABC.csv has many more records because it includes accounts that are not meant to be updated (we have no easy way of excluding the accounts we don't need). Could something in the data be causing the hang?
import pandas as pd
import concurrent.futures
from fuzzywuzzy import fuzz
def fuzzy_match(s1, s2, threshold=85):
    return fuzz.ratio(s1, s2) >= threshold

def match_rows(index, xyz_row):
    match = abc[abc["Emp No"] == xyz_row["Employee"]]
    abc_row = None
    if not match.empty:
        abc_row = match.iloc[0]
    else:
        # Fuzzy matching using First Name, Last Name, and Email
        fuzzy_matched_rows = abc.apply(
            lambda x: fuzzy_match(
                str(xyz_row["First Name"]) + " " + str(xyz_row["Last Name"]),
                str(x["First Name"]) + " " + str(x["Last Name"]),
            )
            and (pd.isna(xyz_row["Email"]) or fuzzy_match(str(xyz_row["Email"]), str(x["Email"]))),
            axis=1,
        )
        abc_fuzzy_matched = abc[fuzzy_matched_rows]
        if not abc_fuzzy_matched.empty:
            abc_row = abc_fuzzy_matched.iloc[0]
    return xyz_row, abc_row

abc = pd.read_csv("abc.csv", low_memory=False)
xyz = pd.read_csv("xyz.csv", low_memory=False)
def is_data_different(xyz_row, abc_row):
    if xyz_row["First Name"] != abc_row["First Name"]:
        return True
    if xyz_row["Last Name"] != abc_row["Last Name"]:
        return True
    if xyz_row["Employee"] != abc_row["Emp No"]:
        return True
    if xyz_row["User's Organisation Name(s)"] != abc_row["Org L13"]:
        return True
    if xyz_row["User's Job Title(s)"] != abc_row["Poslevel2"]:
        return True
    if xyz_row["Assignment Status"] != abc_row["Assignment Status"]:
        return True
    if not pd.isna(xyz_row["Email"]) and xyz_row["Email"] != abc_row["Email"]:
        return True
    # Add more field comparisons as needed
    return False
usermatch_columns = ["idnumber", "timemodified", "username", "deleted", "firstname", "lastname", "email", "town", "auth", "customfield_hierarchypath", "customfield_orgframework", "customfield_occcode", "customfield_posno", "customfield_title", "customfield_middlename", "customfield_fte", "customfield_employee", "customfield_payscale", "customfield_assignmentstartdate", "customfield_areaofwork", "customfield_maidenname", "customfield_assignmentstatus", "customfield_ProfessionalNumber", "customfield_TrainingGrade", "customfield_PatientHandler", "customfield_WorkswithChildren", "customfield_Clinical", "customfield_Frontline"]
jobassign_columns = ["idnumber", "useridenumber", "timemodified", "deleted", "fullname", "orgidnumber", "posidnumber", 'manageridnumber']
usermatch = pd.DataFrame(columns=usermatch_columns)
jobassign = pd.DataFrame(columns=jobassign_columns)
manager_username = None
# Add userupload_columns and userupload DataFrame
userupload_columns = ["username", "firstname", "lastname", "email", "auth", "idnumber"]
userupload = pd.DataFrame(columns=userupload_columns)
manager_idnumber_dict = dict(zip(abc["Emp No"], abc["Manager Employee Number"]))
with concurrent.futures.ThreadPoolExecutor() as executor:
    matched_rows = list(executor.map(match_rows, *zip(*xyz.iterrows())))
for xyz_row, abc_row in matched_rows:
    if abc_row is not None:
        # Check if User ID Number is empty, and use the capitalized version of the username if it is
        if pd.isna(xyz_row["User ID Number"]) or xyz_row["User ID Number"] == "":
            xyz_row["User ID Number"] = xyz_row["Username"].title()
        manager_id = abc_row["Manager Employee Number"]
        manager_username = xyz.loc[xyz["Employee"] == manager_id, "Username"]
        manager_username = manager_username.iloc[0] if not manager_username.empty else None
        if is_data_different(xyz_row, abc_row):
            # Populate usermatch.csv
            usermatch_row = {
                "idnumber": xyz_row["User ID Number"],
                "timemodified": 0,
                "username": xyz_row["Username"],
                "deleted": 0,
                "firstname": abc_row["First Name"],
                "lastname": abc_row["Last Name"],
                "email": xyz_row["Email"],
                "town": abc_row["City"],
                "auth": "manual",
                "customfield_hierarchypath": f'{abc_row["Org L3"]}~{abc_row["Org Level 3"]}~{abc_row["Org L6"]}~{abc_row["Org L13"]}',
                "customfield_orgframework": "UHSussex",
                "customfield_occcode": abc_row["Occ Code"],
                "customfield_posno": abc_row["Position No"],
                "customfield_title": abc_row["Title"],
                "customfield_middlename": abc_row["Middle Name"],
                "customfield_fte": abc_row["FTE"],
                "customfield_employee": abc_row["Emp No"],
                "customfield_payscale": abc_row["Pay Scale"],
                "customfield_assignmentstartdate": abc_row["Assign Start Date"],
                "customfield_areaofwork": abc_row["Area Of Work"],
                "customfield_maidenname": abc_row["Maiden Name"],
                "customfield_assignmentstatus": abc_row["Assignment Status"],
                "customfield_ProfessionalNumber": abc_row["Professional Registration Num"],
                "customfield_TrainingGrade": "",
                "customfield_PatientHandler": 0,
                "customfield_WorkswithChildren": 0,
                "customfield_Clinical": 0,
                "customfield_Frontline": 0
            }
            usermatch = pd.concat([usermatch, pd.DataFrame([usermatch_row], columns=usermatch_columns)], ignore_index=True)
            # Populate jobassign.csv
            jobassign_row = {
                "idnumber": abc_row["Emp No"],
                "useridenumber": xyz_row["User ID Number"],
                "timemodified": 0,
                "deleted": 0,
                "fullname": abc_row["Position Title"],
                "orgidnumber": abc_row["Org L13"],
                "posidnumber": abc_row["Poslevel2"],
                "manageridnumber": manager_username
            }
            jobassign = pd.concat([jobassign, pd.DataFrame([jobassign_row], columns=jobassign_columns)], ignore_index=True)
            # Populate userupload.csv
            userupload_row = {
                "username": xyz_row["Username"],
                "firstname": abc_row["First Name"],
                "lastname": abc_row["Last Name"],
                "email": xyz_row["Email"],
                "auth": "manual",
                "idnumber": xyz_row["User ID Number"],
            }
            userupload = pd.concat([userupload, pd.DataFrame([userupload_row], columns=userupload_columns)], ignore_index=True)
def has_fuzzy_match(abc_row):
    for _, xyz_row in xyz.iterrows():
        if fuzzy_match(
            abc_row["First Name"] + " " + abc_row["Last Name"],
            xyz_row["First Name"] + " " + xyz_row["Last Name"],
        ) and (pd.isna(abc_row["Email"]) or fuzzy_match(abc_row["Email"], xyz_row["Email"])):
            return True
    return False
# Step 1: Create a new DataFrame called newusers with the same columns as userupload.csv
newusers_columns = ["username", "firstname", "lastname", "email", "auth", "idnumber"]
newusers = pd.DataFrame(columns=newusers_columns)
# Step 2: Iterate through the rows of abc.csv and check if each row exists in xyz.csv
for _, abc_row in abc.iterrows():
    if abc_row["Emp No"] not in xyz["Employee"].values and not has_fuzzy_match(abc_row):
        # Step 3: Generate a username from the email address and populate the newusers.csv file
        username = abc_row["Email"].split('@')[0] if pd.notna(abc_row["Email"]) else ""
        newusers_row = {
            "username": username,
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": abc_row["Email"],
            "auth": "manual",
            "idnumber": username.title(),
        }
        newusers = pd.concat([newusers, pd.DataFrame([newusers_row], columns=newusers_columns)], ignore_index=True)
        # Add new user data to usermatch and jobassign DataFrames
        manager_id = abc_row["Manager Employee Number"]
        manager_username = xyz.loc[xyz["Employee"] == manager_id, "Username"]
        manager_username = manager_username.iloc[0] if not manager_username.empty else None
        usermatch_row = {
            "idnumber": username.title(),
            "timemodified": 0,
            "username": username,  # new users have no xyz row, so use the generated username
            "deleted": 0,
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": abc_row["Email"],  # likewise, take the email from abc.csv
            "town": abc_row["City"],
            "auth": "manual",
            "customfield_hierarchypath": f'{abc_row["Org L3"]}~{abc_row["Org Level 3"]}~{abc_row["Org L6"]}~{abc_row["Org L13"]}',
            "customfield_orgframework": "UHSussex",
            "customfield_occcode": abc_row["Occ Code"],
            "customfield_posno": abc_row["Position No"],
            "customfield_title": abc_row["Title"],
            "customfield_middlename": abc_row["Middle Name"],
            "customfield_fte": abc_row["FTE"],
            "customfield_employee": abc_row["Emp No"],
            "customfield_payscale": abc_row["Pay Scale"],
            "customfield_assignmentstartdate": abc_row["Assign Start Date"],
            "customfield_areaofwork": abc_row["Area Of Work"],
            "customfield_maidenname": abc_row["Maiden Name"],
            "customfield_assignmentstatus": abc_row["Assignment Status"],
            "customfield_ProfessionalNumber": abc_row["Professional Registration Num"],
            "customfield_TrainingGrade": "",
            "customfield_PatientHandler": 0,
            "customfield_WorkswithChildren": 0,
            "customfield_Clinical": 0,
            "customfield_Frontline": 0
        }
        usermatch = pd.concat([usermatch, pd.DataFrame([usermatch_row], columns=usermatch_columns)], ignore_index=True)
        jobassign_row = {
            "idnumber": abc_row["Emp No"],
            "useridenumber": username.title(),
            "timemodified": 0,
            "deleted": 0,
            "fullname": abc_row["Position Title"],
            "orgidnumber": abc_row["Org L13"],
            "posidnumber": abc_row["Poslevel2"],
            # other fields from the original code
            "manageridnumber": manager_username
        }
        jobassign = pd.concat([jobassign, pd.DataFrame([jobassign_row], columns=jobassign_columns)], ignore_index=True)

# Save the output CSVs
usermatch.to_csv("usermatch.csv", index=False)
jobassign.to_csv("jobassign.csv", index=False)
userupload.to_csv("userupload.csv", index=False)
newusers.to_csv("newusers.csv", index=False)
1 Answer
"Is this normal? Is my dataset simply too large, so that it needs several hours to run?"
The main problem is that you have two nested for loops:

- inside match_rows() you call abc.apply(..., axis=1)
- match_rows() is called for every row of xyz

This is basically the same as doing:
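    # rough sketch, loop bodies elided: every xyz row scans all of abc,
    # i.e. ~20,000 x 20,000 = 400 million pairwise comparisons
    for _, xyz_row in xyz.iterrows():
        for _, abc_row in abc.iterrows():
            ...  # exact / fuzzy comparison per pair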
The same thing happens in the new-users pass:

- inside for ... in abc.iterrows(): you call has_fuzzy_match()
- inside has_fuzzy_match() you loop over for ... xyz.iterrows()

which translates to:
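    # rough sketch: the new-users pass scans all of xyz again for every abc row
    for _, abc_row in abc.iterrows():
        for _, xyz_row in xyz.iterrows():
            ...  # fuzzy name/email comparison per pair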
Other issues:

- ThreadPoolExecutor does nothing useful here: the work is CPU-bound, so the GIL keeps the threads from actually running in parallel, and the executor only adds overhead and slows things down.
- Adding new rows with pd.concat and pd.DataFrame is "expensive": every call copies the whole frame, so the cost keeps growing with each appended row. A cheaper pattern is sketched below.
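A minimal sketch of the cheaper pattern, using the names from your script (only two of the fields are shown):

    # collect plain dicts in a list and build the DataFrame once at the end
    usermatch_rows = []
    for xyz_row, abc_row in matched_rows:
        if abc_row is not None and is_data_different(xyz_row, abc_row):
            usermatch_rows.append({"idnumber": xyz_row["User ID Number"],
                                   "username": xyz_row["Username"]})  # ...remaining fields as before
    usermatch = pd.DataFrame(usermatch_rows, columns=usermatch_columns)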
Potential improvements:

It looks like you have an "exact match" criterion and, if that fails, you try a fuzzy match.

The exact matching can be done in one go with pd.merge, and from there you can find the rows that still need fuzzy matching.
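A sketch, assuming the key columns from your script ("Employee" in xyz, "Emp No" in abc):

    # one-shot exact match; indicator=True adds a "_merge" column recording
    # whether each xyz row found a partner in abc
    merged = xyz.merge(abc, left_on="Employee", right_on="Emp No",
                       how="left", suffixes=("_xyz", "_abc"), indicator=True)

    # the rows without an exact match are the ones left over for fuzzy matching
    no_exact_match = merged[merged["_merge"] == "left_only"]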
Then you can do a "fuzzy merge" between no_exact_match and abc. rapidfuzz can be used instead of fuzzywuzzy; it is much faster.
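One way to sketch it with rapidfuzz's process.extractOne, which returns a (match, score, index) tuple; only the full-name key from your code is shown here, and the email check can be layered on top:

    from rapidfuzz import process, fuzz  # much faster drop-in for fuzzywuzzy

    # build the "full name" keys once instead of concatenating them per comparison
    abc_names = (abc["First Name"].astype(str) + " " + abc["Last Name"].astype(str)).tolist()

    def best_abc_index(name, threshold=85):
        # positional index into abc of the best fuzzy match above the threshold, or None
        result = process.extractOne(name, abc_names, scorer=fuzz.ratio, score_cutoff=threshold)
        return result[2] if result else None

    # the xyz name columns carry the "_xyz" suffix after the merge above
    xyz_names = (no_exact_match["First Name_xyz"].astype(str) + " "
                 + no_exact_match["Last Name_xyz"].astype(str))
    no_exact_match = no_exact_match.assign(abc_idx=xyz_names.map(best_abc_index))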
Similar to the previous example - finding the non-matches (the abc rows that become your new users) can be done in one step.
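A sketch using the same merge/indicator trick in the opposite direction:

    # abc rows with no counterpart in xyz - the candidates for newusers.csv
    abc_merged = abc.merge(xyz, left_on="Emp No", right_on="Employee",
                           how="left", suffixes=("_abc", "_xyz"), indicator=True)
    new_users = abc_merged[abc_merged["_merge"] == "left_only"]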
It looks like you're also doing a per-row lookup for the manager IDs:

    manager_id = abc_row["Manager Employee Number"]
    manager_username = xyz.loc[xyz["Employee"] == manager_id, "Username"]

As with the Emp No example, this can be done in one go with a "left merge":
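For example (the "Manager Username" column name is made up here):

    # attach every manager's username in a single pass
    managers = xyz[["Employee", "Username"]].rename(
        columns={"Employee": "Manager Employee Number", "Username": "Manager Username"})
    abc = abc.merge(managers, on="Manager Employee Number", how="left")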