DeepSpeed-MII blocks when calling client inference from multiple processes

vnjpjtjt posted 2 months ago in Other

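I am trying to integrate MII into Triton Server, but I ran into some problems. Here is part of my code:

The error is: when I use

MII blocks at

When I use

inference works, but gRPC keeps reporting errors (inference is not affected, but the service is unstable).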

56lgkhnf1#

I ran into a similar situation. Here is my code:

import threading

import mii

# model_dir, replica_num and xx (the tensor_parallel value) are placeholders
# from the original post and must be defined before running this.

def worker(rank, this_model):
    try:
        # Rank 0 reuses the client returned by mii.serve;
        # every other rank connects to the deployment by name.
        if this_model is None:
            client = mii.client('qwen')
        else:
            client = this_model
        response = client.generate(["xxx"], max_new_tokens=1024, stop="<|im_end|>", do_sample=False, return_full_text=True)
        print("in worker rank:", rank, " response:", response)
    except Exception as e:
        print(f"Capture error:{e}")
    finally:
        print("final")

# Start the persistent deployment; the return value is a client for it.
model = mii.serve(model_dir, deployment_name="qwen", tensor_parallel=xx, replica_num=replica_num)

# One worker thread per replica.
job_process = []
for rank in range(replica_num):
    this_model = model if rank == 0 else None
    job_process.append(threading.Thread(target=worker, args=(rank, this_model)))
for process in job_process:
    process.start()
for process in job_process:
    process.join()

With threading.Thread this runs fine. With multiprocessing.Process, however, it blocks inside client.generate.
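One thing that may be worth trying (not a confirmed fix for this issue): gRPC's Python library is known to behave badly when a process is forked after gRPC has been initialized, and multiprocessing uses fork by default on Linux. The sketch below uses the "spawn" start method instead and creates the client inside each child rather than passing the parent's client in. The names make_mii_client, worker, launch and the client_factory parameter are hypothetical helpers for illustration; only mii.client and client.generate come from the code above.

```python
import multiprocessing as mp


def make_mii_client(deployment_name):
    """Create the client inside the calling process (assumes the deployment is already running)."""
    import mii  # imported lazily so each child sets up its own gRPC state

    return mii.client(deployment_name)


def worker(rank, deployment_name, prompts, client_factory=make_mii_client):
    """Connect from within this process and run one generate call."""
    client = client_factory(deployment_name)
    response = client.generate(prompts, max_new_tokens=64)
    print("in worker rank:", rank, " response:", response)
    # The return value is ignored by multiprocessing.Process; it is only
    # useful when calling worker() directly, e.g. while debugging.
    return rank, response


def launch(num_workers, deployment_name, prompts):
    """Start one spawned process per worker and wait for all of them."""
    ctx = mp.get_context("spawn")  # fresh interpreter per child, no forked gRPC state
    procs = [
        ctx.Process(target=worker, args=(rank, deployment_name, prompts))
        for rank in range(num_workers)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    return [proc.exitcode for proc in procs]
```

With "spawn", the call to launch (for example launch(replica_num, "qwen", ["xxx"]) after mii.serve has started the deployment) has to sit under an `if __name__ == "__main__":` guard, because each child re-imports the main module.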

kqqjbcuj2#

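Because of the GIL, Python threads do not run Python code in parallel, so this code cannot take full advantage of concurrency. That means I still need multiprocessing.Process to start new clients. As noted above, though, that does not work well.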
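On the GIL point: the GIL limits CPU-bound Python code, but a thread that is blocked waiting (sleeping, or waiting on network I/O) releases it, so threads can still overlap requests whose time is spent mostly on the server. A minimal sketch of that effect, where fake_generate is a hypothetical stand-in for client.generate that only waits; whether the real client call behaves the same way is an assumption to verify by timing it:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_generate(prompt, delay=0.2):
    """Hypothetical stand-in for client.generate: it only waits, like a blocking network call."""
    time.sleep(delay)  # a sleeping thread releases the GIL
    return f"response to {prompt}"


def run_concurrently(prompts, max_workers=4):
    """Issue all requests from a thread pool and report the total wall-clock time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        responses = list(pool.map(fake_generate, prompts))  # results keep input order
    elapsed = time.perf_counter() - start
    return responses, elapsed
```

If four 0.2 s calls finish in roughly 0.2 s rather than 0.8 s, the waits overlapped. Running the same measurement against the real deployment would show whether threads are enough for the client side.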

rfbsl7qr3#

I found an official example. Maybe we should start the server and the clients the way it is done there.
