如何使用异步WebSocket客户端延长nannou模型的生命周期?

4dc9hkyq  于 2023-06-23  发布在  其他
关注(0)|答案(1)|浏览(128)

我正在使用nannou反复画圆,并希望使用异步WebSocket客户端更新其模型。Nannou遵循MVC概念。有一个view方法来绘制东西,一个update方法来更新模型,然后有一个model方法来初始化模型。
当我初始化模型时,我启动了一个异步WebSocket客户端,它将模型作为引用。

tokio::spawn(listen_to_websocket(&mut model)); // this line causes the error

然而,当我试图编译程序时,我传递的模型引用触发了错误消息borrowed value does not live long enough
我不能复制模型,因为nannou只创建一次模型,然后在updateview方法中重用它。我也无法传递所有权,因为我不知道如何给予:有一个WebSocket客户端异步使用模型,然后是Nannou的主要部分异步绘制对象。我对生 rust 还是个新手。在这种情况下,如何延长模型的生命周期?当使用阻塞的WebSocket读取方法时,程序不会继续画圆圈,直到websocket上有新消息。只要nannou应用程序运行,模型就应该存在。我从nannou开始

nannou::app(model).update(update).run();

模型必须在model方法中创建。您也不能更改viewupdatemodel方法的方法签名。
完整代码:

use futures_util::{future, pin_mut, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use nannou::prelude::*;

#[tokio::main]
async fn main() {
    nannou::app(model).update(update).run();
}

struct Model {
    rnd1: Vec2,
    ...
}
async fn listen_to_websocket(_model: &mut Model){
    // connect to websocket and listen to messages
}

fn model(_app: &App) -> Model {
    _app.new_window().view(view).build().unwrap();

    let mut model = Model {
        rnd1: ...,
        ...
    };
    tokio::spawn(listen_to_websocket(&mut model)); // this line causes the error

    return model;
}

fn update(_app: &App, _model: &mut Model, _update: Update) {
    // update model
    // executed every frame
}

fn view(_app: &App, _model: &Model, frame: Frame){
    // draw objects based on model
}
b5buobof

b5buobof1#

理想地,这将被固定在最里面的部分。你的websocket库可能有一个非阻塞的同步方式来读取websocket,你可以在绘制圆圈之间检入,而不产生任何东西,可能不使用任何异步。
另一种方法是使用channel生成websocket函数。这绝对可以以非阻塞同步方式使用。你可以在websocket任务中存储一个Sender,在模型中存储一个Receiver

use std::sync::mpsc;
struct Model {
    // you can create your own message type to use here instead of `String`
    receiver: mpsc::Receiver<String>, 
}

let (sender, receiver) = mpsc::channel();
let mut model = Model { receiver };
tokio::spawn(listen_to_websocket(sender));

然后在画圆之间,可以检查通道。

fn update(_app: &App, model: &mut Model, _update: Update) {
    draw_circles();
    if let Ok(message) = model.receiver.try_recv() {
        handle_message(message);
    }
}

您的websocket库可能有非常类似的东西,但是如果您的websocket消息需要大量处理,您可能仍然希望使用通道。此外,你可能想使用tokio的一个通道来代替,特别是如果你在循环中有其他异步的事情要做的话。
如果你还需要向websocket发送数据,你可以在相反的方向上创建另一个通道进行通信。
你可以做的另一件事是将你的模型存储在一个Arc中,已经有很多关于this one的帖子了。这些应用于std::thread::spawn的方式与tokio::spawn相同。
除了你的直接问题,你还需要避免阻塞你的异步运行时。当调用run时会发生这种情况,它将永远阻塞。Tokio有一些函数来生成阻塞任务。Nannou在run文档中这样说:
如果您希望保持跨平台友好性,我们建议您在主线程上调用此函数,因为某些平台要求其应用程序的事件循环和窗口在主线程上初始化。
这意味着你应该使用tokio的block_in_place

#[tokio::main]
async fn main() {
    tokio::task::block_in_place(|| nannou::app(model).update(update).run());
}

还要确保你没有阻止websocket任务。如果需要阻塞,可以使用tokio::task::spawn_blocking而不是tokio::spawn

更新

这里有一个非异步版本,可以用来代替只使用tungstenitetokio::spawn调用。不需要东京。然后,您可以将返回的Receiver合并到Model中,并在第二个代码块中像上面一样调用try_recv

use std::sync::mpsc;

// You can create your own message type to use here instead of `String`
type ChannelMessage = String;

// Call this when creating the Model
pub fn create_websocket_thread() -> mpsc::Receiver<ChannelMessage> {
    let (sender, receiver) = mpsc::channel();
    std::thread::spawn(move || {
        handle_connections(sender);
    });
    receiver
}

// Work to do in the websocket thread
fn handle_connections(sender: mpsc::Sender<ChannelMessage>) {
    let mut client = tungstenite::connect("ws://127.0.0.1:5000").unwrap().0;
    loop {
        let message = match client.read_message() {
            Ok(tungstenite::Message::Text(m)) => m,
            Ok(_) => (),
            Err(e) => panic!("{e}"),
        };
        let message = process_message(message);
        sender.send(message).unwrap();
    }
}

// Do whatever conversion you need
fn process_message(message: String) -> ChannelMessage {
    message
}

相关问题