python-3.x 并发期货threadpoolexecutor重复列表项

cnh2zyt3  于 2023-04-08  发布在  Python
关注(0)|答案(1)|浏览(86)

我是Python中多线程的新手,并试图管理一个调用数千个API的脚本。我已经阅读了许多答案和文章,并得到了这个:

import requests
import json
import time
import sys
import concurrent.futures
import threading

thread_local = threading.local()
pet_data = []

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def scrape(url):
    session = get_session()
    with session.get(url) as response:
        info = response.text
        pet_data.append(json.loads(info))

def run_scrapes(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        executor.map(scrape, sites)
        executor.shutdown(wait=True)

sites是一个要调用的URL列表(它是一个分页的API,所以它是一个简单的'api.endpoint&page ='+ str(i)URL列表)。
它工作得很好,但我遇到的问题是它重复调用,很多次(根据通过日志进行的调试,每个URL中有6个被调用,即使列表中只有1个)。
我从文章/答案中发来的代码中有什么问题吗?我承认我没有完全理解get_session函数,我想这可能是问题所在。

dz6r00yl

dz6r00yl1#

在如何使用线程的本地数据对象方面,你有一个根本性的缺陷。这个对象是线程本地的(因此得名),需要每个线程获取。像你这样从主线程获取它,然后在get_session()中重用它,将在线程之间共享相同的Session对象。你需要总是在线程中获取它:

def get_session():
    thread_local = threading.local()
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

我不清楚Session对象是否是线程安全的(有冲突的信息)。这可能是你的问题的原因。
你在收集结果时也错误地使用了范例。你应该做的是从scrape()返回相应的数据。然后通过map()调用收集它:

pet_data = list(executor.map(scrape, sites))

但这一部分并不重要。
增编**
然而,你的主要问题是你没有使用正确的工具来完成这项工作。对于像HTTP请求这样的异步I/O,你应该使用Python的asyncio功能,例如使用httpxhttps://www.python-httpx.org/async/
这样所有的请求都可以并发运行,即使你从一个线程启动它们。由于GIL的原因,多线程在很多情况下对Python没有多大帮助。

相关问题