# 底层探秘: Future执行器与任务调度 异步编程背后到底藏有什么秘密?究竟是哪只幕后之手在操纵这一切?如果你对这些感兴趣,就继续看下去,否则可以直接跳过,因为本章节的内容对于一个API工程师并没有太多帮助。 但是如果你希望能深入理解 `Rust` 的 `async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。 ## Future 特征 `Future` 特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 `Future` 恰恰是异步函数的返回值和被执行的关键。 首先,来给出 `Future` 的定义:它是一个能产出值的异步计算(虽然该值可能为空,例如 `()` )。光看这个定义,可能会觉得很空洞,我们来看看一个简化版的 `Future` 特征: ```rust trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll; } enum Poll { Ready(T), Pending, } ``` 在上一章中,我们提到过 `Future` 需要被执行器`poll`(轮询)后才能运行,诺,这里 `poll` 就来了,通过调用该方法,可以推进 `Future` 的进一步执行,直到被切走为止( 这里不好理解,但是你只需要知道 `Future` 并不能保证在一次 `poll` 中就被执行完,后面会详解介绍)。 若在当前 `poll` 中, `Future` 可以被完成,则会返回 `Poll::Ready(result)` ,反之则返回 `Poll::Pending`, 并且安排一个 `wake` 函数:当未来 `Future` 准备好进一步执行时, 该函数会被调用,然后管理该 `Future` 的执行器(例如上一章节中的`block_on`函数)会再次调用 `poll` 方法,此时 `Future` 就可以继续执行了。 如果没有 `wake `方法,那执行器无法知道某个`Future`是否可以继续被执行,除非执行器定期的轮询每一个 `Future` ,确认它是否能被执行,但这种作法效率较低。而有了 `wake`,`Future` 就可以主动通知执行器,然后执行器就可以精确的执行该 `Future`。 这种“事件通知 -> 执行”的方式要远比定期对所有 `Future` 进行一次全遍历来的高效。 也许大家还是迷迷糊糊的,没事,我们用一个例子来说明下。考虑一个需要从 `socket` 读取数据的场景:如果有数据,可以直接读取数据并返回 `Poll::Ready(data)`, 但如果没有数据,`Future` 会被阻塞且不会再继续执行,此时它会注册一个 `wake` 函数,当 `socket` 数据准备好时,该函数将被调用以通知执行器:我们的 `Future` 已经准备好了,可以继续执行。 下面的 `SocketRead` 结构体就是一个 `Future`: ```rust pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec; fn poll(&mut self, wake: fn()) -> Poll { if self.socket.has_data_to_read() { // socket有数据,写入buffer中并返回 Poll::Ready(self.socket.read_buf()) } else { // socket中还没数据 // // 注册一个`wake`函数,当数据可用时,该函数会被调用, // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据 self.socket.set_readable_callback(wake); Poll::Pending } } } ``` 这种 `Future` 模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个 `Future`或链式调用多个 `Future` ,也可以通过无内存分配的状态机实现,例如: ```rust trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll; } enum Poll { Ready(T), Pending, } /// 一个SimpleFuture,它会并发地运行两个Future直到它们完成 /// /// 之所以可以并发,是因为两个Future的轮询可以交替进行,一个阻塞,另一个就可以立刻执行,反之亦然 pub struct Join { // 结构体的每个字段都包含一个Future,可以运行直到完成. // 如果Future完成后,字段会被设置为 `None`. 这样Future完成后,就不会再被轮询 a: Option, b: Option, } impl SimpleFuture for Join where FutureA: SimpleFuture, FutureB: SimpleFuture, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll { // 尝试去完成一个 Future `a` if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } } // 尝试去完成一个 Future `b` if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } } if self.a.is_none() && self.b.is_none() { // 两个 Future都已完成 - 我们可以成功地返回了 Poll::Ready(()) } else { // 至少还有一个 Future 没有完成任务,因此返回 `Poll::Pending`. // 当该 Future 再次准备好时,通过调用`wake()`函数来继续执行 Poll::Pending } } } ``` 上面代码展示了如何同时运行多个 `Future`, 且在此过程中没有任何内存分配,让并发编程更加高效。 类似的,多个`Future`也可以一个接一个的连续运行: ```rust /// 一个SimpleFuture, 它使用顺序的方式,一个接一个地运行两个Future // // 注意: 由于本例子用于演示,因此功能简单,`AndThenFut` 会假设两个 Future 在创建时就可用了. // 而真实的`Andthen`允许根据第一个`Future`的输出来创建第二个`Future`,因此复杂的多。 pub struct AndThenFut { first: Option, second: FutureB, } impl SimpleFuture for AndThenFut where FutureA: SimpleFuture, FutureB: SimpleFuture, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll { if let Some(first) = &mut self.first { match first.poll(wake) { // 我们已经完成了第一个 Future, 可以将它移除, 然后准备开始运行第二个 Poll::Ready(()) => self.first.take(), // 第一个 Future 还不能完成 Poll::Pending => return Poll::Pending, }; } // 运行到这里,说明第一个Future已经完成,尝试去完成第二个 self.second.poll(wake) } } ``` 这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 `Future` 特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 `Future` 特征有何不同之处。 ```rust trait Future { type Output; fn poll( // 首先值得注意的地方是,`self`的类型从`&mut self`变成了`Pin<&mut Self>`: self: Pin<&mut Self>, // 其次将`wake: fn()` 修改为 `cx: &mut Context<'_>`: cx: &mut Context<'_>, ) -> Poll; } ``` 首先这里多了一个 `Pin` ,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的 `Future` ,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于 `async/await` 来说,`Pin` 是不可或缺的关键特性。 其次,从 `wake: fn()` 变成了 `&mut Context<'_>` 。意味着 `wake` 函数可以携带数据了,为何要携带数据?考虑一个真实世界的场景,一个复杂应用例如web服务器可能有数千连接同时在线,那么同时就有数千 `Future` 在被同时管理着,如果不能携带数据,当一个 `Future` 调用 `wake` 后,执行器该如何知道是哪个 `Future` 调用了 `wake` ,然后进一步去 `poll` 对应的 `Future` ?没有办法!那之前的例子为啥就可以使用没有携带数据的 `wake` ? 因为足够简单,不存在歧义性。 总之,在正式场景要进行 `wake` ,就必须携带上数据。 而 `Context` 类型通过提供一个 `Waker` 类型的值,就可以用来唤醒特定的的任务。 ## 使用 Waker 来唤醒任务 对于 `Future` 来说,第一次被 `poll` 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 `poll` 进而继续往下执行,该通知就是通过 `Waker` 类型完成的。 `Waker` 提供了一个 `wake()` 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 `Future` 再次进行 `poll` 操作。 #### 构建一个定时器 下面一起来实现一个简单的定时器 `Future` 。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 `Future` 。 注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 `cargo new --lib timer_future` 来创建一个新工程,在 `lib` 包的根路径 `src/lib.rs` 中添加以下内容: ```rust use std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }; ``` 继续来实现 `Future` 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 `Future` ,由于是多线程环境,我们需要使用 `Arc>` 来作为一个共享状态,用于在新线程和 `Future` 定时器间共享。 ```rust pub struct TimerFuture { shared_state: Arc>, } /// 在Future和等待的线程间共享状态 struct SharedState { /// 定时(睡眠)是否结束 completed: bool, /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务 waker: Option, } ``` 下面给出 `Future` 的具体实现: ```rust impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // 通过检查共享状态,来确定定时器是否已经完成 let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作, // // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。 // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次, // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务 shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } ``` 代码很简单,只要新线程设置了 `shared_state.completed = true` ,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 `Waker` ,这样新线程就可以使用它来唤醒当前的任务。 最后,再来创建一个 API 用于构建定时器和启动计时线程: ```rust impl TimerFuture { /// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成 pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // 创建新线程 let thread_shared_state = shared_state.clone(); thread::spawn(move || { // 睡眠指定时间实现计时功能 thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了 shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } ``` 至此,一个简单的定时器 `Future` 就已创建成功,那么该如何使用它呢?相信部分爱动脑筋的读者已经猜到了:我们需要创建一个执行器,才能让程序动起来。 ## 执行器Executor Rust的 `Future` 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 `async` 函数中使用 `.await` 来调用另一个 `async` 函数,但是这个只能解决 `async` 内部的问题,那么这些最外层的 `async` 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 `executor` 。 执行器会管理一批 `Future` (最外层的 `ascyn` 函数),然后通过不停地 `poll` 推动它们直到完成。 最开始,执行器会先 `poll` 一次 `Future` ,后面就不会主动去 `poll` 了,而是等待 `Future` 通过调用 `wake` 函数来通知它可以继续,它才会继续去 `poll` 。这种**wake 通知然后 poll**的方式会不断重复,直到 `Future` 完成。 #### 构建执行器 下面我们将实现一个简单的执行器,它可以同时并发运行多个 `Future` 。例子中,需要用到 `futures` 包的 `ArcWake` 特征,它可以提供一个方便的途径去构建一个 `Waker` 。编辑 `Cargo.toml` ,添加下面依赖: ```rust [dependencies] futures = "0.3" ``` 在之前的内容中,我们在 `src/lib.rs` 中创建了定时器 `Future` ,现在在 `src/main.rs` 中来创建程序的主体内容,开始之前,先引入所需的包: ```rust use { futures::{ future::{BoxFuture, FutureExt}, task::{waker_ref, ArcWake}, }, std::{ future::Future, sync::mpsc::{sync_channel, Receiver, SyncSender}, sync::{Arc, Mutex}, task::{Context, Poll}, time::Duration, }, // 引入之前实现的定时器模块 timer_future::TimerFuture, }; ``` 执行器需要从一个消息通道( `channel` )中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器 `poll` 。 ```rust /// 任务执行器,负责从通道中接收任务然后执行 struct Executor { ready_queue: Receiver>, } /// `Spawner`负责创建新的`Future`然后将它发送到任务通道中 #[derive(Clone)] struct Spawner { task_sender: SyncSender>, } /// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll` struct Task { /// 进行中的Future,在未来的某个时间点会被完成 /// /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于 /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此 /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。 /// /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell` future: Mutex>>, /// 可以将该任务自身放回到任务通道中,等待执行器的poll task_sender: SyncSender>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // 任务通道允许的最大缓冲数(任务队列的最大长度) // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用 const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender }) } ``` 下面再来添加一个方法用于生成 `Future` , 然后将它放入任务通道中: ```rust impl Spawner { fn spawn(&self, future: impl Future + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("任务队列已满"); } } ``` 在执行器 `poll` 一个 `Future` 之前,首先需要调用 `wake` 方法进行唤醒,然后再由 `Waker` 负责调度该任务并将其放入任务通道中。创建 `Waker` 的最简单的方式就是实现 `ArcWake` 特征,先来为我们的任务实现 `ArcWake` 特征,这样它们就能被转变成 `Waker` 然后被唤醒: ```rust impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc) { // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll` let cloned = arc_self.clone(); arc_self .task_sender .send(cloned) .expect("任务队列已满"); } } ``` 当任务实现了 `ArcWake` 特征后,它就变成了 `Waker` ,在调用 `wake()` 对其唤醒后会将任务复制一份所有权( `Arc` ),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 `poll` 执行: ```rust impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它 let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // 基于任务自身创建一个 `LocalWaker` let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture`是`Pin + Send + 'static>>`的类型别名 // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>` if future.as_mut().poll(context).is_pending() { // Future还没执行完,因此将它放回任务中,等待下次被poll *future_slot = Some(future); } } } } } ``` 恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 `Future` : ```rust fn main() { let (executor, spawner) = new_executor_and_spawner(); // 生成一个任务 spawner.spawn(async { println!("howdy!"); // 创建定时器Future,并等待它完成 TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来 drop(spawner); // 运行执行器直到任务队列为空 // 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印 `done!` executor.run(); } ``` ## 执行器和系统IO 前面我们一起看过一个使用 `Future` 从 `Socket` 中异步读取数据的例子: ```rust pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec; fn poll(&mut self, wake: fn()) -> Poll { if self.socket.has_data_to_read() { // socket有数据,写入buffer中并返回 Poll::Ready(self.socket.read_buf()) } else { // socket中还没数据 // // 注册一个`wake`函数,当数据可用时,该函数会被调用, // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据 self.socket.set_readable_callback(wake); Poll::Pending } } } ``` 该例子中,`Future` 将从 `Socket` 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 `Future` 。当数据准备好后,会调用 `wake()` 函数将该 `Future` 的任务放入任务通道中,等待执行器的 `poll` 。 关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决: - `set_readable_callback` 方法到底是怎么工作的?怎么才能知道 `socket` 中的数据已经可以被读取了? 关于第二点,其中一个简单粗暴的方法就是使用一个新线程不停的检查 `socket` 中是否有了数据,当有了后,就调用 `wake()` 函数。该方法确实可以满足需求,但是性能着实太低了,需要为每个阻塞的 `Future` 都创建一个单独的线程! 在现实世界中,该问题往往是通过操作系统提供的 `IO` 多路复用机制来完成,例如 `Linux` 中的 **`epoll`**,`FreeBSD` 和 `MacOS` 中的 **`kqueue`** ,`Windows` 中的 **`IOCP`**, `Fuchisa`中的 **`ports`** 等(可以通过 Rust 的跨平台包 `mio` 来使用它们)。借助IO多路复用机制,可以实现一个线程同时阻塞地去等待多个异步IO事件,一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码: ```rust struct IoBlocker { /* ... */ } struct Event { // Event的唯一ID,该事件发生后,就会被监听起来 id: usize, // 一组需要等待或者已发生的信号 signals: Signals, } impl IoBlocker { /// 创建需要阻塞等待的异步IO事件的集合 fn new() -> Self { /* ... */ } /// 对指定的IO事件表示兴趣 fn add_io_event_interest( &self, /// 事件所绑定的socket io_object: &IoObject, event: Event, ) { /* ... */ } /// 进入阻塞,直到某个事件出现 fn block(&self) -> Event { /* ... */ } } let mut io_blocker = IoBlocker::new(); io_blocker.add_io_event_interest( &socket_1, Event { id: 1, signals: READABLE }, ); io_blocker.add_io_event_interest( &socket_2, Event { id: 2, signals: READABLE | WRITABLE }, ); let event = io_blocker.block(); // 当socket的数据可以读取时,打印 "Socket 1 is now READABLE" println!("Socket {:?} is now {:?}", event.id, event.signals); ``` 这样,我们只需要一个执行器线程,它会接收IO事件并将其分发到对应的 `Waker` 中,接着后者会唤醒相关的任务,最终通过执行器 `poll` 后,任务可以顺利的继续执行, 这种IO读取流程可以不停的循环,直到 `socket` 关闭。