Python:使用多处理在单个视频流上运行多个功能

uxhixvfz  于 2022-11-28  发布在  Python
关注(0)|答案(1)|浏览(168)

嘿,我正在尝试同时运行不同的人脸检测模型。我正在使用opencv库打开视频流,并为不同的人脸检测模型创建不同的进程对象。当我运行程序时,第一个方法运行成功,但第二个方法退出,并出现无法接收帧的错误。
主要的挑战是阅读捕获源(cap)的while循环,这使得它不同于之前在stackoverflow上发布的问题
代码如下:

import cv2
import dlib
from multiprocessing import Process

def haar_cascade():
    """Read frames from the global `cap`, run Haar-cascade face detection
    and display both the raw and the annotated stream.

    NOTE(review): `cap` is opened in the parent process; a child started
    via multiprocessing does not reliably inherit an open capture (and two
    processes cannot share one camera handle) — TODO confirm each worker
    should open its own VideoCapture instead.
    """
    # Build the classifier once, not on every frame (construction is expensive).
    classifier = cv2.CascadeClassifier('haarcascade_frontalface2.xml')
    while True:
        ret, frame = cap.read()
        # Check ret BEFORE using frame: on a failed read, frame is None and
        # the original imshow-first ordering crashed here.
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        cv2.imshow('input', frame)
        faces = classifier.detectMultiScale(frame)
        # Draw a red box around every detected face.
        for result in faces:
            x, y, w, h = result
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
        # Show the annotated frame, then poll the keyboard so the windows
        # repaint and 'q' can quit (originally the processed frame was only
        # displayed after the NEXT waitKey).
        cv2.imshow('harr-cascade', frame)
        if cv2.waitKey(1) == ord('q'):
            break

def dlib_hog():
    """Read frames from the global `cap`, run dlib's HOG face detector and
    display the annotated stream.

    NOTE(review): relies on the global `cap` created in the parent process;
    under the 'spawn' start method the child will not even see it — TODO
    confirm each process should open its own VideoCapture.
    """
    # Build the detector once instead of on every frame.
    detector = dlib.get_frontal_face_detector()
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = detector(gray, 1)
        # Draw a red box around every detected face.
        for result in faces:
            cv2.rectangle(frame, (result.left(), result.top()),
                          (result.right(), result.bottom()), (0, 0, 255), 2)
        # Show the annotated frame, then poll the keyboard so the window
        # repaints and 'q' can quit.
        cv2.imshow('dlib-hog', frame)
        if cv2.waitKey(1) == ord('q'):
            break

if __name__ == "__main__":
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        exit()
    # NOTE(review): a cv2.VideoCapture cannot be shared between processes —
    # under 'spawn' the children never see `cap`, and under 'fork' two
    # processes reading one camera handle fight over frames (which is why
    # the second consumer reports "Can't receive frame"). Each worker
    # should open its own capture, or a single reader should feed both.
    harrProcess = Process(target=haar_cascade)
    harrProcess.start()
    dlibProcess = Process(target=dlib_hog)
    dlibProcess.start()

    # When everything done, release the capture.
    # These joins and the cleanup were previously at module level, so every
    # child process re-executed them on import (NameError under spawn, or a
    # hang); they must live inside the __main__ guard.
    harrProcess.join()
    dlibProcess.join()
    cap.release()
    cv2.destroyAllWindows()

如何创建一个从单个源读取源并执行独立操作的多处理模型?

uqcuzwp8

uqcuzwp81#

我做了各种尝试:
1.我试着使用多处理,其中包含一个生产者进程和两个消费者进程。生产者创建的帧必须转换为共享内存数组,然后在消费者检索时转换回numpy数组。这些操作的开销很大,我发现帧正在丢失。
2.我试着使用多线程处理一个生产者线程和两个消费者线程。这在从生产者和消费者传递帧方面有较少的开销。当然,多线程处理的问题是由于全局解释器锁的争用,消费者所需的任何CPU密集型处理都不能与CPU并行运行-并且甚至可能导致生产者丢失帧。不幸的是,我不知道当使用摄像机输入时,是否有一种方法可以检测生产者部分丢失的帧。为了解决这些问题,我将一个多处理池传递给消费者线程,消费者线程可以向其提交对帧执行CPU密集型处理的任务。这里,在将帧从一个进程传递到另一个进程时也有足够的开销,并且帧会丢失。
3.与上面的第2点一样,我使用了多线程,但不是将CPU密集型工作提交给多处理池,我在消费者线程中执行它。这似乎使消费者丢失的帧更少。但我不知道它是否会导致生产者丢失它本来不会丢失的帧。因此,使用多处理池来执行CPU密集型工作似乎是更明智的方法。当然,如果你的CPU足够快,消费者和生产者都不应该错过帧。但是选择1(见第二个代码示例),即只使用多处理,可能是最好的。
在下面的演示中,由于我无法访问您的XML文件,因此我将其中一个消费者的处理过程虚拟化。只需按Enter键即可终止程序:

使用多线程

将 USE_POOL_FOR_COMPUTATION 设置为 False,即可通过直接调用执行CPU密集型处理,而不是将工作提交到多处理池:

#!/usr/bin/env python3

import threading
import multiprocessing
import cv2
import dlib

# When True, the consumer threads off-load the CPU-heavy detection work to a
# multiprocessing pool; when False they run it inline on the thread (GIL-bound).
USE_POOL_FOR_COMPUTATION = True

class Producer:
    """Grabs frames from a capture device on its own thread and hands the
    most recent frame to any number of consumer threads.

    Consumers call get_frame() with the next sequence number they expect;
    a slow consumer simply skips the frames it missed.
    """

    def __init__(self):
        # Most recently captured frame (None until the first read succeeds).
        self._frame = None
        # Guards _frame / _latest_frame_number / _running and wakes waiters.
        self._condition = threading.Condition()
        self._running = True
        # Sequence number of the latest frame retrieved (0 = none yet).
        self._latest_frame_number = 0

    def run(self, cap):
        """Capture loop: read frames until the stream ends or stop() is called."""
        while self._running:
            ret, frame = cap.read()
            # Publish state while holding the lock so consumers blocked in
            # wait_for() never observe a half-updated frame/counter pair
            # (the original wrote both fields outside the lock).
            with self._condition:
                if not ret:
                    # Do NOT clobber self._frame on a failed read — the
                    # original lost the last good frame here.
                    self._running = False
                else:
                    self._frame = frame
                    self._latest_frame_number += 1
                self._condition.notify_all()

    def stop(self):
        """Request shutdown and wake any consumer still blocked in get_frame()."""
        with self._condition:
            self._running = False
            self._condition.notify_all()

    def get_frame(self, sequence_number):
        """Block until a frame numbered >= sequence_number exists or we stop.

        Returns (latest_frame_number, frame); frame is None once the
        producer has stopped and no frame >= sequence_number will arrive.
        """
        with self._condition:
            # Block until we find a frame sequence number >= sequence_number.
            self._condition.wait_for(
                lambda: not self._running or self._latest_frame_number >= sequence_number)
            # Even after stop() there may be one final unprocessed frame; on
            # the next call the latest number will be < the expected number.
            if self._latest_frame_number < sequence_number:
                return (self._latest_frame_number, None)
            return (self._latest_frame_number, self._frame)

def process_haar_cascade(frame):
    """Detect faces in *frame* with a Haar cascade and box them in red."""
    classifier = cv2.CascadeClassifier('haarcascade_frontalface2.xml')
    for (x, y, w, h) in classifier.detectMultiScale(frame):
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
    return frame

def haar_cascade(producer, pool):
    """Consumer thread: display the raw stream; the Haar processing step is
    stubbed out because the cascade XML file is not available."""
    last_seen = 0
    while True:
        wanted = last_seen + 1
        seq, frame = producer.get_frame(wanted)
        if frame is None:
            return
        cv2.waitKey(1)  # allow window to update
        if seq != wanted:
            print(f'haar_cascade missed frames {wanted} to {seq-1}', flush=True)
        last_seen = seq
        cv2.imshow('input', frame)  # unmodified frame
        # Processing skipped (required xml file unavailable); it would be:
        #   frame = pool.apply(process_haar_cascade, args=(frame,)) \
        #       if USE_POOL_FOR_COMPUTATION else process_haar_cascade(frame)
        cv2.imshow('harr-cascade', frame)

def process_dlib_hog(frame):
    """Run dlib's HOG frontal-face detector on *frame*, drawing red boxes."""
    detector = dlib.get_frontal_face_detector()
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    for face in detector(gray, 1):
        top_left = (face.left(), face.top())
        bottom_right = (face.right(), face.bottom())
        cv2.rectangle(frame, top_left, bottom_right, (0, 0, 255), 2)
    return frame

def dlib_hog(producer, pool):
    """Consumer thread: fetch frames from *producer* and run the dlib HOG
    detector, optionally off-loading the work to *pool*."""
    last_seen = 0
    while True:
        wanted = last_seen + 1
        seq, frame = producer.get_frame(wanted)
        if frame is None:
            return
        cv2.waitKey(1)  # allow window to update
        if seq != wanted:
            print(f'dlib_hog missed frames {wanted} to {seq-1}', flush=True)
        last_seen = seq
        if USE_POOL_FOR_COMPUTATION:
            frame = pool.apply(process_dlib_hog, args=(frame,))
        else:
            frame = process_dlib_hog(frame)
        cv2.imshow('dlib-hog', frame)

def main():
    """Wire up one producer thread, two consumer threads and an optional
    multiprocessing pool; run until the user presses Enter."""
    producer = Producer()

    if USE_POOL_FOR_COMPUTATION:
        pool = multiprocessing.Pool(2)
    else:
        pool = None

    # The consumers receive the pool for their CPU-intensive work.
    consumers = []
    for target in (haar_cascade, dlib_hog):
        worker = threading.Thread(target=target, args=(producer, pool))
        worker.start()
        consumers.append(worker)

    cap = cv2.VideoCapture(0)
    producer_thread = threading.Thread(target=producer.run, args=(cap,))
    producer_thread.start()

    input('Hit enter to terminate:\n')
    producer.stop()
    producer_thread.join()
    for worker in consumers:
        worker.join()

    cap.release()
    cv2.destroyAllWindows()

    if USE_POOL_FOR_COMPUTATION:
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()

使用多处理

用于保存可共享帧的multiprocessing.RawArray必须在消费者进程运行之前进行分配,以便所有进程都可以访问该数组。这需要事先知道要创建的数组有多大:

#!/usr/bin/env python3

import multiprocessing
import ctypes
import cv2
import numpy as np
import dlib

# Geometry of one camera frame (rows, cols, channels); the shared array is
# sized from this, so it must match what cap.read() actually returns.
# NOTE(review): assumes the camera delivers 480x640 3-channel frames — confirm.
FRAME_SHAPE = (480, 640, 3)
ARR_SIZE = FRAME_SHAPE[0] * FRAME_SHAPE[1] * FRAME_SHAPE[2]

class Producer:
    """Producer that runs in its own process, capturing camera frames and
    publishing each one into a shared-memory array for consumer processes.

    NOTE(review): sharing this object with child processes appears to rely
    on the children inheriting the RawArray/Condition — presumably the
    'fork' start method; verify on spawn-only platforms (Windows/macOS).
    """

    def __init__(self):

        # Create shared memory version of a numpy array:
        self._shared_array = multiprocessing.RawArray(ctypes.c_ubyte, ARR_SIZE)

        # Guards the shared state below and wakes consumers blocked in get_frame().
        self._condition = multiprocessing.Condition()

        # 1 while capturing; set to 0 to request shutdown (shared int flag).
        self._running = multiprocessing.RawValue('i', 1)

        # The latest frame number retrieved
        self._latest_frame_number = multiprocessing.RawValue('i', 0)

    def run(self):
        """Capture loop (runs in the producer process): read from camera 0
        and copy every frame into the shared array, notifying consumers."""
        cap = cv2.VideoCapture(0)

        while self._running.value:
            ret, frame = cap.read()
            if not ret:
                # Stream ended: flag shutdown, wake all waiters, clean up.
                self._running.value = 0
                with self._condition:
                    self._condition.notify_all()
                cap.release()
                cv2.destroyAllWindows()
                break

            self._latest_frame_number.value += 1

            # np array to shared_array
            temp = np.frombuffer(self._shared_array, dtype=frame.dtype)
            temp[:] = frame.flatten(order='C')

            with self._condition:
                self._condition.notify_all()

    def stop(self):
        """Ask the capture loop to exit (checked at the top of each iteration)."""
        self._running.value = 0

    def get_frame(self, sequence_number):
        """Block until a frame numbered >= sequence_number exists (or the
        producer stops); return (latest_number, frame-or-None).

        NOTE(review): the returned frame is a view over the shared buffer,
        so the producer may overwrite it while a consumer is processing it.
        """
        with self._condition:
            # We block until we find a frame sequence number >= sequence_number.
            self._condition.wait_for(lambda: not self._running.value or self._latest_frame_number.value >= sequence_number)
        # Even after the stop method has been called and we are no longer running,
        # there could still be an unprocessed frame. But when we are called again, the current
        # frame number will be < the expected frame number:
        if self._latest_frame_number.value < sequence_number:
            return (self._latest_frame_number.value, None)
        # Convert to np array:
        temp = np.ctypeslib.as_array(self._shared_array)
        frame = temp.reshape(FRAME_SHAPE)
        return (self._latest_frame_number.value, frame)

def process_haar_cascade(frame):
    """Annotate *frame* in place with red boxes around Haar-cascade faces."""
    detections = cv2.CascadeClassifier('haarcascade_frontalface2.xml').detectMultiScale(frame)
    for left, top, width, height in detections:
        corner = (left + width, top + height)
        cv2.rectangle(frame, (left, top), corner, (0, 0, 255), 2)
    return frame

def haar_cascade(producer):
    """Consumer process: display the raw stream; the cascade processing
    step is stubbed out because the XML file is not available."""
    last_seen = 0
    while True:
        wanted = last_seen + 1
        seq, frame = producer.get_frame(wanted)
        if frame is None:
            return
        cv2.waitKey(1)  # allow window to update
        if seq != wanted:
            print(f'haar_cascade missed frames {wanted} to {seq-1}', flush=True)
        last_seen = seq
        cv2.imshow('input', frame)  # unmodified frame
        # frame = process_haar_cascade(frame)  # skipped: xml file missing
        cv2.imshow('harr-cascade', frame)

def process_dlib_hog(frame):
    """Detect faces with dlib's HOG detector and outline each one in red."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = dlib.get_frontal_face_detector()(gray, 1)
    for det in faces:
        cv2.rectangle(frame, (det.left(), det.top()),
                      (det.right(), det.bottom()), (0, 0, 255), 2)
    return frame

def dlib_hog(producer):
    """Consumer process: run the dlib HOG detector on every frame received
    from *producer* and display the result."""
    last_seen = 0
    while True:
        wanted = last_seen + 1
        seq, frame = producer.get_frame(wanted)
        if frame is None:
            return
        cv2.waitKey(1)  # allow window to update
        if seq != wanted:
            print(f'dlib_hog missed frames {wanted} to {seq-1}', flush=True)
        last_seen = seq
        cv2.imshow('dlib-hog', process_dlib_hog(frame))

def main():
    """Start the two consumer processes, then the producer process, and
    wait for the user to press Enter before shutting everything down."""
    producer = Producer()

    consumers = []
    for worker in (haar_cascade, dlib_hog):
        proc = multiprocessing.Process(target=worker, args=(producer,))
        proc.start()
        consumers.append(proc)

    producer_process = multiprocessing.Process(target=producer.run)
    producer_process.start()

    input('Hit enter to terminate:\n')
    producer.stop()
    producer_process.join()
    for proc in consumers:
        proc.join()

if __name__ == '__main__':
    main()

相关问题