How can I make this code run faster on large datasets?

c86crjj0 · posted 2023-05-04 in Other

I wrote some Python code to handle a routine task for my team. In short, we have two CSVs used to manage and update user accounts in a system. ABC.csv contains the latest data, while XYZ.csv is exported from the system's own account data.
The purpose of the code is to compare the data between the two CSVs. When it finds that XYZ.csv has data that differs from ABC.csv, it generates updated CSVs for us to upload into the system.
The code works fine when I use 4-5 test records. However, we typically work with over 20,000 records. When I try running the code against those datasets, it just keeps running without producing any errors or results.
I even left it running for over an hour and still got nothing: no errors, no output.
I tried using Modin to speed up processing, but it also seems to run indefinitely without producing any results or errors.
My questions:

  • Is this normal? Is my dataset just too large, so it simply needs hours to run?
  • Does my code need optimising, or do I need a different library that works faster?
  • The data in the two CSVs is very asymmetric. ABC.csv has far more records, because it includes accounts that are not meant to be updated (we have no easy way to exclude the accounts we don't need). Could something in the data be causing the hang?
import pandas as pd
import concurrent.futures
from fuzzywuzzy import fuzz

def fuzzy_match(s1, s2, threshold=85):
    return fuzz.ratio(s1, s2) >= threshold

def match_rows(index, xyz_row):
    match = abc[abc["Emp No"] == xyz_row["Employee"]]
    abc_row = None

    if not match.empty:
        abc_row = match.iloc[0]
    else:
        # Fuzzy matching using First Name, Last Name, and Email
        fuzzy_matched_rows = abc.apply(lambda x: fuzzy_match(str(xyz_row["First Name"]) + " " + str(xyz_row["Last Name"]), str(x["First Name"]) + " " + str(x["Last Name"])) and (pd.isna(xyz_row["Email"]) or fuzzy_match(str(xyz_row["Email"]), str(x["Email"]))), axis=1)
        abc_fuzzy_matched = abc[fuzzy_matched_rows]
        if not abc_fuzzy_matched.empty:
            abc_row = abc_fuzzy_matched.iloc[0]

    return xyz_row, abc_row

abc = pd.read_csv("abc.csv", low_memory=False)
xyz = pd.read_csv("xyz.csv", low_memory=False)

def is_data_different(xyz_row, abc_row):
    if xyz_row["First Name"] != abc_row["First Name"]:
        return True
    if xyz_row["Last Name"] != abc_row["Last Name"]:
        return True
    if xyz_row["Employee"] != abc_row["Emp No"]:
        return True
    if xyz_row["User's Organisation Name(s)"] != abc_row["Org L13"]:
        return True
    if xyz_row["User's Job Title(s)"] != abc_row["Poslevel2"]:
        return True
    if xyz_row["Assignment Status"] != abc_row["Assignment Status"]:
        return True
    if not pd.isna(xyz_row["Email"]) and xyz_row["Email"] != abc_row["Email"]:
        return True
    # Add more field comparisons as needed
    return False

usermatch_columns = ["idnumber", "timemodified", "username", "deleted", "firstname", "lastname", "email", "town", "auth", "customfield_hierarchypath", "customfield_orgframework", "customfield_occcode", "customfield_posno", "customfield_title", "customfield_middlename", "customfield_fte", "customfield_employee", "customfield_payscale", "customfield_assignmentstartdate", "customfield_areaofwork", "customfield_maidenname", "customfield_assignmentstatus", "customfield_ProfessionalNumber", "customfield_TrainingGrade", "customfield_PatientHandler", "customfield_WorkswithChildren", "customfield_Clinical", "customfield_Frontline"]
jobassign_columns = ["idnumber", "useridenumber", "timemodified", "deleted", "fullname", "orgidnumber", "posidnumber", 'manageridnumber']

usermatch = pd.DataFrame(columns=usermatch_columns)
jobassign = pd.DataFrame(columns=jobassign_columns)
manager_username = None

# Add userupload_columns and userupload DataFrame
userupload_columns = ["username", "firstname", "lastname", "email", "auth", "idnumber"]
userupload = pd.DataFrame(columns=userupload_columns)

manager_idnumber_dict = dict(zip(abc["Emp No"], abc["Manager Employee Number"]))

with concurrent.futures.ThreadPoolExecutor() as executor:
    matched_rows = list(executor.map(match_rows, *zip(*xyz.iterrows())))

for xyz_row, abc_row in matched_rows:
    if abc_row is not None:
        # Check if User ID Number is empty, and use the capitalized version of the username if it is
        if pd.isna(xyz_row["User ID Number"]) or xyz_row["User ID Number"] == "":
            xyz_row["User ID Number"] = xyz_row["Username"].title()

        manager_id = abc_row["Manager Employee Number"]
        manager_username = xyz.loc[xyz["Employee"] == manager_id, "Username"]
        manager_username = manager_username.iloc[0] if not manager_username.empty else None
     
    if is_data_different(xyz_row, abc_row):
        # Populate usermatch.csv
        usermatch_row = {
            "idnumber": xyz_row["User ID Number"],
            "timemodified": 0,
            "username": xyz_row["Username"],
            "deleted": 0,
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": xyz_row["Email"],
            "town": abc_row["City"],
            "auth": "manual",
            "customfield_hierarchypath": f'{abc_row["Org L3"]}~{abc_row["Org Level 3"]}~{abc_row["Org L6"]}~{abc_row["Org L13"]}',
            "customfield_orgframework": "UHSussex",
            "customfield_occcode": abc_row["Occ Code"],
            "customfield_posno": abc_row["Position No"],
            "customfield_title": abc_row["Title"],
            "customfield_middlename": abc_row["Middle Name"],
            "customfield_fte": abc_row["FTE"],
            "customfield_employee": abc_row["Emp No"],
            "customfield_payscale": abc_row["Pay Scale"],
            "customfield_assignmentstartdate": abc_row["Assign Start Date"],
            "customfield_areaofwork": abc_row["Area Of Work"],
            "customfield_maidenname": abc_row["Maiden Name"],
            "customfield_assignmentstatus": abc_row["Assignment Status"],
            "customfield_ProfessionalNumber": abc_row["Professional Registration Num"],
            "customfield_TrainingGrade": "",
            "customfield_PatientHandler": 0,
            "customfield_WorkswithChildren": 0,
            "customfield_Clinical": 0,
            "customfield_Frontline": 0
        }
        usermatch = pd.concat([usermatch, pd.DataFrame([usermatch_row], columns=usermatch_columns)], ignore_index=True)

        # Populate jobassign.csv
        jobassign_row = {
            "idnumber": abc_row["Emp No"],
            "useridenumber": xyz_row["User ID Number"],
            "timemodified": 0,
            "deleted": 0,
            "fullname": abc_row["Position Title"],
            "orgidnumber": abc_row["Org L13"],
            "posidnumber": abc_row["Poslevel2"],
            "manageridnumber": manager_username
        }
        jobassign = pd.concat([jobassign, pd.DataFrame([jobassign_row], columns=jobassign_columns)], ignore_index=True)

        # Populate userupload.csv
        userupload_row = {
            "username": xyz_row["Username"],
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": xyz_row["Email"],
            "auth": "manual",
            "idnumber": xyz_row["User ID Number"],
        }
        userupload = pd.concat([userupload, pd.DataFrame([userupload_row], columns=userupload_columns)], ignore_index=True)

def has_fuzzy_match(abc_row):
    for _, xyz_row in xyz.iterrows():
        if fuzzy_match(abc_row["First Name"] + " " + abc_row["Last Name"], xyz_row["First Name"] + " " + xyz_row["Last Name"]) and (pd.isna(abc_row["Email"]) or fuzzy_match(abc_row["Email"], xyz_row["Email"])):
            return True
    return False

# Step 1: Create a new DataFrame called newusers with the same columns as userupload.csv
newusers_columns = ["username", "firstname", "lastname", "email", "auth", "idnumber"]
newusers = pd.DataFrame(columns=newusers_columns)

# Step 2: Iterate through the rows of abc.csv and check if each row exists in xyz.csv
for _, abc_row in abc.iterrows():
    if abc_row["Emp No"] not in xyz["Employee"].values and not has_fuzzy_match(abc_row):
        # Step 3: Generate a username from the email address and populate the newusers.csv file
        username = abc_row["Email"].split('@')[0] if pd.notna(abc_row["Email"]) else ""

        newusers_row = {
            "username": username,
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": abc_row["Email"],
            "auth": "manual",
            "idnumber": username.title(),
        }
        newusers = pd.concat([newusers, pd.DataFrame([newusers_row], columns=userupload_columns)], ignore_index=True)

        # Add new user data to usermatch and jobassign DataFrames
        manager_id = abc_row["Manager Employee Number"]
        manager_username = xyz.loc[xyz["Employee"] == manager_id, "Username"]
        manager_username = manager_username.iloc[0] if not manager_username.empty else None

        usermatch_row = {
            "idnumber": username.title(),
            "timemodified": 0,
            "username": xyz_row["Username"],
            "deleted": 0,
            "firstname": abc_row["First Name"],
            "lastname": abc_row["Last Name"],
            "email": xyz_row["Email"],
            "town": abc_row["City"],
            "auth": "manual",
            "customfield_hierarchypath": f'{abc_row["Org L3"]}~{abc_row["Org Level 3"]}~{abc_row["Org L6"]}~{abc_row["Org L13"]}',
            "customfield_orgframework": "UHSussex",
            "customfield_occcode": abc_row["Occ Code"],
            "customfield_posno": abc_row["Position No"],
            "customfield_title": abc_row["Title"],
            "customfield_middlename": abc_row["Middle Name"],
            "customfield_fte": abc_row["FTE"],
            "customfield_employee": abc_row["Emp No"],
            "customfield_payscale": abc_row["Pay Scale"],
            "customfield_assignmentstartdate": abc_row["Assign Start Date"],
            "customfield_areaofwork": abc_row["Area Of Work"],
            "customfield_maidenname": abc_row["Maiden Name"],
            "customfield_assignmentstatus": abc_row["Assignment Status"],
            "customfield_ProfessionalNumber": abc_row["Professional Registration Num"],
            "customfield_TrainingGrade": "",
            "customfield_PatientHandler": 0,
            "customfield_WorkswithChildren": 0,
            "customfield_Clinical": 0,
            "customfield_Frontline": 0
        }
        usermatch = pd.concat([usermatch, pd.DataFrame([usermatch_row], columns=usermatch_columns)], ignore_index=True)

        jobassign_row = {
            "idnumber": abc_row["Emp No"],
            "useridenumber": username.title(),
            "timemodified": 0,
            "deleted": 0,
            "fullname": abc_row["Position Title"],
            "orgidnumber": abc_row["Org L13"],
            "posidnumber": abc_row["Poslevel2"],
            # other fields from the original code
            "manageridnumber": manager_username
        }
        jobassign = pd.concat([jobassign, pd.DataFrame([jobassign_row], columns=jobassign_columns)], ignore_index=True)

# Save the output CSVs
usermatch.to_csv("usermatch.csv", index=False)
jobassign.to_csv("jobassign.csv", index=False)
userupload.to_csv("userupload.csv", index=False)
newusers.to_csv("newusers.csv", index=False)

Answer 1, by wvmv3b1j:

Is this normal? Is my dataset just too large, so it simply needs hours to run?
The main problem is that you have two sets of nested for-loops:

  • inside match_rows(), you call abc.apply(..., axis=1)
  • match_rows() is itself called for every row of xyz

That is essentially the same as doing:

for xyz_row in xyz.iterrows():
   for abc_row in abc.iterrows():
      ...
  • inside the for ... in abc.iterrows(): loop, you call has_fuzzy_match()
  • inside has_fuzzy_match(), you loop over for ... in xyz.iterrows()

Which translates to:

for abc_row in abc.iterrows():
   for xyz_row in xyz.iterrows():
      ...

With 20,000+ rows on each side, each of these nested loops is on the order of 400 million pairwise comparisons, most of them fuzzy string ratios, which is why the script appears to run forever without erroring.

Other problems:

  • The ThreadPoolExecutor does nothing useful here; threads only add overhead and slow this workload down.
  • Appending rows with pd.concat and a one-row pd.DataFrame inside a loop is expensive: every call copies the whole accumulated frame.
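The usual fix, sketched below with made-up column names: collect plain dicts in a Python list inside the loop and build the DataFrame once at the end.

```python
import pandas as pd

# Collect plain dicts in a list inside the loop...
rows = []
for i in range(3):
    rows.append({"idnumber": i, "username": f"user{i}"})

# ...and build the DataFrame once, instead of calling pd.concat per row.
out = pd.DataFrame(rows, columns=["idnumber", "username"])
print(len(out))  # 3
```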

Potential improvements:

It looks like you have an "exact match" criterion, and only when it fails do you fall back to fuzzy matching:

abc["Emp No"] == xyz_row["Employee"]

The exact matching can be done in one vectorised step with pd.merge; an inner merge keeps only the rows that actually matched, e.g.

exact_match = pd.merge(abc, xyz, how="inner", left_on="Emp No", right_on="Employee")
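A minimal, self-contained sketch of this merge-then-filter step, using toy stand-ins for abc.csv and xyz.csv (an inner merge is used so that exact_match contains only the rows that matched):

```python
import pandas as pd

# Toy stand-ins for abc.csv and xyz.csv; the values are made up,
# the column names come from the question.
abc = pd.DataFrame({"Emp No": [1, 2, 3], "First Name": ["Ann", "Bob", "Cat"]})
xyz = pd.DataFrame({"Employee": [1, 3], "Username": ["ann1", "cat3"]})

# Exact matching in one vectorised step instead of a per-row loop.
exact_match = pd.merge(abc, xyz, how="inner",
                       left_on="Emp No", right_on="Employee")

# Rows of abc that found no exact match and still need fuzzy matching.
no_exact_match = abc[~abc["Emp No"].isin(exact_match["Emp No"])]
print(no_exact_match["Emp No"].tolist())  # [2]
```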

From there you can find the rows you still want to fuzzy-match:

no_exact_match = abc[ ~abc["Emp No"].isin(exact_match["Emp No"]) ]

Then you can do a "fuzzy merge" between no_exact_match and xyz:

fuzzy_match = ...

You can use rapidfuzz instead of fuzzywuzzy; it is substantially faster.
As in the earlier example, finding the non-matches can then be done in one step:

no_fuzzy_match = abc[ ~abc["Emp No"].isin(fuzzy_match["Emp No"]) ]
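One way the elided fuzzy merge could be sketched. To keep this self-contained it uses the stdlib difflib as a stand-in scorer; rapidfuzz's fuzz.ratio (or process.extractOne with a score_cutoff) is the faster drop-in. The tiny frames are made up; the 85 cutoff mirrors the question's threshold:

```python
import pandas as pd
from difflib import SequenceMatcher

def fuzzy_score(s1, s2):
    # Stdlib stand-in for rapidfuzz's fuzz.ratio (0-100 scale).
    return SequenceMatcher(None, s1, s2).ratio() * 100

# Made-up rows; column names come from the question's CSVs.
abc = pd.DataFrame({"First Name": ["Jon", "Mia"], "Last Name": ["Smith", "Khan"]})
xyz = pd.DataFrame({"First Name": ["John", "Zed"], "Last Name": ["Smith", "Zee"]})

abc_names = abc["First Name"] + " " + abc["Last Name"]
xyz_names = (xyz["First Name"] + " " + xyz["Last Name"]).tolist()

# For each unmatched abc name, keep the best xyz name above the cutoff.
matches = []
for name in abc_names:
    best = max(xyz_names, key=lambda c: fuzzy_score(name, c))
    matches.append(best if fuzzy_score(name, best) >= 85 else None)

print(matches)  # ['John Smith', None]
```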

It looks like you are also doing a per-row lookup for manager IDs:

for ... abc.iterrows():
   if not_a_match:
      manager_id = abc_row["Manager Employee Number"]
      xyz.loc[xyz["Employee"] == manager_id, ...

As with the Emp No example, this can be done in one pass with a left merge:

new_users = pd.merge(no_fuzzy_match, xyz, how="left", left_on="Manager Employee Number", right_on="Employee")
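A toy demonstration of that manager lookup as a single left merge (column names follow the question's CSVs; the values are made up):

```python
import pandas as pd

# Made-up unmatched abc rows and the xyz export.
no_fuzzy_match = pd.DataFrame({
    "Emp No": [10, 11],
    "Manager Employee Number": [1, 2],
})
xyz = pd.DataFrame({"Employee": [1, 2], "Username": ["mgr.one", "mgr.two"]})

# One left merge attaches each row's manager Username, replacing the
# per-row xyz.loc[xyz["Employee"] == manager_id, ...] lookup in the loop.
new_users = pd.merge(no_fuzzy_match, xyz, how="left",
                     left_on="Manager Employee Number", right_on="Employee")
print(new_users["Username"].tolist())  # ['mgr.one', 'mgr.two']
```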
