python-3.x TDQM线程|多处理视觉错误

r1wp621o  于 2023-08-08  发布在  Python
关注(0)|答案(1)|浏览(134)

我偶然发现了一个需要手动更新tqdm条的案例。具体来说,我试图跟踪一个由Fortran库启动的进程。在下面的代码中,我复制了行为,保留了面向案例的细节。当一个过程完成时,问题就出现了。具体地,当过程完成时,光标位置改变,并且因此条相对于它们的初始位置移动,从而留下先前的迭代。此外,它发生,如果一个酒吧试图更新自己,当另一个酒吧正在改变,它的位置是重新计算从那里留下一个混乱。我相信这种行为与没有锁定有关,或者是锁定条形图中更改的方法。我没有在网上找到任何东西,所以我在这里发布。应该可以用队列来解决这个问题,但我不希望传递tqdm对象,因为它们是不可拾取的。

from concurrent.futures import ThreadPoolExecutor, as_completed
import random
from tqdm.auto import tqdm
from time import sleep 
from threading import Thread

def latest_time(dir:str):
    return random.randint(0,100), 'a', False
    
def monitor_run(
        dir:str,
        name: str,
        position: int,
        max_iter: int,
        refresh_progress: float=2,
) -> None:
    sleep(1 + (position+1)/10)
    with tqdm(
        total = max_iter,
        bar_format="{l_bar}{bar:30}{r_bar}",
        desc=f"\t\t{name}: 0.0 Progress",
        position= position,
        ncols = 100,
        leave = True,
        ascii= True,
        colour ='#00ff00',

    ) as pbar:
        desc_prev: float = 0
        while True:
            sleep(refresh_progress)
            time, desc, error = latest_time(dir)
            
            if desc is None:
                desc: float | None = desc_prev
            else:
                desc_prev = desc
                pbar.desc = f"\t\t {name}: {desc} Progress"
            
            if error:
                pbar.write(f"Encountered Error at {desc}")
                break
            
            if time is None:
                continue

            pbar.n = int(time)
            pbar.refresh(nolock=True)

            if time>=max_iter:
                pbar.close()
                break

def serial_monitor(
    dir:str,
    position: int,
    max_iter: int,
    refresh_progress: float = 2,
)-> None:
    monitor_run(dir,str(position), position ,max_iter, refresh_progress)

def serial_monitor_star(args)-> None:
    serial_monitor(*args)

def parallel_monitor(
        dirs: list[str],
        max_iter: int,
        refresh_progress: float =2,
) -> None:
    args_list = [
        [
            dir, position+1, max_iter, refresh_progress
        ] for position, dir in enumerate(dirs)
    ]
    # tqdm.write("\t\tStarting:")
    # thread_map(
    #     serial_monitor_star, args_list, tqdm_class = tqdm ,max_workers = len(reynolds)
    # )
    # tqdm.write("\t\tCompleted")
    with tqdm(total=2*len(dirs)):
        with ThreadPoolExecutor(max_workers= len(dirs)) as ex:
            futures = [
                ex.submit(
                    serial_monitor_star,
                    args
                ) for args in args_list
            ]
            for future in as_completed(futures):
                result = future.result()

if __name__ =="__main__":
    parallel_monitor(
        ['','','','','','',''],
        100,
        2
    )

字符串

33qvvth1

33qvvth11#

也许您可以稍微调整一下代码(并使用Queue来更新条形图)。
下面是一个示例,这可能是什么样子:工作线程向bar线程发送2个命令:new创建进度条,update创建进度条。每个栏都有一个唯一的ID,工作人员使用此ID更新栏。

import time
from tqdm import tqdm
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed

def some_worker_thread(work_num, bar_queue, val):
    bar_queue.put_nowait(("new", (work_num, val)))

    for i in range(val):
        time.sleep(0.05)
        bar_queue.put_nowait(("update", (work_num, 1)))

    return "some result"

def update_bar(q):
    bars = {}

    while True:
        command, args = q.get()
        match command:
            case "new":
                name, total_length = args
                bars[name] = tqdm(
                    desc=f'work-{name}',
                    total=total_length,
                    position=len(bars),
                    ascii=True,
                    colour="#00ff00",
                )
            case "update":
                name, how_much = args
                bars[name].update(how_much)
                bars[name].refresh()

if __name__ == "__main__":
    bar_queue = Queue()

    # start update progress bar thread
    # daemon= parameter is set to True so this thread won't block us upon exit
    bar_thread = Thread(target=update_bar, args=(bar_queue,), daemon=True)
    bar_thread.start()

    with ThreadPoolExecutor(max_workers=4) as ex:
        futures = [
            ex.submit(some_worker_thread, i, bar_queue, v)
            for i, v in enumerate([100, 120, 110, 130, 210, 100])
        ]
        for future in as_completed(futures):
            result = future.result()

    print('\n'*6)

字符串
这看起来像:


的数据

相关问题