这是我的应用程序体系结构:
在我的代码里有一个 pedestrian.py
该文件使用while循环从rtsp链接读取帧,并在执行行人检测过程(此链接中提供)后,将帧缓存在redis中。
(请注意,在循环中,每次将输出帧替换为来自循环的上一个输出时。这意味着redis在任何时刻都只存在一个帧。)
然后在flask应用程序中,我从redis读取已处理的帧并将其发送给客户机。
这是我的行人检测代码:
from redis import Redis
from concurrent.futures import ThreadPoolExecutor
import cv2
import torch
from os import environ
r = Redis('111.222.333.444')
class RealTimeTracking(object):
"""
This class is built to get frame from rtsp link and continuously
save each frame in the output directory. then we use flask to give it
as service to client.
Args:
args: parse_args inputs
cfg: deepsort dict and yolo-model cfg from server_cfg file
"""
def __init__(self, cfg, args):
# Create a VideoCapture object
self.cfg = cfg
self.args = args
use_cuda = self.args.use_cuda and torch.cuda.is_available()
if not use_cuda:
raise UserWarning("Running in cpu mode!")
self.detector = build_detector(cfg, use_cuda=use_cuda)
self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
self.class_names = self.detector.class_names
self.vdo = cv2.VideoCapture(self.args.input)
self.status, self.frame = None, None
self.total_frames = int(cv2.VideoCapture.get(self.vdo, cv2.CAP_PROP_FRAME_COUNT))
self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.output_frame = None
self.thread = ThreadPoolExecutor(max_workers=1)
self.thread.submit(self.update)
print('streaming started ...')
def update(self):
while True:
if self.vdo.isOpened():
(self.status, self.frame) = self.vdo.read()
def run(self):
while True:
try:
if self.status:
frame = self.frame.copy()
# frame = cv2.resize(frame, (640, 480))
self.detection(frame=frame)
frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
r.set('frame', frame_to_bytes)
except AttributeError:
pass
def detection(self, frame):
im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# do detection
bbox_xywh, cls_conf, cls_ids = self.detector(im)
if bbox_xywh is not None:
# select person class
mask = cls_ids == 0
bbox_xywh = bbox_xywh[mask]
bbox_xywh[:, 3:] *= 1.2 # bbox dilation just in case bbox too small
cls_conf = cls_conf[mask]
# do tracking
outputs = self.deepsort.update(bbox_xywh, cls_conf, im)
# draw boxes for visualization
if len(outputs) > 0:
self.draw_boxes(img=frame, output=outputs)
@staticmethod
def draw_boxes(img, output, offset=(0, 0)):
for i, box in enumerate(output):
x1, y1, x2, y2, identity = [int(ii) for ii in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# box text and bar
color = compute_color_for_labels(identity)
label = '{}{:d}'.format("", identity)
t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
return img
if __name__ == "__main__":
args = parse_args() # argument: --rtsp_link = 'rtsp://me@111.222.333.444/Channels/105'
cfg = get_config()
cfg.merge_from_dict(model)
cfg.merge_from_dict(deep_sort_dict)
vdo_trk = RealTimeTracking(cfg, args)
vdo_trk.run()
这是flask服务器的代码 app.py
:
from dotenv import load_dotenv
from time import sleep
from os import getenv
from os.path import join
import subprocess
from flask import Response, Flask
from config.config import DevelopmentConfig
from redis import Redis
r = Redis('111.222.333.444')
app = Flask(__name__)
def gen():
while True:
frame = r.get('frame')
if frame is not None:
yield b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
@app.route('/')
def video_feed():
"""Video streaming route. Put this in the src attribute of an img tag."""
return Response(gen(),
mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == '__main__':
load_dotenv()
app.config.from_object(DevelopmentConfig)
cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/105']
p = subprocess.Popen(cmd)
sleep(6)
app.run()
这段代码在我的系统中运行得很好。
如您所见,在运行flask服务器之前,我使用cmd命令在rtsp链接上运行行人检测。
但是我真正需要的是能够在不同的摄像机之间切换。我的意思是,当flask服务器运行时,我希望能够终止 pedestrian.py
随时处理请求并重新启动 pedestrian.py
用新的 --rtsp_link
参数(切换到另一个摄像机)。
像这样:
@app.route('/cam1'):
def cam1():
stop('pedestrian.py')
cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/101']
p = subprocess.Popen(cmd)
@app.route('/cam2'):
def cam2():
stop('pedestrian.py')
cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/110']
p = subprocess.Popen(cmd)
我的知识可能不够好。我可能需要使用post方法和身份验证。
你能告诉我如何在这段代码中实现这样的东西吗?
1条答案
按热度按时间3yhwsihp1#
我找到了一种自动启动/停止行人检测的方法。更多详情请参见我的回购协议:
从os.path import join从os import getenv,environ从dotenv import load\u dotenv import argparse从threading import thread