c++ 如何从套接字循环编码/解码数据?

ocebsuys  于 2023-04-01  发布在  其他
关注(0)|答案(1)|浏览(110)

我目前正在用C++开发一个语音传输软件,使用PortAudio和Opus处理声音相关部分,使用Boost Asio处理网络部分。
我在尝试在循环中设置编码和解码时遇到了困难。
更准确地说,我有一个函数来处理我的项目的“呼叫”部分,当两个人接受彼此的呼叫时,这个函数就会启动。
我得到了2个不同的函数,一个当客户端接受调用,第二个当客户端发起调用。
这两个函数都是以相同的方式构建的,我开始初始化我需要的每个库,然后监听套接字以获取音频数据,并将编码后的音频数据发送到套接字。
我的问题是:我尝试在while语句中使用简单的条件来实现这一点:if(clientConnected){}但代码似乎只执行一次循环,然后阻塞,我不知道为什么。
(我知道这是因为std::cout〈〈dataSize只打印一次)
这是两个调用函数之一(receiveCall)(如果你有任何问题问我):

void Window::receiveCall(const std::string& ip) {

    // Initialize PortAudio and Opus
    PortAudioWrapper portAudio;
    OpusWrapper opus;
    if (!portAudio.OpenDefaultStream(1, 1, paFloat32, 48000, 960))
    {
        std::cerr << "Failed to open PortAudio stream" << std::endl;
        return;
    }
    if (!opus.Init(48000, 2, OPUS_APPLICATION_VOIP))
    {
        std::cerr << "Failed to initialize Opus" << std::endl;
        return;
    }
    if (!portAudio.StartStream())
    {
        std::cerr << "Failed to start PortAudio stream" << std::endl;
        return;
    }
    boost::asio::io_service ioService;
    boost::asio::ip::tcp::socket socket(ioService);
    boost::asio::ip::tcp::acceptor acceptor(ioService);

    try
    {
        // Create an acceptor to listen for incoming connections
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(ip), 12345);
        acceptor.open(endpoint.protocol());
        acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        acceptor.bind(endpoint);
        acceptor.listen();

        std::cout << "Waiting for incoming connection on " << ip << ":12345" << std::endl;

        // Wait for a client to connect

        acceptor.accept(socket);

        std::cout << "Connected to client at " << socket.remote_endpoint().address().to_string() << std::endl;
        bool clientConnected = true;

        // Create a buffer to hold the audio data
        std::vector<unsigned char> buffer(1276);

        // Send and receive audio data
        while (clientConnected)
        {
            // Read audio data from PortAudio
            float audioData[960 * 2 * sizeof(float)];
            portAudio.readStream(audioData, 960);
            
            // Compress the audio data using Opus
            int dataSize = opus.Encode(audioData, 960, buffer.data(), 1276);
            std::cout << "dataSize : "<< dataSize << std::endl;
            // Send the compressed audio data to the client
            boost::asio::write(socket, boost::asio::buffer(buffer.data(), dataSize));

            // Receive compressed audio data from the client
            boost::system::error_code error;
            dataSize = socket.read_some(boost::asio::buffer(buffer), error);
            if (error == boost::asio::error::eof)
            {
                std::cout << "Client disconnected" << std::endl;
                clientConnected = false; // set flag to false when client disconnects
            }
            else if (error)
            {
                std::cerr << "Error receiving audio data: " << error.message() << std::endl;
                clientConnected = false; // set flag to false when client disconnects
            }

            // Decompress the audio data using Opus
            float decodedData[960 * 2 * sizeof(float)];
            int numSamples = opus.Decode(buffer.data(), dataSize, decodedData, 960);

            // Write the decompressed audio data to PortAudio
            portAudio.writeStream(decodedData, numSamples);
        }
    }    
    catch (std::exception& e) {
        std::cerr << "Exception in receiveCall: " << e.what() << std::endl;
    }
    // Cleanup code
    socket.close();
    acceptor.close();
    portAudio.StopStream();
}

问题可能出在循环中,但我不知道可能是什么。
先谢谢你了。

hyrbngr7

hyrbngr71#

Opus端和缓冲区尺寸(带警告)使用2通道。然而,您只使用一个通道初始化PA流。这可能是一个错误。
此外,这里缺少帧。您的代码简单地假设在一次调用中“发送”的任何内容也会在对read_some的一次调用中接收。顾名思义,情况并非如此。它读作“some”。
既然你似乎知道1276是编码数据大小的上限,你可以通过硬编码所选的缓冲区长度,通道数和样本格式来避免帧。然后你可以使用asio::read来复合读取一个或多个read_some操作来读取“整个缓冲区”(或EOF):

dataSize = read(socket, asio::buffer(buffer), error);

当然,这需要发送忽略dataSize才能工作:

write(socket, asio::buffer(buffer /*, dataSize*/));

但是,Decode调用需要知道dataSize,因此您仍然需要传输它。

其他问题

我认为audioDatadecodedAudio缓冲区的尺寸是关闭的。幸运的是在“太大”的一面。我认为* sizeof(float)因素是错误的。无论如何,我都会使用C++数组:

struct sample { float chan0, chan1; };
std::array<sample, 960> audioData, decodedData;

现在您也可以避免在不同的地方硬编码960幻数了!

发送big-endian datasize

使用Boost Endian在网络上可移植地表示32位整数:

using NetSize = boost::endian::big_uint32_t;

现在发送可能看起来像:

NetSize dataSize = opus.Encode(audioData.data(), audioData.size(), buffer.data(), buffer.size());
std::cout << "dataSize : " << dataSize << std::endl;

// Send the compressed audio data to the client
write(socket,
      std::array{
          asio::buffer(&dataSize, sizeof(dataSize)),
          asio::buffer(buffer, dataSize),
      });

和接收类似,但在两个步骤:

read(socket, asio::buffer(&dataSize, sizeof(dataSize)), error);
if (error) {
    dataSize = 0;
} else {
    dataSize = read(socket, asio::buffer(buffer, dataSize), error);
}

整合

**一个

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::tcp;

using NetSize = boost::endian::big_uint32_t;

enum { paFloat32 };
struct PortAudioWrapper {
    unsigned channels_ = 0;
    float    val_      = 0;

    bool OpenDefaultStream(int, int channels, int format, unsigned, int) {
        channels_ = channels;
        assert(format == paFloat32);
        return true;
    }

    bool StartStream() { return true; }
    bool StopStream() { return true; }

    // float audioData[960 * 2 * sizeof(float)];
    bool writeStream(void const* buf, size_t n) {
        float const* fp = static_cast<float const*>(buf);
        assert(n);
        auto val = *fp;
        std::cerr << "writing " << channels_ << "x" << n << " of " << val //
                  << " (all equal: " << std::boolalpha
                  << std::all_of(fp, fp + channels_ * n, [val](float sample) { return sample == val; })
                  << " )" << std::endl;
        return true;
    }
    bool readStream(void* buf, size_t n) {
        float* fp = static_cast<float *>(buf);
        val_ += 1.111;
        std::cerr << "recording " << channels_ << "x" << n << " of audio " << val_ << std::endl;
        std::fill_n(fp, channels_ * n, val_);
        return true;
    }
};

enum { OPUS_APPLICATION_VOIP };
struct OpusWrapper {
    unsigned channels_ = 0;

    bool Init(unsigned, unsigned channels, int) {
        channels_ = channels;
        return true;
    };

    int Encode(void const* pcm, size_t numsamples, uint8_t* buf, size_t bufsize) {
        assert(numsamples);

        numsamples = 1; // we just encode 1 sample - it's awesome compression!
        size_t const n = numsamples * channels_ * sizeof(float);
        assert(n <= bufsize);
        std::copy_n(static_cast<uint8_t const*>(pcm), n, buf);
        return n;
    }

    int Decode(uint8_t const* buf, size_t bufsize, void* pcm, size_t sampleroom) {
        int const samplesize = channels_ * sizeof(float);
        assert(samplesize);
        size_t const n = bufsize / samplesize;
        assert(n <= sampleroom);

        assert(bufsize >= n);
        std::copy_n(buf, n, static_cast<uint8_t*>(pcm));
        return n;
    }
};

struct Window {
    void receiveCall(std::string const& ip) {
        // Initialize PortAudio and Opus
        PortAudioWrapper portAudio;
        OpusWrapper      opus;
        if (!portAudio.OpenDefaultStream(1, 2, paFloat32, 48000, 960)) {
            std::cerr << "Failed to open PortAudio stream" << std::endl;
            return;
        }
        if (!opus.Init(48000, 2, OPUS_APPLICATION_VOIP)) {
            std::cerr << "Failed to initialize Opus" << std::endl;
            return;
        }
        if (!portAudio.StartStream()) {
            std::cerr << "Failed to start PortAudio stream" << std::endl;
            return;
        }

        assert(portAudio.channels_ == opus.channels_);
        asio::io_service ioService;
        tcp::socket      socket(ioService);
        tcp::acceptor    acceptor(ioService);

        try {
            // Create an acceptor to listen for incoming connections
            tcp::endpoint endpoint{asio::ip::address::from_string(ip), 12345};
            acceptor.open(endpoint.protocol());
            acceptor.set_option(tcp::acceptor::reuse_address(true));
            acceptor.bind(endpoint);
            acceptor.listen();

            std::cout << "Waiting for incoming connection on " << ip << ":12345" << std::endl;

            // Wait for a client to connect

            acceptor.accept(socket);

            std::cout << "Connected to client at " << socket.remote_endpoint() << std::endl;
            bool clientConnected = true;

            // Create a buffer to hold the audio data
            std::array<unsigned char, 1276> buffer;

            // Send and receive audio data
            while (clientConnected) {
                // Read audio data from PortAudio
                struct sample { float chan0, chan1; };
                std::array<sample, 960> audioData, decodedData;
                static_assert(sizeof(audioData) == 2 * 960 * sizeof(float));
                static_assert(sizeof(decodedData) == 2 * 960 * sizeof(float));

                portAudio.readStream(audioData.data(), audioData.size());

                // Compress the audio data using Opus
                NetSize dataSize = opus.Encode(audioData.data(), audioData.size(), buffer.data(), buffer.size());
                std::cout << "dataSize : " << dataSize << std::endl;

                // Send the compressed audio data to the client
                write(socket,
                      std::array{
                          asio::buffer(&dataSize, sizeof(dataSize)),
                          asio::buffer(buffer, dataSize),
                      });

                // Receive compressed audio data from the client
                boost::system::error_code error;
                read(socket, asio::buffer(&dataSize, sizeof(dataSize)), error);
                if (error) {
                    dataSize = 0;
                } else {
                    dataSize = read(socket, asio::buffer(buffer, dataSize), error);
                }

                clientConnected = !error.failed(); // set flag to false when client disconnects
                if (error == asio::error::eof) {
                    std::cout << "Client disconnected" << std::endl;
                } else if (error) {
                    std::cerr << "Error receiving audio data: " << error.message() << std::endl;
                }

                // Decompress the audio data using Opus
                assert(dataSize <= buffer.size());
                int numSamples = opus.Decode(buffer.data(), dataSize, decodedData.data(), decodedData.size());

                // Write the decompressed audio data to PortAudio
                portAudio.writeStream(decodedData.data(), numSamples);
            }
        } catch (std::exception& e) {
            std::cerr << "Exception in receiveCall: " << e.what() << std::endl;
        }

        // Cleanup code
        socket.close();
        acceptor.close();
        portAudio.StopStream();
    }
};

int main() {
    Window w;
    w.receiveCall("127.0.0.1");
}

Coliru输出:

g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp 
./a.out&
sleep 1; nc 127.0.0.1 12345 -w 1 | od -A none -f
Waiting for incoming connection on 127.0.0.1:12345
Connected to client at 127.0.0.1:34176
recording 2x960 of audio 1.111
dataSize : 8
Client disconnected
a.out: main.cpp:26: bool PortAudioWrapper::writeStream(const void*, size_t): Assertion `n' failed.
     3.85186e-34           1.111           1.111
bash: line 9:  7295 Aborted                 (core dumped) ./a.out

当然,“对等体”不会发送回包含opus数据的有效帧。

相关问题