rust 原子地排干一个mpsc之前,另一个

83qze16e  于 2023-05-07  发布在  其他
关注(0)|答案(1)|浏览(129)

我在一个线程中有两个std::sync::mpsc接收器,一个优先接收器和一个常规接收器。有没有什么方法可以在使用常规队列之前自动确保优先级队列被清空?我确保Sender s上存在happens-before关系,因此优先级操作总是首先发送。
以下是我目前的解决方案,我几乎可以肯定这是不正确的:

loop {
    for priority_op in priority_rx.try_iter() {
        match priority_op {
            ...
        }
    }
    
    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    match op {
        ...
    }
}

如果这是不可能的,我可以使用什么替代架构(除了只有一个队列)来实现这种行为?

6g8kf2rb

6g8kf2rb1#

在普通频道收到东西后,可以查看优先频道。

loop {
    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();

    if let Ok(raced) = raced_priority {
        // return op to queue
        tx.send(op);
        use_priority(raced);
    } else {
        // No race occurred
        use_op(op);
    }
}

这有一个很大的缺点,你失去了常规渠道的秩序。所以,你可以把常规的操作放到一个局部变量中。

let mut regular = None;
loop {
    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received in the last loop
    if let Some(event) = regular.take() {
        use_op(event);
    }

    // A priority op, and then a regular op could arrive here

    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();
    if let Ok(raced) = raced_priority {
        // return op to queue
        regular = Some(op);
        use_priority(raced);
    } else {
        // No race occurred
        use_op(op);
    }
}

如果你改变循环的位置,这会写得更好。

loop {
    let op = rx.recv().unwrap();
    let raced_priority = priority_rx.try_recv();

    let regular = if let Ok(raced) = raced_priority {
        use_priority(raced);
        // return op to queue
        Some(op)
    } else {
        // No race occurred
        use_op(op);
        None
    };

    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received before draining priority_rx
    if let Some(event) = regular {
        use_op(event);
    }
}

为了保持优先级队列的响应性,您可能希望删除recv,而使用recv_timeouttry_recv,并且您可能希望在优先级队列上执行此操作。

loop {
    let regular = match rx.try_recv() {
        Ok(op) => {
            let raced_priority = priority_rx.try_recv();

            if let Ok(raced) = raced_priority {
                // return op to queue
                use_priority(raced);
                Some(op)
            } else {
                // No race occurred
                use_op(op);
                None
            }
        }
        Err(TryRecvError::Empty) => None,
        Err(_) => panic!("Disconnected"),
    };

    if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
        use_priority(priority_op);
        for priority_op in priority_rx.try_iter() {
            use_priority(priority_op);
        }
    }

    // This op was received before draining priority_rx
    if let Some(event) = regular {
        use_op(event);
    }
}

这意味着,如果常规信道为空,则将尽快使用优先信道,并且如果优先信道为空,则常规信道可以延迟多达some_duration(加上两者的循环开销)。如果你想经常检查队列,你可以使用try_recv代替recv_timeout,并包含一个spin_loop提示。
写完后,我意识到它可以被压缩。

loop {
    let regular = rx.try_recv();

    if let Ok(priority_op) = priority_rx.recv_timeout(some_duration) {
        use_priority(priority_op);
        for priority_op in priority_rx.try_iter() {
            use_priority(priority_op);
        }
    }

    // This op was received before draining priority_rx
    if let Ok(event) = regular {
        use_op(event);
    }
}

旋转的版本。

loop {
    let regular = rx.try_recv();

    for priority_op in priority_rx.try_iter() {
        use_priority(priority_op);
    }

    // This op was received before draining priority_rx
    if let Ok(event) = regular {
        use_op(event);
    }

    spin_loop();
}

在这一点上,它的转变相当多,从你原来的,但我认为它应该做什么,你正在寻找。它还忽略了挂起的频道,所以你可能应该把它加进去。
可能有一种数据结构更适合这个特定的任务,但我不知道,它不在标准库中,您可能必须从头开始构建它。希望有人能给出一个答案,如果它确实存在。

相关问题