Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.1k views
in Technique[技术] by (71.8m points)

rust - Spawning tasks with non-static lifetimes with tokio 0.1.x

I have a tokio core whose main task is running a websocket (client). When I receive some messages from the server, I want to execute a new task that will update some data. Below is a minimal failing example:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}

Which produces this error:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      

The problem is that new tasks that are spawned through a handle need to be static. The same issue is described here. Sadly it is unclear to me how I can fix the issue. Even some attempts with and Arc and a Mutex (which really shouldn't be needed for a single-threaded application), I was unsuccessful.

Since developments occur rather quickly in the tokio landscape, I am wondering what the current best solution is. Do you have any suggestions?

edit

The solution by Peter Hall works for the example above. Sadly when I built the failing example I changed tokio reactor, thinking they would be similar. Using tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}

I obtain:

error[E0277]: `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
--> src/main.rs:17:21                                                         
|                                                                            
17 |         self.handle.spawn(future::ok(()).and_then(move |_x| {              
|                     ^^^^^ `std::rc::Rc<std::cell::Cell<usize>>` cannot be sent between threads safely
|                                                                            
= help: within `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::Cell<usize>>`
= note: required because it appears within the type `[closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]`
= note: required because it appears within the type `futures::future::chain::Chain<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`
= note: required because it appears within the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:17:51: 20:10 data:std::rc::Rc<std::cell::Cell<usize>>]>`

So it does seem like in this case I need an Arc and a Mutex even though the entire code is single-threaded?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

In a single-threaded program, you don't need to use Arc; Rc is sufficient:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

The point is that you no longer have to worry about the lifetime because each clone of the Rc acts as if it owns the data, rather than accessing it via a reference to self. The inner Cell (or RefCell for non-Copy types) is needed because the Rc can't be dereferenced mutably, since it has been cloned.


The spawn method of tokio::runtime::current_thread::Handle requires that the future is Send, which is what is causing the problem in the update to your question. There is an explanation (of sorts) for why this is the case in this Tokio Github issue.

You can use tokio::runtime::current_thread::spawn instead of the method of Handle, which will always run the future in the current thread, and does not require that the future is Send. You can replace self.handle.spawn in the code above and it will work just fine.

If you need to use the method on Handle then you will also need to resort to Arc and Mutex (or RwLock) in order to satisfy the Send requirement:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}

If your data is really a usize, you could also use AtomicUsize instead of Mutex<usize>, but I personally find it just as unwieldy to work with.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...