DeepSpeed-MII 复制的readme结果

inb24sb2  于 6个月前  发布在  其他
关注(0)|答案(8)|浏览(74)

你好!你能提供这个实验的代码和启动命令吗?非常感谢任何参考资料!

zz2j4svz

zz2j4svz1#

期待您的回复。

js81xvg6

js81xvg63#

贡献者
非常感谢您的回复!我想再次询问关于使用管道的问题,即遍历自己的测试集并通过批处理输入到管道中。但我在后续迭代中发现了一个错误:Deadlock detected. Resetting KV cache and recomputing requests. Consider limiting number of concurrent requests or Consider limiting number of concurrent requests or decreasing max lengths of prompts/generations. 这是上一次迭代中使用的kvcache没有正确释放的原因吗?我应该如何正确使用非持久模式以充分利用此框架的性能?@mrwyattii

zsohkypk

zsohkypk4#

死锁可以在持久模式和非持久模式下发生。在将请求放在推理引擎之前,我们不会检查是否有足够的内存来计算每个请求生成的所有潜在令牌。这是出于性能原因,但如果请求过多,我们就会陷入这种死锁场景(阅读更多关于此的评论)。

我们正在努力改进,以便用户能够更好地调整推理引擎的行为,以避免给定用例中的死锁。

使用持久部署的一个好处是,您可以定期发送请求,而不是一次性全部发送。例如,如果您有50个请求...使用非持久管道并将它们全部批量发送 response = pipeline(my_50_requests) 可能会导致死锁。使用持久部署,您可以有许多客户端,每个客户端在时间上略有不同地发送子批次。这就是我们在基准测试中所做的第二种情况,因为它更接近于“现实世界”的服务场景。

持久部署还具有更快响应请求的优点。使用非持久管道,您必须等待所有50个请求完成后才能收到任何响应。但是有了多个客户端,您会很快得到第一个响应。

请让我知道是否需要进一步澄清!

2exbekwf

2exbekwf5#

死锁可以在持久模式和非持久模式下发生。在将请求放在推理引擎之前,我们没有检查是否有足够的内存来计算每个请求生成的所有潜在令牌。这是出于性能原因,但如果请求过多,我们就会遇到这种死锁情况(阅读更多关于此的评论)。
我们正在努力改进,以便用户能够更好地调整推理引擎的行为,以避免给定用例中的死锁。
使用持久部署的一个好处是,您可以定期发送请求,而不是一次性全部发送。例如,如果您有50个请求... 使用非持久管道并将它们一次性批量发送 response = pipeline(my_50_requests) 很可能会陷入死锁。使用持久部署,您可以有许多客户端,每个客户端在时间上略有不同地发送子批次。这就是我们在基准测试中所做的第二个场景,因为它更接近于“现实世界”的提供服务场景。
持久部署还具有更快响应请求的优点。使用非持久管道,您必须等待所有50个请求完成后才能收到任何响应。但是有了多个客户端,您会很快收到第一个响应。
请让我知道是否需要进一步澄清!
非常感谢您耐心而详细的回复!关于非持久模式的主要问题如下:

您可以看到,在前几个循环中,响应正常返回,但随后出现了死锁,因此我怀疑在前几个循环中使用的kvcache没有正常释放。这对我来说只是一个小问题,我正在尝试持久模式,从尝试基准代码来看应该没问题。然后我还想再次询问这个框架是否支持闪存解码,甚至闪存解码++,我非常期待将这个算法添加到这个框架中! @mrwyattii

gg0vcinb

gg0vcinb6#

请查看为什么会出现这个问题,我正在尝试在持久模式下测试自己的数据,但在预热阶段出现了错误:

[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,474] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,492] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,493] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,494] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,508] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,512] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,523] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,535] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
warmup queue size: 10000 (1393492)
---------------------------------init successfully-------------------
warmup queue size: 10000 (1393488)
warmup queue size: 10000 (1393484)
warmup queue size: 10000 (1393487)
warmup queue size: 10000 (1393485)
warmup queue size: 10000 (1393483)
warmup queue size: 9999 (1393481)
warmup queue size: 9999 (1393479)
warmup queue size: 9999 (1393491)
warmup queue size: 9999 (1393494)
warmup queue size: 9999 (1393480)
warmup queue size: 9998 (1393489)
warmup queue size: 9998 (1393486)
warmup queue size: 9998 (1393493)
warmup queue size: 9998 (1393490)
warmup queue size: 9998 (1393482)
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1222, in _serve
    if not _process_event_and_continue(state, event):
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1182, in _process_event_and_continue
    rpc_state, rpc_future = _handle_call(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1058, in _handle_call
    _handle_with_method_handler(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1005, in _handle_with_method_handler
    return _handle_unary_stream(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 872, in _handle_unary_stream
    return thread_pool.submit(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/concurrent/futures/thread.py", line 169, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up

这是 run_all.sh :

RAGGED_BATCH_SIZE=512
PARAM_SIZE=(70b)

TP=8

DEPLOYMENT_NAME=llama2-${PARAM_SIZE}-tp${TP}-b${RAGGED_BATCH_SIZE}
python server.py --model_name /data/hf_models/Llama-2-70b-chat-hf -d ${DEPLOYMENT_NAME} -m ${TP} -b ${RAGGED_BATCH_SIZE} start

DEPLOYMENT_NAME=${DEPLOYMENT_NAME} bash ./run_benchmark_client.sh

echo "Stopping server"
python server.py -d ${DEPLOYMENT_NAME} stop

这是 run_benchmark_client.sh

#!/bin/bash

DEPLOYMENT_NAME=${DEPLOYMENT_NAME:-llama2-70b}
VLLM=${VLLM:-""}

CLIENT_NUMS=${CLIENT_NUMS:-16}
REQUEST_NUM=${REQUEST_NUM:-10000}

LOG_DIR=logs.${DEPLOYMENT_NAME}
mkdir -p ${LOG_DIR}

RESULT_FILE=${DEPLOYMENT_NAME}_c${CLIENT_NUMS}.json
python run_benchmark_client.py -w 1 \
    -d ${DEPLOYMENT_NAME} -n ${REQUEST_NUM} -c ${CLIENT_NUMS} \
    -o ${LOG_DIR}/${RESULT_FILE} \
    ${VLLM} --stream \
    2>&1 | tee ${LOG_DIR}/bench_client_num_c${CLIENT_NUMS}.log

这是 run_benchmark_client.py 中更改的代码

def _run_parallel(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm):
    pid = os.getpid()
    session_id = f"test_session_p{pid}_t{threading.get_ident()}"

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    if not vllm:
        import mii
        client = mii.client(deployment_name)
    time.sleep(random.uniform(0, client_num) * 0.01)
    barrier.wait()

    for _ in range(warmup):
        print(f"warmup queue size: {query_queue.qsize()} ({pid})", flush=True)
        input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)
        if vllm:
            call_vllm(input_tokens, req_max_new_tokens, stream)
        else:
            call_mii(client, input_tokens, req_max_new_tokens, stream)
            
    print("pass warm up",flush=True)

    barrier.wait()

    time.sleep(random.uniform(0, client_num) * 0.01)
    try:
        while not query_queue.empty():
            print(f"queue size: {query_queue.qsize()} ({pid})", flush=True)
            input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)

            # Set max_new_tokens following normal distribution
            if vllm:
                r = call_vllm(input_tokens, req_max_new_tokens)
            else:
                r = call_mii(client, input_tokens, req_max_new_tokens, stream)
            result_queue.put(r)
    except queue.Empty:
        print(f"queue is empty ({pid})",flush=True)

    print(f"Worker ({pid}) finished. session_id: {session_id}",flush=True)


def run_client(client_num, deployment_name, num_queries, warmup, stream, vllm, use_thread=False):
    """
    Run MII client for benchmarking. The scenario is a bit complicated:
    1. The main process puts `num_queries` queries into the input queue
    2. Each client runs `warmup` iterations () taking the queries from the input queue
    3. --- barrier ---
    4. The main process marks the start time
    5a. All clients send `num_queries' query in total and put the results into the result queue
    5b. The main process takes the results from the result queue (in parallel with 5a)
    6. The main process marks the end time after receiving `num_queries' results
    """

    if use_thread:
        runnable_cls = threading.Thread
        barrier_cls = threading.Barrier
        queue_cls = queue.Queue
    else:
        runnable_cls = multiprocessing.Process
        barrier_cls = multiprocessing.Barrier
        queue_cls = multiprocessing.Queue

    barrier = barrier_cls(client_num + 1)
    query_queue = queue_cls()
    result_queue = queue_cls()

    processes = [runnable_cls(target=_run_parallel,
                              args=(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm))
                 for i in range(client_num)]
    for p in processes:
        p.start()

    tokenizer = AutoTokenizer.from_pretrained("/data/hf_models/Llama-2-70b-chat-hf")
   

    with open("mydataset.json") as f:
        request_text = json.load(f)

    num_samples = args.num_queries
    if num_samples is not None:
        request_text = request_text[0:num_samples]

    for p,pl,ol in request_text:
        query_queue.put((p, ol))
    print("---------------------------------init process---------------------------",flush=True)
    # Tokenizers must be initialized after fork.
    # So we need to fork before putting inputs to the queue.
    # We need this barrier to stop child processse from taking inputs before the main process puts them
    
    barrier.wait()

    print("---------------------------------init successfully-------------------",flush=True)
    # This barrier is to make sure that all clients have finished warmup
    barrier.wait()
    print("---------------------------------finish warm up-------------------",flush=True)

    response_details = []
    print("------------------------get the result-----------------------------------",flush=True)
    while len(response_details) < args.num_queries:
        res = result_queue.get()
        # vLLM returns concatinated tokens
        if vllm:
            all_tokens = tokenizer.tokenize(res.generated_tokens)
            res.generated_tokens = all_tokens[len(tokenizer.tokenize(res.prompt)):]
        response_details.append(res)
        print("====================",flush=True)
        print(len(response_details),flush=True)
        print("====================",flush=True)

    return response_details

期待您的回复~

s8vozzvw

s8vozzvw7#

@mrwyattii 您的帮助非常需要,使用持久化并没有非常顺利,为什么当我在自己的数据集上运行时,总是会出现GPU不再被利用的情况,但程序从未执行,也没有报告错误。我应该如何正确使用非持久化模式?

yrwegjxp

yrwegjxp8#

你好,Traveller2001。抱歉回复晚了,过去两周我一直不在办公室。你能在没有你这里指定的更改的情况下运行基准测试吗?我会尝试重现结果,但Tohtana可能能提供更好的帮助,因为他们编写了基准测试代码。

相关问题