Python队列 线程池 进程池 基本概念以及使用方法

x33g5p2x  于2021-09-19 转载在 Python  
字(6.5k)|赞(0)|评价(0)|浏览(432)

线程、进程概念

算是对上一篇文章的回顾,具体细节参见上文。

概念

进程:一个正在运行的应用程序就是一个进程。一个进程是运行在其专用且受保护的内存空间中

线程:一个进程要执行任务必须要有线程。

进程 — 车间 线程 — 车间工人

线程的特点:一个线程执行多个任务是串行执行的

多线程:一个进程中有多个线程。多线程可以并行(同时)执行多个任务

多线程原理:多线程技术是通过利用CPU空闲时间干活来提高程序执行效率

多线程

一个应用程序默认对应一个进程,这个进程(主进程)中默认有一个线程(主线程)

使用方法
from threading import Thread

1)、直接使用Thread类

线程对象 = Thread(target = 函数,args = 参数对应的元组)

线程对象.start()

线程对象.join()

2)、使用Thread类的子类

class 线程子类(Thread):

​ def _ init_(self):

​ super()._ init_()

​ 实现线程对象任务需要的额外数据对应的属性

​ def run(self) -> None:

​ 线程任务

from multiprocessing import Process

3)、进程

进程对象 = Thread(target = 函数,args = 参数对应的元组)

进程对象.start()

进程对象.join()

队列

线程队列
from queue import Queue

Queue模块中的队列,只能保存一般数据或者多线程中产生的数据(多用于多线程,自带线程安全属性)
但是不能用来存储多进程产生的数据

if __name__ == '__main__':
    # 1、队列基本用法

    # 1)、创建队列对象
    queue = Queue()

    # 2)、添加数据(进) --- 队列对象.put(数据)
    queue.put(9)
    queue.put(99)

    # 3)、获取数据(出) --- 队列对象.get()
    print(f'队列queue中元素个数:{queue.qsize()}')
    print(queue.get())
    print(f'队列queue中元素个数:{queue.qsize()}')
    print(queue.get())
    print(f'队列queue中元素个数:{queue.qsize()}')
    # 4)、获取队列中元素个数:队列对象.qsize()
    # 5)、通过get获取数据的时候如果队列中没有数据,get方法会等待,直到队列有数据或超时
    print(queue.get(timeout=5))
队列在线程中的使用方法
from threading import Thread
from queue import Queue,Empty
import time
from random import randint

模拟电影下载函数:

def download(movie_name):
    print(f'{movie_name}开始下载')
    time.sleep(randint(1,5))
    print(f'{movie_name}下载完成')
    queue.put(f'{movie_name}数据')

处理数据函数:

def deal_data():
    while(True):
        data = queue.get()
        if(data == 'End'):
            break
        print(f'处理{data}')
if __name__ == '__main__':
    queue = Queue()

    # 创建子线程处理数据(方式三对应的代码)
    deal_thread = Thread(target=deal_data)
    deal_thread.start()

    threads = []
    names = [f'电影{x}' for x in range(1,11)]
    for name in names:
        thread = Thread(target=download,args=(name,))
        thread.start()
        threads.append(thread)

    # 获取队列数据方式1:能做到子线程得到数据主线程能立马处理数据,但数据处理完程序无法结束
    # while(True):
    # data = queue.get()
    # print(f'处理{data}数据')

    # 获取队列数据方式2:能做到子线程得到数据主线程能立马处理数据,通过超时来判断数据是否处理完成
    # while(True):
    # try:
    # data = queue.get(timeout=5)
    # print(f'处理{data}数据')
    # except Empty:
    # break

    # 3、在所有下载数据的线程都结束的时候在队列中添加结束标记,在子线程去获取队列来处理
    for t in threads:
        t.join()

    queue.put('End')

定义全局变量可以在任何一个线程中使用。

进程队列

全局变量无法解决数据的跨进程使用,但可以采用返回值
1)、基本操作

​ 创建队列对象:Queue()

​ 添加数据:队列对象.put(数据)

​ 获取数据:队列对象.get() / 队列对象.get(timeout = 超时时间)

2)、注意事项

如果想要使用一个队列对象获取不同进程

from multiprocessing import Process,Queue,current_process
import time
from random import randint
def download(movie_name,q:Queue):
    print(f'{movie_name}开始下载')
    time.sleep(randint(1,5))
    print(f'{movie_name}下载完成')
    q.put(f'{movie_name}')
    print(current_process())

def deal_data(q:Queue):
    while(True):
        data = q.get()
        if(data == 'End'):
            break
        print(f'处理{data}')
        print(current_process())
if __name__ == '__main__':
    queue = Queue()

    process = Process(target=download,args=('电影1',queue))
    process.start()
    process.join()

    print(queue.get())

练习:使用多个进程同时下载多个电影,将下载的电影数据保存到队列中,在一个新的进程中去处理下载到的数据

做到:边下载边处理,下载完就处理完,处理完程序马上结束

processes = []
names = [f'电影{x}' for x in range(1,11)]
for name in names:
    p = Process(target=download,args=(name,queue))
    p.start()
    processes.append(p)

p1 = Process(target=deal_data,args=(queue,))
p1.start()

for p in processes:
    p.join()

queue.put('End')

线程池

import time
from concurrent.futures import ThreadPoolExecutor
from random import randint
from threading import current_thread
def download(movie_name):
    print(f'{movie_name}开始下载')
    time.sleep(randint(1,5))
    print(f'{movie_name}下载完成')
    print(current_thread())

线程池的工作原理 : 提前创建指定个数的线程,保存到一个线程池中

​ 然后再往线程池中添加若干个任务,线程池自动为线程分配任务

  1. 创建线程池,确定线程池中线程数量
pool = ThreadPoolExecutor(max_workers = 10)
  1. 往线程池中添加任务
names = [f'电影{x}' for x in range(1,101)]

​ 1)、一次添加一个任务
​ 线程池.submit(函数,参数1,参数2…)

for name in names:
    pool.submit(download,name)

​ 2)、一次性添加多个任务
​ 线程.map(函数,包含所有任务的参数的序列)

pool.map(download,names)

​ 3)、关闭线程池

​ 线程池.shutdown() — 关闭线程池以后,线程池无法再添加任务,但是不影响已经添加的任务的执 行

pool.shutdown()
线程池的使用及功能
def download(movie_name,x):
    print(f'{movie_name}-{x}开始下载')
    time.sleep(randint(1, 5))
    print(f'{movie_name}-{x}下载完成')
    return movie_name
if __name__ == '__main__':

    # 1、创建线程池
    pool = ThreadPoolExecutor(max_workers = 30)

    # 2、添加任务
    # 线程池.submit(函数) --- 函数可以是有任意多个参数的函数;返回值是一个可操作的future对象
    # 线程池.map(函数) --- 函数只能为有且只有一个参数的函数;返回值没法控制和操作
    pool.submit(download, '天若有情', 9)
    all_movies = [pool.submit(download, f'电影{x}', x * 9) for x in range(1,100)]

    # 3、等待任务完成
    # wait(all_movies,return_when = ALL_COMPLETED)
    # print('所有电影下载完成!')

    # 4、获取任务函数的返回值
    for movie in as_completed(all_movies):
        print(f'--------------------{movie.result()}--------------------')

进程池

import time
from multiprocessing import Pool
from random import randint
def download(movie_name):
    print(f'{movie_name}开始下载')
    time.sleep(randint(1, 5))
    print(f'{movie_name}下载完成')
    return movie_name

1)、一次添加一个任务

a.进程池对象.apply(函数, 参数) — 同步/串行(一个一个执行);进程池中的多个任务串行执行

b.进程池对象.apply_async(函数, 参数) — 异步/并行;必须配合close()、join()一起使用

函数 - 任务对应的函数的函数名

参数 - 元组;调用任务函数的时候的实参,需要多少个实参,元祖中就有多少个元素

2)、一次添加多个任务

进程池对象.map(函数,参数序列) — 序列中有多少个元素就添加多少个任务;

​ 进程池中的任务并行,进程池中的任务和主进程串行

进程池.map_async(函数,参数序列) — 进程池中的任务和主程序并行执行

map 和 map_async的返回值是所有任务对应的函数的返回值

if __name__ == '__main__':
    pool = Pool(4)

    # -------------------------------------------------------------------
    # pool.apply(download, ('天若有情',))
    #
    # for movie in range(20):
    # pool.apply_async(download, (f'电影{movie}',))

    # result = pool.map_async(download,[f'电影{x}' for x in range(20)])
    result = pool.map_async(download,[f'电影{x}' for x in range(20)])
    #--------------------------------------------------------------------
    # 3、关闭进程池阻止向进程池内添加任务
    pool.close()

    # 4、等待进程池内的任务全都结束
    pool.join()
    print(result.get())   #map_async
    print(result)   #map
线程池的使用 — 51job爬取

网页数据获取函数

def get_one_page(page:int):
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4542.2 Safari/537.36'
    }

    url = f'https://search.51job.com/list/090200,000000,0000,00,9,99,%25E6%2595%25B0%25E6%258D%25AE%25E5%2588%2586%25E6%259E%2590,2,{page}.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare='
    response = requests.get(url, headers = headers)

    # return response.text
    # 正则选取网页中接口数据
    json = findall(r'window.__SEARCH_RESULT__ = (.+?)</script>',response.text)[0]
    # return json

    # json数据转python字典数据
    messages = loads(json)
    # return messages
    job_message = []
    
    # 遍历字典中'engine_search_result'列表将岗位信息添加到job_message中
    for result in messages['engine_search_result']:
        job_message.append({
            '岗位名称':result['job_name'],
            '公司名称': result['company_name'],
            '公司详情网址': result['company_href'],
            '薪资待遇': result['providesalary_text'],
            '工作地点': result['workarea_text'],
            '公司类型': result['companytype_text'],
            '福利待遇': result['jobwelf'],
            '公司方向': result['companyind_text']
        })
    return job_message

创建线程池

pool = ThreadPoolExecutor(max_workers=20)
all_jobs = []

添加任务

for page in range(1,176):
    all_jobs.append(pool.submit(get_one_page,page))

在主线程中写入保存数据(此时主线程空闲)

file = open('files/jobs.csv','a',encoding='utf-8')
writer = csv.DictWriter(file,['岗位名称','公司名称','公司详情网址','薪资待遇','工作地点','公司类型','福利待遇','公司方向'])
writer.writeheader()
for job in as_completed(all_jobs):
    writer.writerows(job.result())

获取到的部分数据截图:

相关文章