PyTorch TorchServe custom handler - how to pass a list of Tensors for batch inference

Asked by w6lpcovy on 2023-03-30

I am trying to create a custom handler in TorchServe, and I also want to use TorchServe's batching capability to make the best use of resources. I cannot figure out how to write the custom handler for this kind of inference.
Problem: I have 20-page documents, each of which I have OCRed. I use the "transfo-xl-wt103" model for inference.
Here is my code:

import json
import logging
import os
import re
import sys

sys.path.append(os.getcwd())

# Deep learning
import torch, transformers
from transformers import AutoModel, AutoTokenizer
from ts.torch_handler.base_handler import BaseHandler
import concurrent.futures
from abc import ABC

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)8.8s] %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

class transformer_embedding_handler(BaseHandler, ABC):
    """Transformers handler class for text embedding

    Args:
        BaseHandler (Class): Base default handler to load torch based models
        ABC (Class): Helper class that provides a standard way to create ABC using inheritance
    """

    def __init__(self):
        """Class constructor"""
        logger.info("I am here, __init__ method")
        logger.info(f"transformer version:{transformers.__version__}")
        logger.info(f"torch version:{torch.__version__}")
        # run the constructor of the base classes
        super(transformer_embedding_handler, self).__init__()
        # will be set to true once initialize() function is completed
        self.initialized = False
        # configurations
        self.model_name = "transfo-xl-wt103"
        self.do_lower_case = True
        self.max_length = 1024

        # want batching?
        self.batching = False
        # if batching is set to true, padding should be true
        self.padding = False
        # Num of tensors in each batch
        self.batch_size = 8
        self.torch_worker = os.getenv('torch_worker', 5)

    def initialize(self, ctx):
        """Get the properties for the serving context

        Args:
            ctx (object): context object that contains model server system properties
        """
        logger.info("I am here, initialize method")
        # properties = ctx.system_properties

        properties = ctx["system_properties"]
        logger.info(f"Properties: {properties}")
        model_dir = properties.get("model_dir")
        logger.info("Model dir: '%s'", model_dir)

        self.device = torch.device(
            "cuda:" + str(properties.get("gpu_id"))
            if torch.cuda.is_available()
            else "cpu"
        )

        # Load the model and the tokenizer
        self.model = AutoModel.from_pretrained(model_dir)
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir, do_lower_case=self.do_lower_case)
        self.model.to(self.device)
        self.model.eval()
        logger.debug(
            "Transformer model from path {0} loaded successfully".format(model_dir)
        )

        # Initialization is done
        self.initialized = True

    def preprocess(self, requests):
        """Method for preprocessing the requests received before infernce

        Args:
            requests (list): list of requests to perform inference on

        Returns:
            dict: dict containing token tensors
        """
        logger.info("I am here, preprocess method")

        # decode the request
        full_text_list = requests[0].get("data")
        logger.info(f"Text from data {full_text_list}. type: {type(full_text_list)}")

        prepare_for_transformers_config = {"max_lens": self.max_length}
        with concurrent.futures.ThreadPoolExecutor(max_workers=int(self.torch_worker)) as executor:
            futures = [executor.submit(self.tokenizer.encode_plus, full_text, pad_to_max_length=False,
                                       add_special_tokens=True, return_tensors='pt') for full_text in full_text_list]
        token_ids = [future.result().to(self.device) for future in
                     futures]  # Not using as_completed function will maintain order of the list

        logger.info(f"Token Generation completed! Number of items in array: {len(token_ids)}")

        return token_ids

    def inference(self, token_ids):
        logger.info(f"I am here, inference method, processing token array length: {len(token_ids)}")
        logger.info("Padding: '%s'", self.padding)
        logger.info("Batching: '%s'", self.batching)
        inference = []

        for token in token_ids:
            inference.append(self.model(**token))

        return inference

    def postprocess(self, outputs):
        logger.info("I am here, postprocess method")
        out_file = "C:\\Users\\pbansal2\\Documents\\PycharmProjects\\embedder_test\\output\\embeddings.pt"
        with open(out_file, 'wb') as f:
            torch.save(outputs, f)

        return json.dumps({"success": True})

if __name__ == '__main__':
    tmp = transformer_embedding_handler()
    requests = [
        {
            "data": [
                "TEXTTTTTTTTTTT111111111111111111",
                "TEXTTTTTTTTTTT222222222222222222",
                "TEXTTTTTTTTTTT333333333333333333"
            ]
        }
    ]

    tmp.initialize({"system_properties": {
        "model_dir": "C:\\Users\\pbansal2\\Documents\\PycharmProjects\\embedder_test\\model-store", "gpu_id": 0,
        "batch_size": 1, "server_name": "MMS", "server_version": "0.4.2"}})
    out = tmp.preprocess(requests)
    inputs = tmp.inference(out)
    logger.info(inputs)
    tmp.postprocess(inputs)

My problem is with this part of the inference function -

for token in token_ids:
            inference.append(self.model(**token))

Is there a way to tell TorchServe to use batch_size and max_batch_delay during inference, so that it batches the requests instead of the for loop computing them one at a time?
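As far as I understand the docs, server-side batching has to be switched on when the model is registered (batch_size and max_batch_delay are registration parameters, not handler settings), and the handler is then handed a list of up to batch_size requests that it must answer one-for-one. A rough, untested sketch of both sides (the archive name and management port below are placeholders, not from my setup):

# 1) Register the model with batching enabled (default TorchServe management
#    API on port 8081; "embedder.mar" is a placeholder archive name).
import requests as http

http.post(
    "http://localhost:8081/models",
    params={
        "url": "embedder.mar",
        "initial_workers": 1,
        "batch_size": 8,         # TorchServe groups up to 8 requests per handler call
        "max_batch_delay": 100,  # ms to wait before dispatching a partial batch
    },
)

# 2) With batching on, preprocess() receives up to batch_size requests and
#    should flatten all of them instead of reading only requests[0].
def preprocess(self, requests):
    texts = []
    for req in requests:
        data = req.get("data") or req.get("body")
        texts.extend(data if isinstance(data, list) else [data])
    # ... tokenize `texts` as before; postprocess() must then return one
    # item per request, in the same order.
    return texts
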
I have already tried Python multiprocessing, but it did not help much. I also don't know why, but when I use multiprocessing on an 8-CPU machine (profiling with the top command), all the CPUs seem to be asleep and almost nothing happens.
But when I run the inputs one by one, most of the CPUs show usage. I'm not sure, but it looks as if the model already implements some form of parallelism.
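My guess (and it is only a guess) is that this is PyTorch's intra-op threading: a single model(**token) call already fans out across CPU cores, so adding multiprocessing on top mostly causes oversubscription. The thread pool can be inspected and capped like this:

import torch

# A single forward pass already uses this many intra-op threads,
# which is why sequential inference keeps most cores busy.
print(torch.get_num_threads())

# When adding your own workers on top, capping PyTorch's own threads
# can help avoid oversubscription.
torch.set_num_threads(1)
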
Here is the model documentation - https://huggingface.co/transfo-xl-wt103#how-to-get-started-with-the-model
Any help is appreciated! Thanks.

idfiyjo8 (answer 1)

To get rid of the for loop, I was able to use the TensorDataset and DataLoader classes:

batch_size = 1
dataset_val = torch.utils.data.TensorDataset(token_ids)
dataloader_val = torch.utils.data.DataLoader(
    dataset_val,
    sampler=torch.utils.data.sampler.SequentialSampler(dataset_val),
    batch_size=batch_size,
)

And then later:

inference=[]
for batch in dataloader_val:
    inference.append(self.model(**batch))
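
One caveat if batch_size is raised above 1: the per-document token tensors have different lengths, so they need padding before they can be stacked into one batch. A sketch of padded batch tokenization (assuming the tokenizer defines a pad token; the transfo-xl tokenizer does not ship one by default, so something like tokenizer.pad_token = tokenizer.eos_token may be needed first):

# Tokenize and pad the whole list of texts in one call so the tensors
# share one length and the model can run a single batched forward pass.
encoded = self.tokenizer(
    full_text_list,
    padding=True,              # pad to the longest sequence in the batch
    truncation=True,
    max_length=self.max_length,
    return_tensors="pt",
).to(self.device)

outputs = self.model(**encoded)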

One more thing I learned along the way: with the approach above I was telling Python/torch to keep all the tokens in memory rather than letting the GC clean them up, and that always ended in OOM. So I did this instead -

def do_inference(self, batch):
    return self.model(**batch)

def inference(self, token_ids):
    logger.info(f"I am here, inference method, processing token array length: {len(token_ids)}")
    logger.info("Padding: '%s'", self.padding)
    logger.info("Batching: '%s'", self.batching)
    inference = []

    # dataloader_val is built from token_ids with TensorDataset/DataLoader as shown above
    for batch in dataloader_val:
        inference.append(self.do_inference(batch))

    return inference

Because do_inference returns between batches, the Python/torch GC makes sure the previous tensors are cleared from memory before a new batch is executed. This helped me a lot.
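
One more memory saver that pairs well with this, sketched as a suggestion rather than something I benchmarked on this model: running the forward pass under torch.no_grad() stops autograd from keeping a computation graph alive for every output that gets appended, and moving results off the GPU as they are produced keeps GPU memory flat.

def do_inference(self, batch):
    # no_grad() prevents autograd from retaining the graph for each
    # appended output, which otherwise grows memory batch by batch.
    with torch.no_grad():
        outputs = self.model(**batch)
    # Keep only what you need and move it off the GPU right away
    # (assuming the hidden states are what gets saved later).
    return outputs.last_hidden_state.cpu()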
