Python多线程读取文件会导致性能降低:如何优化?

tjrkku2a  于 2023-06-04  发布在  Python
关注(0)|答案(1)|浏览(143)

我正在学习Python中的并发,我注意到threading模块甚至降低了我的代码速度。我的代码是一个简单的解析器,我从本地目录读取HTML,并将解析后的一些字段作为JSON文件输出到另一个目录。
我期待速度的提高,但是速度变低了,一次测试少量的HTML,50,200,1000,以及大量的HTML,如30 k。在所有情况下,速度都会降低。例如,使用1000个HTML,没有线程速度为~2.9秒,有线程速度为~4秒。
也尝试了concurrent.futuresThreadPoolExecutor,但它提供了同样慢的结果。
我知道GIL,但我认为I/O绑定的任务应该用多线程来处理。
下面是我的代码:

import json
import re
import time
from pathlib import Path
import threading

def get_json_data(body: str) -> re.Match[str] or None:
    return re.search(
        r'(?<=json_data">)(.*?)(?=</script>)', body
    )

def parse_html_file(file_path: Path) -> dict:
    with open(file_path, "r") as file:
        html_content = file.read()
        match = get_json_data(html_content)
        if not match:
            return {}

        next_data = match.group(1)
        json_data = json.loads(next_data)

        data1 = json_data.get("data1")
        data2 = json_data.get("data2")
        data3 = json_data.get("data3")
        data4 = json_data.get("data4")
        data5 = json_data.get("data5")

        parsed_fields = {
            "data1": data1,
            "data2": data2,
            "data3": data3,
            "data4": data4,
            "data5": data5
        }

        return parsed_fields

def save_parsed_fields(file_path: Path, parsed_fields: dict, output_dir: Path) -> None:
    output_filename = f"parsed_{file_path.stem}.json"
    output_path = output_dir / output_filename

    with open(output_path, "w") as output_file:
        json.dump(parsed_fields, output_file)

    print(f"Parsed {file_path.name} and saved the results to {output_path}")

def process_html_file(file_path: Path, parsed_dir: Path) -> None:
    parsed_fields = parse_html_file(file_path)
    save_parsed_fields(file_path, parsed_fields, parsed_dir)

def process_html_files(source_dir: Path, parsed_dir: Path) -> None:
    parsed_dir.mkdir(parents=True, exist_ok=True)

    threads = []
    for file_path in source_dir.glob("*.html"):
        thread = threading.Thread(target=process_html_file, args=(file_path, parsed_dir))
        thread.start()
        threads.append(thread)

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

def main():
    base_path = "/home/my_pc/data"
    source_dir = Path(f"{base_path}/html_sample")
    parsed_dir = Path(f"{base_path}/parsed_sample")

    start_time = time.time()

    process_html_files(source_dir, parsed_dir)

    end_time = time.time()
    duration = end_time - start_time
    print(f"Application took {duration:.2f} seconds to complete.")

if __name__ == "__main__":
    main()

我知道asyncio,但我想正确地测试所有的多线程方法,以选择最适合我的方法。
如前所述也尝试了concurrent.futures,代码几乎是相同的,当处理html_files我有这些行:

with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Iterate over the HTML files in the source directory
        for file_path in source_dir.glob("*.html"):
            executor.submit(process_html_file, file_path, parsed_dir)

我的代码中有错误吗?我如何使用多线程更好地优化我的代码(除了asyncio)?

tf7tbtn2

tf7tbtn21#

首先,你可以通过在读取所有数据后立即关闭输入文件来改进parse_html_file(注意,其余代码不再位于with上下文管理器中):

def parse_html_file(file_path: Path) -> dict:
    with open(file_path, "r") as file:
        html_content = file.read()
        
    match = get_json_data(html_content)
    if not match:
        return {}

    next_data = match.group(1)
    json_data = json.loads(next_data)

    data1 = json_data.get("data1")
    data2 = json_data.get("data2")
    data3 = json_data.get("data3")
    data4 = json_data.get("data4")
    data5 = json_data.get("data5")

    parsed_fields = {
        "data1": data1,
        "data2": data2,
        "data3": data3,
        "data4": data4,
        "data5": data5
    }

    return parsed_fields

其次,不推荐使用threading的解决方案,因为它将创建与输入文件一样多的线程。这可能是很多,并可能导致速度减慢和高内存使用。在这种情况下,线程/进程池解决方案更好。确保max_workers设置为大于1的值(例如,等于计算机的CPU核心数)。
最后,如果并发执行的函数非常快,那么多线程和多处理不太可能加速程序的执行。在这种情况下,您只需支付管理并发的开销。你应该第一次精细的函数和确定那些非常慢的(是get_json_dataparse_html_file的其余指令,或者文件保存?).

相关问题