opencv: How can I pass a video stream from one Python script to another?

Asked by 7gs2gvoe on 2022-11-24 in Python

In my previous question we found a way to pass an image file from one Python script to another: Pass video data from one python script to another
I am now trying to pass a video (a succession of images):
write.py

import sys
import numpy as np
import cv2
from PIL import Image
import io
import time

while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg',img)[1]
    sys.stdout.buffer.write(bimg)
    sys.stdout.flush()
    time.sleep(1)

read.py:

import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
    
while True:
    data = sys.stdin.buffer.read()
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
    cv2.imshow('image', img_np)
    cv2.waitKey(0)

If I have write.py output its data to the terminal, it prints. If I manually hand that data to read.py, it reads it. But put them together (python3 write.py | python3 read.py) and it just hangs: write.py only writes once, and read.py never seems to get it.
My guess is that the read code is waiting for the write code to "end" before it packages up the data and treats it as an image. If that were the case, though, I would have thought that doing a flush would fix it.

soat7uwm 1#

I think I figured it out. In read.py, sys.stdin.buffer.read() reads and waits until the stdin pipe is closed, but write.py never actually closes its stdout because of the while True loop. This simplified proof of concept works:
write.py

import sys
import time

sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()

# Note: if we comment out the code below, it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys

with open("output.txt", "w") as file:
    file.write(sys.stdin.read())

If we remove the while True loop from write.py, the code no longer hangs and "Hello world" is written to output.txt, because when write.py finishes writing it exits, which closes the pipe. To fix this, I would suggest changing read.py to something like this:

import sys

while True:
    with open("output.txt", "a") as file:
        file.write(sys.stdin.read(1))

The solution:
write.py

import sys
import time

MAX_FILE_SIZE = 16 # bytes

msg = b"Hello world"

# Tell `reader.py` that it needs to read x number of bytes.
length = len(msg)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())

sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()

# We also need to tell `read.py` whether this was the last file we sent.
# Sending `1` means that the file has ended
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()

# Note: if we comment out the code below, it works again
while True:
    # Keep this alive but don't have `while True:pass`
    # because my computer might crash :D
    time.sleep(10)

read.py

import sys
import time

MAX_FILE_SIZE = 16 # bytes

while True:
    time.sleep(1) # Make sure `write.py` has sent the data
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # So that we know the size of the file coming in
    length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
    time.sleep(1) # Make sure `write.py` has sent the data

    # Here you can switch to a different file every time `writer.py`
    # Sends a new file
    with open("output.txt", "wb") as file:
        file.write(sys.stdin.buffer.read(length))

    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        # File has ended
        break
    else:
        # We are going to start reading again for the next file:
        pass

**Edit:** The solution works as follows:

1. Send the size of the file
2. Send the actual file data
3. Send one byte that tells read.py whether it should wait for another file

For part 1, we just encode the length of the file as a string and pad it with 0s at the front. Note: make sure that MAX_FILE_SIZE is larger than the size of your largest file (very large values degrade performance a little). For part 3, if we send a "1" it means that there are no more files to send; otherwise reader.py will wait for and accept the next file. So write.py becomes:

from math import log
import time
import sys
import cv2

MAX_FILE_SIZE = 62914560 # bytes
# Width in characters of the zero-padded length header (26 here);
# both scripts must compute the same value.
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)

def write_file(buffer, data, last_file=False):
    # Tell `reader.py` that it needs to read x number of bytes.
    length = len(data)
    # We also need to tell `read.py` how many bytes it needs to read.
    # This means that we have reached the same problem as before.
    # To fix that issue we are always going to send the number of bytes but
    # we are going to pad it with `0`s at the start.
    # https://stackoverflow.com/a/339013/11106801
    length = str(length).zfill(MAX_FILE_SIZE)
    with open("output.txt", "w") as file:
        file.write(length)  # not needed for the pipe; just logs the header
    buffer.write(length.encode())

    # Write the actual data
    buffer.write(data)

    # We also need to tell `read.py` whether this was the last file we sent.
    # Sending `1` means that the stream has ended
    buffer.write(str(int(last_file)).encode())
    buffer.flush()

while True:
    img = cv2.imread("img.jpg")
    bimg = cv2.imencode(".jpg", img)[1]
    # Call write_data
    write_file(sys.stdout.buffer, bimg, last_file=False)
    # time.sleep(1) # Don't need this

And read.py becomes:

from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys

MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)

def read(buffer, number_of_bytes):
    output = b""
    while len(output) < number_of_bytes:
        output += buffer.read(number_of_bytes - len(output))
    assert len(output) == number_of_bytes, "An error occurred."
    return output

def read_file(buffer):
    # Read `MAX_FILE_SIZE` number of bytes and convert it to an int
    # So that we know the size of the file coming in
    length = int(read(buffer, MAX_FILE_SIZE))

    # Here you can switch to a different file every time `writer.py`
    # Sends a new file
    data = read(buffer, length)

    # Read a byte so that we know if it is the last file
    file_ended = read(buffer, 1)

    return data, (file_ended == b"1")

while True:
    print("Reading file")
    data, last_file = read_file(sys.stdin.buffer)
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8),
                          cv2.IMREAD_UNCHANGED)
    cv2.imshow("image", img_np)
    cv2.waitKey(0)

    if last_file:
        break

ymdaylpp 2#

You mention that the images you are sending are not of consistent size, but I have to assume that if they come from the same camera (for a given video stream), the size of the raw image does not change, only the size of the compressed image. I would imagine you probably have enough RAM to store at least one uncompressed frame at a time, and all the compressing and decompressing only adds processing overhead.
Given that, I would create a shared buffer using multiprocessing.shared_memory that can share frames between the two processes (you could even create a circular buffer of a couple of frames to prevent screen tearing if you want to get really fancy, but it wasn't a big problem in my testing).
Given that cv2.VideoCapture().read() can read straight into an existing array, and that you can create a numpy array that uses the shared memory as its buffer, you can read the data into the shared memory with zero extra copies. Using this approach I was able to read nearly 700 frames per second from a video file encoded with H.264 at 1280x688 resolution.

from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np

vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie

#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
    raise Exception("error reading from video")

#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3, buffer=frame_shape_shm.buf, dtype='i4')  #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)

input("writer is ready: press enter once reader is ready")

try: #use keyboardinterrupt to quit
    while True:
        cap.read(frame_buffer) #read data into frame buffer
        # sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the 
#  shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()

The reader process looks very similar, but instead of creating a video device and reading frames, we just construct the shared arrays and call imshow. The GUI isn't quite as fast as just dumping the data, so we don't get 700 fps, but 500 fps isn't bad...

from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np

#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")

#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')
try:
    while True:
        cv2.imshow('frame', frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT the writer process should close before this one, so nothing 
#  tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()

**Edit:** The user's other question indicated that a version of Python earlier than 3.8 may be required (or that it may even need to work across versions), so here is an example of using posix_ipc in place of multiprocessing.shared_memory to create the frame buffer (and how to clean it up):

#imports needed for this snippet; `frame` is a frame that has already been
#read with cv2.VideoCapture().read(), as in the example above
import mmap
import numpy as np
import posix_ipc

#creation
shm = posix_ipc.SharedMemory(name="frame_buf", 
                             flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
                             size=frame.nbytes)
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer

#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
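
The snippet above is the creating side; the other process attaches to the same segment by name. A minimal sketch of that side, assuming the segment is named "frame_buf" and that the frame shape and dtype are agreed on out of band (for example via a second shared segment, as in the multiprocessing.shared_memory example):

import mmap
import numpy as np
import posix_ipc

#attach to the segment the writer created (no O_CREX flag, so it must already exist)
shm = posix_ipc.SharedMemory(name="frame_buf")
shm_map = mmap.mmap(shm.fd, 0) #length 0 maps the whole segment
buf = memoryview(shm_map)
shm.close_fd() #the mapping stays valid after the fd is closed

#shape and dtype are assumptions here; they must match whatever the writer used
frame_buffer = np.ndarray((688, 1280, 3), buffer=buf, dtype='u1')

#... use frame_buffer (e.g. cv2.imshow) ...

#cleanup, mirroring the writer; only one of the two processes should call unlink()
buf.release()
shm_map.close()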

x33g5p2x 3#

Two solutions: ZeroMQ | DiskCache

It is very easy to send frames from one Python file to another using ZeroMQ.
ZeroMQ
Install it via PyPI: pip install -U pyzmq. There are several ways to send frames; here is an example using a PUBLISHER and a SUBSCRIBER.
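
A minimal sketch of the PUB/SUB idea with pyzmq; the tcp://*:5555 address, the JPEG encoding of each frame, and camera index 0 are assumptions rather than details from the original example.

pub.py (sketch)

import cv2
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")  # port is an assumption

cap = cv2.VideoCapture(0)  # camera index is an assumption
while True:
    ok, frame = cap.read()
    if not ok:
        break
    # compress the frame to JPEG and publish the raw bytes
    ok, buf = cv2.imencode(".jpg", frame)
    socket.send(buf.tobytes())

sub.py (sketch)

import cv2
import numpy as np
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to every message

while True:
    data = socket.recv()
    frame = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
    cv2.imshow("frame", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

With PUB/SUB, frames are dropped for a slow subscriber instead of blocking the publisher, which is usually what you want for a live feed.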

DiskCache

You could also consider using diskcache. It allows Python objects to be passed between processes. It is like Redis, but pure Python and it doesn't need a server. Note: pip install --upgrade diskcache. You can adapt it to start sending live frames from a camera | video.
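
A minimal sketch of that idea with diskcache; the cache directory "live_frames", the key "latest_frame", and camera index 0 are assumptions.

writer.py (sketch)

import cv2
import diskcache as dc

cache = dc.Cache("live_frames")  # on-disk cache directory shared by both scripts

cap = cv2.VideoCapture(0)
while True:
    ok, frame = cap.read()
    if not ok:
        break
    # store the latest frame as JPEG bytes under a well-known key
    ok, buf = cv2.imencode(".jpg", frame)
    cache["latest_frame"] = buf.tobytes()

reader.py (sketch)

import cv2
import diskcache as dc
import numpy as np

cache = dc.Cache("live_frames")  # must point at the same directory

while True:
    data = cache.get("latest_frame")
    if data is None:
        continue  # the writer has not produced a frame yet
    frame = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
    cv2.imshow("frame", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break
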
I would go in this direction instead of using sys, because you have full control over the streamed data. See the diskcache Documentation.

yr9zkbsy 4#

What about using a ROS publisher and subscriber? It would be simple to implement and easy to observe.
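
A minimal ROS 1 sketch of that suggestion, assuming a working ROS installation with rospy and cv_bridge; the /video_frames topic name and camera index 0 are assumptions.

publisher.py (sketch)

import cv2
import rospy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image

rospy.init_node("frame_publisher")
pub = rospy.Publisher("/video_frames", Image, queue_size=10)
bridge = CvBridge()

cap = cv2.VideoCapture(0)
rate = rospy.Rate(30)  # target publish rate in Hz
while not rospy.is_shutdown():
    ok, frame = cap.read()
    if not ok:
        break
    pub.publish(bridge.cv2_to_imgmsg(frame, encoding="bgr8"))
    rate.sleep()

subscriber.py (sketch)

import cv2
import rospy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image

bridge = CvBridge()

def on_frame(msg):
    frame = bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
    cv2.imshow("frame", frame)
    cv2.waitKey(1)  # needed so the OpenCV window updates

rospy.init_node("frame_subscriber")
rospy.Subscriber("/video_frames", Image, on_frame)
rospy.spin()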
