Python的多处理队列使用大量的资源与opencv

hc2pp10m  于 2022-11-30  发布在  Python
关注(0)|答案(1)|浏览(128)

我正在使用多处理来获得一个视频帧使用Opencv在python。
我的类看起来像这样:-

import cv2
from multiprocessing import Process, Queue

class StreamVideos:
    def __init__(self):
        self.image_data = Queue()

    def start_proces(self):
        p = Process(target=self.echo)
        p.start()

    def echo(self):
        cap = cv2.VideoCapture('videoplayback.mp4')
        while cap.isOpened():
            ret,frame = cap.read()
            self.image_data.put(frame)
            # print("frame")

我使用以下命令启动进程“echo”:

p = Process(target=self.echo)
p.start()

echo函数如下所示:

def echo(self):
        cap = cv2.VideoCapture('videoplayback.mp4')
        while cap.isOpened():
            ret,frame = cap.read()
            self.image_data.put(frame)

我使用队列来放置这些帧

self.image_data.put(frame)

然后在另一个过程中我开始恢复这些画面

self.obj = StreamVideos()

    def start_process(self):
        self.obj.start_proces()
        p = Process(target=self.stream_videos)
        p.start()


    def stream_videos(self):
        while True:
            self.img = self.obj.image_data.get()
            print(self.img)

但是只要我开始把帧放进队列,RAM就会很快被填满,系统就会卡住。我正在使用的视频只有25 fps和39 MB大小,所以没有任何意义。
我注意到的一件事是,“echo”进程在“stream_videos”进程检索帧之前将大量帧放入队列中。
这个问题的根源可能是什么?
先谢谢你。
期望值:-
1.能够连续检索帧。
已尝试:-
1.不将帧放入队列中,在这种情况下,RAM不被填充。

jslywgbw

jslywgbw1#

下面是一个通用的单生成器/多使用者实现。(类StreamVideos)创建一个共享内存数组,其大小为视频帧中的字节数。(您将消费者的数量指定为StreamVideos)然后可以调用StreamVideos.get_next_frame()来检索下一帧。此方法将共享数组转换回numpy数组以供后续处理。只有在所有使用方都调用了get_next_frame之后,生成方才会将下一帧读入共享数组:

#!/usr/bin/env python3

import multiprocessing
import numpy as np
import ctypes
import cv2

class StreamVideos:
    def __init__(self, path, n_consumers):
        """
        path is the path to the video:
        n_consumers is the number of tasks to which we will be sreaming this.
        """
        self._path = path

        self._event = multiprocessing.Event()

        self._barrier = multiprocessing.Barrier(n_consumers + 1, self._reset_event)

        # Discover how large a framesize is by getting the first frame
        cap = cv2.VideoCapture(self._path)
        ret, frame = cap.read()
        if ret:
            self._shape = frame.shape
            frame_size = self._shape[0] * self._shape[1] * self._shape[2]
            self._arr = multiprocessing.RawArray(ctypes.c_ubyte, frame_size)
        else:
            self._arr = None
        cap.release()

    def _reset_event(self):
        self._event.clear()

    def start_streaming(self):
        cap = cv2.VideoCapture(self._path)

        while True:
            self._barrier.wait()
            ret, frame = cap.read()
            if not ret:
                # No more readable frames:
                break

            # Store frame into shared array:
            temp = np.frombuffer(self._arr, dtype=frame.dtype)
            temp[:] = frame.flatten(order='C')

            self._event.set()

        cap.release()
        self._arr = None
        self._event.set()

    def get_next_frame(self):
        # Tell producer that this consumer is through with the previous frame:
        self._barrier.wait()
        # Wait for next frame to be read by the producer:
        self._event.wait()
        if self._arr is None:
            return None

        # Return shared array as a numpy array:
        return np.ctypeslib.as_array(self._arr).reshape(self._shape)

def consumer(producer, id):
    frame_name = f'Frame - {id}'
    while True:
        frame = producer.get_next_frame()
        if frame is None:
            break
        cv2.imshow(frame_name, frame)
        cv2.waitKey(1)

    cv2.destroyAllWindows()

def main():
    producer = StreamVideos('videoplayback.mp4', 2)

    consumer1 = multiprocessing.Process(target=consumer, args=(producer, 1))
    consumer1.start()
    consumer2 = multiprocessing.Process(target=consumer, args=(producer, 2))
    consumer2.start()

    """
    # Run as a child process:
    producer_process = multiprocessing.Process(target=producer.start_streaming)
    producer_process.start()
    producer_process.join()
    """
    # Run in main process:
    producer.start_streaming()

    consumer1.join()
    consumer2.join()

if __name__ == '__main__':
    main()

相关问题