python—如何完全控制与flask应用程序并行运行的进程(启动/终止)?

jfgube3f  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(580)

这是我的应用程序体系结构:

在我的代码里有一个 pedestrian.py 文件,它使用 while 循环从 rtsp 链接读取帧,并在执行行人检测过程(此链接中提供)后,将帧缓存在 redis 中。
(请注意,在循环中,每次新的输出帧都会替换掉上一次循环的输出。这意味着 redis 中在任何时刻都只存在一帧。)
然后在flask应用程序中,我从redis读取已处理的帧并将其发送给客户机。
这是我的行人检测代码:

import time
import warnings
from concurrent.futures import ThreadPoolExecutor
from os import environ
from threading import Event

import cv2
import torch
from redis import Redis

# Module-level redis client; RealTimeTracking.run() stores the latest encoded
# frame through it under the key 'frame'.
# NOTE(review): the host has octets above 255, so it looks like a redacted
# placeholder — confirm the real address before running.
r = Redis('111.222.333.444')

class RealTimeTracking(object):
    """
    Grab frames from a video source, run person detection + tracking on the
    most recent one and cache the annotated JPEG in redis under ``'frame'``.

    A single background worker (``update``) keeps ``self.status`` and
    ``self.frame`` refreshed, while ``run`` repeatedly processes whatever the
    latest frame is. Call ``stop`` to end both loops and release the capture.

    Args:
        cfg: deepsort dict and yolo-model cfg from server_cfg file
        args: parse_args inputs; ``args.use_cuda`` and ``args.input`` are read
    """

    # Pause (seconds) used whenever there is nothing to read or process, so
    # the loops do not spin a CPU core at 100 %.
    IDLE_SLEEP = 0.01

    def __init__(self, cfg, args):
        self.cfg = cfg
        self.args = args
        use_cuda = self.args.use_cuda and torch.cuda.is_available()

        if not use_cuda:
            # Emit a warning instead of raising: raising made CPU mode
            # impossible although `use_cuda` is forwarded to the builders.
            warnings.warn("Running in cpu mode!", UserWarning)

        self.detector = build_detector(cfg, use_cuda=use_cuda)
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
        self.class_names = self.detector.class_names

        # Create a VideoCapture object and cache its basic properties.
        self.vdo = cv2.VideoCapture(self.args.input)
        self.status, self.frame = None, None
        self.total_frames = int(self.vdo.get(cv2.CAP_PROP_FRAME_COUNT))
        self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))

        self.output_frame = None

        # Must exist before the worker is submitted: both loops poll it.
        self._stop_event = Event()

        self.thread = ThreadPoolExecutor(max_workers=1)
        self.thread.submit(self.update)
        print('streaming started ...')

    def update(self):
        """Keep ``self.status``/``self.frame`` in sync with the capture until stopped."""
        try:
            while not self._stop_event.is_set():
                if not self.vdo.isOpened():
                    time.sleep(self.IDLE_SLEEP)
                    continue
                (self.status, self.frame) = self.vdo.read()
                if not self.status:
                    # A failed read returns immediately; back off a little.
                    time.sleep(self.IDLE_SLEEP)
        finally:
            self.vdo.release()

    def run(self):
        """Process the latest frame and publish it to redis until ``stop`` is called."""
        while not self._stop_event.is_set():
            # Take one snapshot: `update` may swap the frame for None between
            # a status check and a later attribute access.
            frame = self.frame if self.status else None
            if frame is None:
                time.sleep(self.IDLE_SLEEP)
                continue
            frame = frame.copy()
            # frame = cv2.resize(frame, (640, 480))
            self.detection(frame=frame)
            frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
            r.set('frame', frame_to_bytes)

    def stop(self):
        """Ask ``run`` and ``update`` to finish; calling it again is harmless."""
        self._stop_event.set()
        self.thread.shutdown(wait=False)

    def detection(self, frame):
        """Detect and track persons on ``frame``, drawing the boxes onto it in place."""
        im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # do detection
        bbox_xywh, cls_conf, cls_ids = self.detector(im)
        if bbox_xywh is None:
            return

        # select person class
        mask = cls_ids == 0

        bbox_xywh = bbox_xywh[mask]
        bbox_xywh[:, 3:] *= 1.2  # bbox dilation just in case bbox too small
        cls_conf = cls_conf[mask]

        # do tracking
        outputs = self.deepsort.update(bbox_xywh, cls_conf, im)

        # draw boxes for visualization
        if len(outputs) > 0:
            self.draw_boxes(img=frame, output=outputs)

    @staticmethod
    def draw_boxes(img, output, offset=(0, 0)):
        """
        Draw one labelled rectangle per row of ``output`` onto ``img``.

        Args:
            img: image drawn on in place
            output: iterable of 5-item rows ``x1, y1, x2, y2, identity``
            offset: ``(dx, dy)`` added to every box coordinate
        Returns:
            the same ``img`` object
        """
        dx, dy = offset[0], offset[1]
        for box in output:
            x1, y1, x2, y2, identity = [int(ii) for ii in box]
            x1 += dx
            x2 += dx
            y1 += dy
            y2 += dy

            # box text and bar
            color = compute_color_for_labels(identity)
            label = f'{identity:d}'
            t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
            cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
            cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
        return img

if __name__ == "__main__":
    # argument: --rtsp_link = 'rtsp://me@111.222.333.444/Channels/105'
    args = parse_args()

    # Start from the base config, then layer the model and deep-sort
    # settings on top of it, in that order.
    cfg = get_config()
    for overrides in (model, deep_sort_dict):
        cfg.merge_from_dict(overrides)

    # Build the tracker and enter its processing loop.
    vdo_trk = RealTimeTracking(cfg, args)
    vdo_trk.run()

这是flask服务器的代码 app.py :

from dotenv import load_dotenv
from time import sleep
from os import getenv
from os.path import join
import subprocess
from flask import Response, Flask

from config.config import DevelopmentConfig
from redis import Redis

# Redis client the stream generator reads the cached 'frame' value from.
# NOTE(review): the host has octets above 255, so it looks like a redacted
# placeholder — confirm the real address before running.
r = Redis('111.222.333.444')
# Flask application object the routes in this file are registered on.
app = Flask(__name__)

def gen():
    """
    Yield the frame cached in redis as parts of a multipart stream.

    Each yielded chunk is one ``--frame`` part with a JPEG content type,
    matching the ``boundary=frame`` mimetype used by the route.

    Yields:
        bytes: part header + cached frame + trailing CRLF
    """
    while True:
        frame = r.get('frame')
        if frame is None:
            # Nothing cached yet: back off briefly instead of hammering
            # redis in a tight loop.
            sleep(0.01)
            continue
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'

@app.route('/')
def video_feed():
    """Serve the frame generator as a multipart stream; use the URL as an img tag's src."""
    frames = gen()
    stream_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(frames, mimetype=stream_mimetype)

if __name__ == '__main__':
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # Launch the detector as a child process before serving frames.
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/105']
    p = subprocess.Popen(cmd)
    try:
        # Give the child a head start before the server begins serving.
        sleep(6)
        app.run()
    finally:
        # Do not leave the detector running once the server has exited.
        p.terminate()
        try:
            p.wait(timeout=5)
        except subprocess.TimeoutExpired:
            p.kill()

这段代码在我的系统中运行得很好。
如您所见,在运行flask服务器之前,我使用cmd命令在rtsp链接上运行行人检测。
但是我真正需要的是能够在不同的摄像机之间切换。我的意思是,当flask服务器运行时,我希望能够在收到请求时随时终止 pedestrian.py 进程,并用新的 --rtsp_link 参数重新启动 pedestrian.py(切换到另一个摄像机)。
像这样:

@app.route('/cam1')
def cam1():
    """Restart the detector on camera channel 101 (sketch of the desired behavior)."""
    # Keep the new handle in the module-level `p` so a later request can
    # reach the running process.
    global p
    # `stop` is the helper this question asks how to write; it is not
    # defined anywhere in this snippet.
    stop('pedestrian.py')
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/101']
    p = subprocess.Popen(cmd)
    # A Flask view has to return a response.
    return 'switched to cam1'

@app.route('/cam2')
def cam2():
    """Restart the detector on camera channel 110 (sketch of the desired behavior)."""
    # Keep the new handle in the module-level `p` so a later request can
    # reach the running process.
    global p
    # `stop` is the helper this question asks how to write; it is not
    # defined anywhere in this snippet.
    stop('pedestrian.py')
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/110']
    p = subprocess.Popen(cmd)
    # A Flask view has to return a response.
    return 'switched to cam2'

我的知识可能不够好。我可能需要使用post方法和身份验证。
你能告诉我如何在这段代码中实现这样的东西吗?

3yhwsihp

3yhwsihp1#

我找到了一种自动启动/停止行人检测的方法。更多详情请参见我的代码仓库:
from os.path import join
from os import getenv, environ
from dotenv import load_dotenv
import argparse
from threading import Thread

from redis import Redis
from flask import Response, Flask, jsonify, request, abort

from rtsp_threaded_tracker import RealTimeTracking
from server_cfg import model, deep_sort_dict
from config.config import DevelopmentConfig
from utils.parser import get_config

# Redis client the stream generator reads the cached 'frame' value from.
redis_cache = Redis('127.0.0.1')
# Flask application object the routes and the 400 handler are registered on.
app = Flask(__name__)
# Process-wide flag ('on'/'off') that process_manager reads and writes to
# remember whether pedestrian tracking has been started.
environ['in_progress'] = 'off'

def parse_args():
    """
    Parses the command-line arguments, taking defaults from the environment.

    Environment variables read:
        project_root: required; base directory for the default ``--model``
        camera_stream: default for ``--input``
        model_type: joined onto ``project_root`` for the default ``--model``

    Returns:
        argparse Namespace with ``input``, ``model`` and ``use_cuda``
    Raises:
        AssertionError: if ``project_root`` is not set in the environment
    """
    project_root = getenv('project_root')
    if project_root is None:
        # Explicit raise rather than `assert`, so the check is not stripped
        # under `python -O`; the exception type is kept for compatibility.
        raise AssertionError("environment variable 'project_root' is not set")

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, default=getenv('camera_stream'))
    parser.add_argument("--model", type=str, default=join(project_root, getenv('model_type')))
    # --cpu switches CUDA off; without the flag use_cuda stays True.
    parser.add_argument("--cpu", dest="use_cuda", action="store_false", default=True)

    return parser.parse_args()

def gen():
    """
    Yield the frame cached in redis as parts of a multipart stream.

    Yields:
        bytes: ``--frame`` part header + cached frame + trailing CRLF
    """
    # Function-scope import: `time` is not among this file's imports.
    from time import sleep

    while True:
        frame = redis_cache.get('frame')
        if frame is None:
            # Nothing cached yet: back off briefly instead of hammering
            # redis in a tight loop.
            sleep(0.01)
            continue
        yield b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'

def pedestrian_tracking(cfg, args):
    """
    Build a RealTimeTracking instance and enter its run loop.

    Args:
        cfg: configuration handed to RealTimeTracking
        args: parsed arguments handed to RealTimeTracking
    Returns:
        None
    """
    RealTimeTracking(cfg, args).run()

def trigger_process(cfg, args):
    """
    Start pedestrian_tracking on a new thread and report the outcome as JSON.

    Args:
        cfg: configuration forwarded to pedestrian_tracking
        args: parsed arguments forwarded to pedestrian_tracking
    Returns:
        flask JSON response with a ``message`` key
    """
    try:
        worker = Thread(target=pedestrian_tracking, args=(cfg, args))
        worker.start()
    except Exception:
        # Record the cause instead of discarding it, and clear the flag so a
        # failed start does not leave the service reporting "in progress".
        app.logger.exception("could not start the pedestrian tracking thread")
        environ['in_progress'] = 'off'
        return jsonify({'message': "Unexpected exception occured in process"})
    return jsonify({"message": "Pedestrian detection started successfully"})

@app.errorhandler(400)
def bad_argument(error):
    """
    Render a 400 error as a JSON ``{"message": ...}`` body.

    Args:
        error: the exception passed to the handler; its ``description`` is
            a dict with a ``message`` key when raised via
            ``abort(400, {...})`` in this file, and may be a plain string
            for 400 errors raised elsewhere
    Returns:
        (JSON response, 400)
    """
    description = error.description
    if isinstance(description, dict):
        message = description.get('message', '')
    else:
        # Indexing a string description with ['message'] would raise and
        # turn the 400 into a 500.
        message = description
    # Keep the error status; a bare jsonify() would answer with 200.
    return jsonify({'message': message}), 400

# Routes

@app.route('/stream', methods=['GET'])
def stream():
    """
    Serve the frames produced by gen() as a multipart HTTP response.

    Returns:
        flask Response using the 'multipart/x-mixed-replace' mimetype
    """
    frames = gen()
    stream_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    return Response(frames, mimetype=stream_mimetype)

@app.route("/run", methods=['GET'])
def process_manager():
    """
    Start or stop pedestrian tracking according to the query string.

    request parameters:
    run (bool): 1  -> start the pedestrian tracking
                0  -> stop it
    camera_stream: str -> rtsp link to security camera; a value made only of
                   decimal digits is converted to int
    :return: JSON response with a ``message`` key. Aborts with 400 when
             ``run`` is missing or is not 0/1, and with 503 when starting
             the tracker fails.
    """
    # cfg/args are module globals assigned in the ``__main__`` section.
    global cfg, args
    data = request.args

    raw_status = data.get('run')
    try:
        status = int(raw_status)
    except (TypeError, ValueError):
        status = None
    if status not in (0, 1):
        # Covers a missing parameter, non-numeric text and other integers,
        # so the view never falls through and returns None.
        abort(400, {'message': f"bad argument for run {raw_status}"})

    if status == 1:
        if environ.get('in_progress', 'off') == 'on':
            # if pedestrian tracking is running, don't start another one (we are short of gpu resources)
            return jsonify({"message": " Pedestrian detection is already in progress."})
        # if pedestrian tracking is not running, start it off!
        try:
            vdo = data.get('camera_stream')
            if vdo is not None:
                # An rtsp link must stay a string; int() on it would raise.
                # NOTE(review): digit-only values are presumably device
                # indexes — confirm against RealTimeTracking.
                args.input = int(vdo) if vdo.isdecimal() else vdo
            environ['in_progress'] = 'on'
            return trigger_process(cfg, args)
        except Exception:
            environ['in_progress'] = 'off'
            abort(503)

    # status == 0
    if environ.get('in_progress', 'off') == 'off':
        return jsonify({"message": "pedestrian detection is already terminated!"})
    # Only the flag is flipped here.
    # NOTE(review): whether the running tracker watches this flag is not
    # visible in this file — confirm in rtsp_threaded_tracker.
    environ['in_progress'] = 'off'
    return jsonify({"message": "Pedestrian detection terminated!"})

if __name__ == '__main__':
    # Load the .env file before parse_args() reads its environment defaults.
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # BackProcess Initialization: these globals are used by the /run route.
    args = parse_args()
    cfg = get_config()
    for overrides in (model, deep_sort_dict):
        cfg.merge_from_dict(overrides)

    # Start the flask app
    app.run()

相关问题