I am trying to create a custom handler in TorchServe and also want to use TorchServe's batching capability for optimal resource utilization. I cannot figure out how to write the custom handler for this kind of inference.
Problem: I have 20-page documents, each of which has been OCRed. I use the "transfo-xl-wt103" model for inference.
Below is my code:
import json
import logging
import os
import re
import sys

sys.path.append(os.getcwd())

# Deep learning
import torch, transformers
from transformers import AutoModel, AutoTokenizer
from ts.torch_handler.base_handler import BaseHandler
import concurrent.futures
from abc import ABC

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)8.8s] %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)


class transformer_embedding_handler(BaseHandler, ABC):
    """Transformers handler class for text embedding
    Args:
        BaseHandler (Class): Base default handler to load torch based models
        ABC (Class): Helper class that provides a standard way to create ABC using inheritance
    """

    def __init__(self):
        """Class constructor"""
        logger.info("I am here, __init__ method")
        logger.info(f"transformer version:{transformers.__version__}")
        logger.info(f"torch version:{torch.__version__}")
        # run the constructor of the base classes
        super(transformer_embedding_handler, self).__init__()
        # will be set to true once initialize() function is completed
        self.initialized = False
        # configurations
        self.model_name = "transfo-xl-wt103"
        self.do_lower_case = True
        self.max_length = 1024
        # want batching?
        self.batching = False
        # if batching is set to true, padding should be true
        self.padding = False
        # Num of tensors in each batch
        self.batch_size = 8
        self.torch_worker = os.getenv('torch_worker') if os.getenv('torch_worker') else 5

    def initialize(self, ctx):
        """Get the properties for the serving context
        Args:
            ctx (object): context object that contains model server system properties
        """
        logger.info("I am here, initialize method")
        # properties = ctx.system_properties
        properties = ctx["system_properties"]
        logger.info(f"Properties: {properties}")
        model_dir = properties.get("model_dir")
        logger.info("Model dir: '%s'", model_dir)
        self.device = torch.device(
            "cuda:" + str(properties.get("gpu_id"))
            if torch.cuda.is_available()
            else "cpu"
        )
        # Load the model and the tokenizer
        self.model = AutoModel.from_pretrained(model_dir)
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir, do_lower_case=self.do_lower_case)
        self.model.to(self.device)
        self.model.eval()
        logger.debug(
            "Transformer model from path {0} loaded successfully".format(model_dir)
        )
        # Initialization is done
        self.initialized = True

    def preprocess(self, requests):
        """Method for preprocessing the requests received before inference
        Args:
            requests (list): list of requests to perform inference on
        Returns:
            dict: dict containing token tensors
        """
        logger.info("I am here, preprocess method")
        # decode the request
        full_text_list = requests[0].get("data")
        logger.info(f"Text from data {full_text_list}. type: {type(full_text_list)}")
        prepare_for_transformers_config = {"max_lens": self.max_length}
        with concurrent.futures.ThreadPoolExecutor(max_workers=int(self.torch_worker)) as executor:
            futures = [executor.submit(self.tokenizer.encode_plus, full_text, pad_to_max_length=False,
                                       add_special_tokens=True, return_tensors='pt') for full_text in full_text_list]
            token_ids = [future.result().to(self.device) for future in
                         futures]  # Not using as_completed function will maintain order of the list
        logger.info(f"Token Generation completed! Number of items in array: {len(token_ids)}")
        return token_ids

    def inference(self, token_ids):
        logger.info(f"I am here, inference method, processing token array length: {len(token_ids)}")
        logger.info("Padding: '%s'", self.padding)
        logger.info("Batching: '%s'", self.batching)
        inference = []
        for token in token_ids:
            inference.append(self.model(**token))
        return inference

    def postprocess(self, outputs):
        logger.info("I am here, postprocess method")
        out_file = "C:\\Users\\pbansal2\\Documents\\PycharmProjects\\embedder_test\\output\\embeddings.pt"
        with open(out_file, 'wb') as f:
            torch.save(outputs, f)
        return json.dumps({"success": True})


if __name__ == '__main__':
    tmp = transformer_embedding_handler()
    requests = [
        {
            "data": [
                "TEXTTTTTTTTTTT111111111111111111",
                "TEXTTTTTTTTTTT222222222222222222",
                "TEXTTTTTTTTTTT333333333333333333"
            ]
        }
    ]
    tmp.initialize({"system_properties": {
        "model_dir": "C:\\Users\\pbansal2\\Documents\\PycharmProjects\\embedder_test\\model-store", "gpu_id": 0,
        "batch_size": 1, "server_name": "MMS", "server_version": "0.4.2"}})
    out = tmp.preprocess(requests)
    inputs = tmp.inference(out)
    logger.info(inputs)
    tmp.postprocess(inputs)
My concern is this part of the inference function:
for token in token_ids:
    inference.append(self.model(**token))
Is there a way to tell TorchServe to use batch_size and max_batch_delay during inference, so that it processes the requests in batches instead of looping over them and computing one item at a time?
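For reference, this is the batching configuration I mean. A sketch of registering the model through the management API so the frontend aggregates requests before calling the handler (the .mar name and port are placeholders, not my actual setup):

import requests  # assumes TorchServe's management API is reachable on port 8081

# With these settings the TorchServe frontend collects up to 8 requests, waiting at
# most 50 ms, and passes them to the handler's preprocess() as a single list.
resp = requests.post(
    "http://localhost:8081/models",
    params={
        "url": "transfo-xl-wt103.mar",  # placeholder archive name
        "batch_size": 8,
        "max_batch_delay": 50,
        "initial_workers": 1,
    },
)
print(resp.status_code, resp.text)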
I have already tried Python multiprocessing, but it did not help much. I am also not sure why, but when I use multiprocessing on an 8-CPU machine (and profile it with the top command), all of the CPUs appear to be sleeping and almost nothing happens.
Yet when I process the inputs one after another, most of the CPUs show usage. I am not certain, but it looks like the model already implements some kind of parallelism internally.
Here is the model documentation: https://huggingface.co/transfo-xl-wt103#how-to-get-started-with-the-model
Any help is appreciated! Thanks.
1 Answer
To get rid of the for loop, I could use the TensorDataset and DataLoader classes.
Later I -
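Something along these lines (a sketch only; embed_batches is an illustrative name, and it assumes every encode_plus() call padded the inputs to a common length so they can be stacked):

import torch
from torch.utils.data import DataLoader, TensorDataset

def embed_batches(model, token_ids, batch_size=8, device="cpu"):
    # token_ids: list of BatchEncoding objects, each with a (1, seq_len) input_ids
    # tensor of the same seq_len (i.e. padded), so they can be stacked into (N, seq_len).
    input_ids = torch.cat([t["input_ids"] for t in token_ids], dim=0)
    loader = DataLoader(TensorDataset(input_ids), batch_size=batch_size, shuffle=False)
    outputs = []
    with torch.no_grad():
        for (batch,) in loader:
            out = model(input_ids=batch.to(device))  # one forward pass per batch, not per item
            outputs.append(out.last_hidden_state.cpu())
    return outputs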
One more thing I learned along the way: with the approach above I was telling Python/torch to keep all the tokens in memory instead of letting the GC clean them up, and that always ended in OOM. So I did this instead -
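Roughly like this (again a sketch with illustrative names; input_ids is assumed to be a single padded (N, seq_len) tensor):

import torch
from torch.utils.data import DataLoader, TensorDataset

def _run_single_batch(model, batch, device):
    # Everything allocated here goes out of scope when the function returns,
    # so Python/torch can release the intermediate tensors before the next batch.
    with torch.no_grad():
        out = model(input_ids=batch.to(device))
    return out.last_hidden_state.detach().cpu()

def embed_in_batches(model, input_ids, batch_size=8, device="cpu"):
    loader = DataLoader(TensorDataset(input_ids), batch_size=batch_size, shuffle=False)
    return [_run_single_batch(model, batch, device) for (batch,) in loader]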
Because each batch is processed inside its own function call, the Python/torch GC makes sure the previous tensors are cleared from memory before the next batch is executed. This helped me a lot.