Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


multithreading - Get the first received value from an iterator of channels in Rust

I have an iterator of futures::channel::mpsc::UnboundedReceiver<T> values. I want to handle every message these receivers produce, one at a time, while also handling other futures.

This should be possible by looping over a futures::select!, but I need some way of getting the next resolved value out of the set of UnboundedReceiver<T>s. I tried futures::future::select_all(iter), but it fails to compile with the error: futures::channel::mpsc::UnboundedReceiver<T> is not a future.

Playground example is here.



1 Answer


futures::channel::mpsc::UnboundedReceiver implements Stream, not Future, which is why futures::future::select_all rejects it. Use the stream counterpart instead: futures::stream::select_all(recv) merges the receivers into a single SelectAll stream, and calling .next() on it resolves to the next message that becomes ready on any of them. I adapted your example to use it:

use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> {
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for _i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
            tx.unbounded_send("Message").unwrap();
        }));
    }
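    // Merge all receivers into one stream that yields whichever message is ready first.
    let mut select_all = select_all(recv);
    loop {
        futures::select! {
            // Some(message) for each message; None once, after every sender has been dropped.
            msg = select_all.next() => {
                println!("{:#?}", msg);
            }
            // Resolves each time one of the spawned tasks finishes.
            _ = futures.select_next_some() => {
                eprintln!("Task finished");
            },
            // Both the merged stream and the task set are exhausted.
            complete => break
        }
    }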
    Ok(())
}

Note that this is asynchronous programming, not multithreading: tokio::spawn creates asynchronous tokio tasks rather than threads. I recommend reading the answer here: What is the difference between asynchronous programming and multithreading?
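For contrast, here is a rough sketch of what a thread-based version of the same pattern might look like using only the standard library. It is not part of the original example. The standard library's std::sync::mpsc offers no stable way to select over several receivers, so this sketch gives every worker a clone of one Sender and reads from a single Receiver instead. The helper name gather_from_threads, the 10 ms sleep, and the message text are all made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: spawns `workers` OS threads that each send one
/// message, then returns the messages in the order they arrived.
fn gather_from_threads(workers: usize) -> Vec<String> {
    // One shared channel: every worker gets its own clone of the sender.
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::with_capacity(workers);

    for id in 0..workers {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            // Stand-in for real work (assumed delay, purely illustrative).
            thread::sleep(Duration::from_millis(10));
            tx.send(format!("Message from worker {}", id)).unwrap();
        }));
    }

    // Drop the original sender so the receiver sees the channel close
    // once every worker has sent its message and dropped its clone.
    drop(tx);

    // Blocks until a message arrives; ends when all senders are gone.
    let received: Vec<String> = rx.iter().collect();

    for handle in handles {
        handle.join().unwrap();
    }
    received
}
```

Calling gather_from_threads(3) returns three messages whose order depends on thread scheduling. Unlike the tokio version, each worker here occupies a real OS thread while it sleeps, whereas an async task yields back to the runtime at every .await.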

