python TCP套接字,通过Internet传输数据时丢失字节

1bqhqjot  于 2022-12-02  发布在  Python
关注(0)|答案(1)|浏览(150)

我有一个简单的客户端-服务器设置,可以传输JPG字节。当在本地运行时,它工作得很好。但是当在互联网上传输JPG时,它会被损坏,当解码时会有严重的视觉伪像。
客户端是一个Rust时雄应用程序。它使用来自摄像头的jpeg流,并将jpeg字节推送到TCP套接字。

// Async application to run on the edge (Raspberry Pi) 
// Reads MJPEG HTTP stream from provided URL and sends it to the CWS server over TCP

#![warn(rust_2018_idioms)] 

use std::convert::TryFrom;

use futures::StreamExt;

use tokio::signal;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

use std::error::Error;

use std::path::PathBuf;
use structopt::StructOpt;

//DEBUG
use std::fs;

#[derive(StructOpt, Debug)]
#[structopt(name = "basic")]

struct Opt {
    #[structopt(long = "stream", required(true))]
    stream: String,

    #[structopt(long = "server", required(true))]
    server: String,
}

async fn acquire_tcp_connection(server: &str) -> Result<TcpStream, Box<dyn Error>> {
    // "127.0.0.1:6142"
    loop {
        match TcpStream::connect(server).await {
            Ok(stream) => {
                println!("Connected to server");
                return Ok(stream);
            }
            Err(e) => {
                println!("Failed to connect to server: {}", e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }

}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let opt = Opt::from_args();
    
    let url = http::Uri::try_from(opt.stream).unwrap();

    loop {
        let mut tcp_conn = acquire_tcp_connection(&opt.server).await?;

        // hyper client
        let client = hyper::Client::new();
        // Do the request
        let res = client.get(url.clone()).await.unwrap();
        // Check the status
        if !res.status().is_success() {
            eprintln!("HTTP request failed with status {}", res.status());
            std::process::exit(1);
        }
        // https://docs.rs/mime/latest/mime/#what-is-mime
        // Basically HTTP response content 
        let content_type: mime::Mime = res
            .headers()
            .get(http::header::CONTENT_TYPE)
            .unwrap()
            .to_str()
            .unwrap()
            .parse()
            .unwrap();

        assert_eq!(content_type.type_(), "multipart");

        let boundary = content_type.get_param(mime::BOUNDARY).unwrap();
        let stream = res.into_body();
        // https://github.com/scottlamb/multipart-stream-rs
        let mut stream = multipart_stream::parse(stream, boundary.as_str());

    
        'outer: while let Some(p) = stream.next().await {
            let p = p.unwrap();
            // Split the jpeg bytes into chunks of 2048 bytes
            for slice in p.body.chunks(2048) {
                // DEBUG capture bytes to a file just for debugging
                fs::write("tcp_debug.txt", &slice).expect("Unable to write file"); 

                let tcp_result = tcp_conn.write(&slice).await;

                match tcp_result {
                    Ok(_) => {
                        println!("Sent {} bytes", slice.len());
                    }
                    Err(e) => {
                        println!("Failed to send data: {}", e);
                        break 'outer;
                    }
                }
            }
        }
    }

    Ok(())
}

服务器是一个Python应用程序,它接受TCP连接并对接收到的字节进行解码:

import asyncio
from threading import Thread

import cv2
import numpy as np

class SingleFrameReader:
    """
    Simple API for reading a single frame from a video source
    """
    def __init__(self, video_source, ..., tcp=False):
        self._video_source = video_source
        ...
        elif tcp:
            self._stop_tcp_server = False
            self._tcp_image = None
            self._tcp_addr = video_source
            self._tcp_thread = Thread(target=asyncio.run, args=(self._start_tcp_server(),)).start()
            self.read = lambda: self._tcp_image

    async def _start_tcp_server(self):
        uri, port = self._tcp_addr.split(':')
        port = int(port)
        print(f"Starting TCP server on {uri}:{port}")
        server = await asyncio.start_server(
            self.handle_client, uri, port
        )

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    async def handle_client(self, reader, writer):
        """
        Handle a client connection. Receive JPEGs from the client and have them ready to be ready
        """
        client_addr = writer.get_extra_info('peername')
        print(f"New connection from {client_addr}")
        jpg_bytes = b''
        while True:
            if self._stop_tcp_server:
                break
            data = await reader.read(2**16)

            #DEBUG
            print(f"Received {len(data)} bytes from {client_addr}")
            byes_file = open('tcp_bytes_server.txt', 'wb')

            if not data:
                print("Client disconnected")
                break

            #DEBUG the corruption issue
            byes_file.write(data)

            jpg_bytes += data
            
            start_idx = jpg_bytes.find(b'\xff\xd8')
            end_idx = jpg_bytes.find(b'\xff\xd9')

            if start_idx != -1 and end_idx != -1:
                nparr  = np.frombuffer(jpg_bytes[start_idx:end_idx+2], np.uint8)
                img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img_np is None:
                    continue
                self._tcp_image = img_np
                jpg_bytes = jpg_bytes[end_idx+2:]
                
    ...

我试过发送不同大小的块,当一次发送整个图像时,图像伪影会更严重。
我捕获了从客户端(rust)发送到文件的字节和从服务器(python)接收到的字节。从服务器发送的字节比从rust发送的字节小约1.6KB,而从rust发送的字节小约2KB。并且第一个接收到的字节彼此不匹配。
同样,当在本地运行时,它工作得很顺利,你可以真实的看到摄像头流。当客户端和服务器被互联网分开时,字节似乎会被损坏。

juzqafwq

juzqafwq1#

要将字节写入您正在调用AsyncWriteExt::write的TCP套接字:

let tcp_result = tcp_conn.write(&slice).await;

并引用文档:
此函数将尝试写入buf的整个内容,但整个写入可能不会成功...
如果返回值是Ok(n),那么必须保证n〈= buf.len()。
您将查看是否存在错误并记录字节数,但如果字节数小于缓冲区的长度,则不执行任何操作。
但是什么时候缓冲区不会被完全写入呢?通常情况下,当你的程序比网络快很多或者你的分片太大的时候就可能发生这种情况。看起来当你连接到本地主机的时候这不是问题,但是当你连接到远程系统的时候就有问题了。YMMV
如果你在用C语言编程,你需要做一个循环,前进一个指针等等。在Rust中你也可以做这些,但是调用AsyncWriteExt::write_all来为你做循环会更容易:

let tcp_result = tcp_conn.write_all(&slice).await;

注意,现在tcp_resultio::Result<()>而不是io::Result<usize>,因为现在不写入分片中的所有字节是错误的。
同样要注意的是,当从TCP连接阅读数据时,即使使用Python,对等体也会有这个潜在的问题。这一行代码可能会执行部分读取,无论服务器是否写入了整个缓冲区:

data = await reader.read(2**16)

但是这一行已经在连接数据并等待整个图像的循环中,所以它不会引起任何问题。

相关问题