From d6d00ebf7a4163f1743056f1fe4d8e055722d169 Mon Sep 17 00:00:00 2001 From: sunface Date: Thu, 20 Jan 2022 17:14:38 +0800 Subject: [PATCH] =?UTF-8?q?add=E5=BC=82=E6=AD=A5=E7=BC=96=E7=A8=8B?= =?UTF-8?q?=E5=BA=95=E5=B1=82=E6=8E=A2=E7=A7=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- book/contents/async/future-excuting.md | 330 ++++++++++++++++++++++++- 1 file changed, 322 insertions(+), 8 deletions(-) diff --git a/book/contents/async/future-excuting.md b/book/contents/async/future-excuting.md index 62787e4a..226c45e4 100644 --- a/book/contents/async/future-excuting.md +++ b/book/contents/async/future-excuting.md @@ -1,7 +1,7 @@ -# 底层探秘: Future执行与任务调度 -在上个章节,我们知道了 `Future` 是异步函数返回的值,它需要执行器( `executor` )才能进行运行,而本章节,我们一起来看看它背后的奥秘。 +# 底层探秘: Future执行器与任务调度 +异步编程背后到底藏有什么秘密?究竟是哪只幕后之手在操纵这一切?如果你对这些感兴趣,就继续看下去,否则可以直接跳过,因为本章节的内容对于一个API工程师并没有太多帮助。 -如果大家只对如何使用感兴趣,对 `Future` 背后是如何运行的不感兴趣,可以直接跳到下个章节,但是如果你希望能深入理解 Rust 的 `async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。 +但是如果你希望能深入理解 `Rust` 的 `async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。 ## Future 特征 `Future` 特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 `Future` 恰恰是异步函数的返回值和被执行的关键。 @@ -140,7 +140,7 @@ where } ``` -这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用`Future`特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的`Future`特征有何不同之处。 +这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 `Future` 特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 `Future` 特征有何不同之处。 ```rust trait Future { type Output; @@ -153,11 +153,325 @@ trait Future { } ``` -首先这里多了一个`Pin`,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的`Future`,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于`async/await`来说,`Pin`是不可或缺的关键特性。 +首先这里多了一个 `Pin` ,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的 `Future` ,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于 `async/await` 来说,`Pin` 是不可或缺的关键特性。 -其次,从`wake: fn()`变成了`&mut Context<'_>`。意味着`wake`函数可以携带数据了,为何要携带数据?考虑一个真实世界的场景,一个复杂应用例如web服务器可能有数千连接同时在线,那么同时就有数千`Future`在被同时管理着,如果不能携带数据,当一个`Future`调用`wake`后,执行器该如何知道是哪个`Future`调用了`wake`,然后进一步去`poll`对应的`Future`?没有办法!那之前的例子为啥就可以使用没有携带数据的`wake`? 因为足够简单,不存在歧义性。 +其次,从 `wake: fn()` 变成了 `&mut Context<'_>` 。意味着 `wake` 函数可以携带数据了,为何要携带数据?考虑一个真实世界的场景,一个复杂应用例如web服务器可能有数千连接同时在线,那么同时就有数千 `Future` 在被同时管理着,如果不能携带数据,当一个 `Future` 调用 `wake` 后,执行器该如何知道是哪个 `Future` 调用了 `wake` ,然后进一步去 `poll` 对应的 `Future` ?没有办法!那之前的例子为啥就可以使用没有携带数据的 `wake` ? 因为足够简单,不存在歧义性。 -总之,在正式场景要进行`wake`,就必须携带上数据。 而`Context`类型通过提供一个`Waker`类型的值,就可以用来唤醒特定的的任务。 +总之,在正式场景要进行 `wake` ,就必须携带上数据。 而 `Context` 类型通过提供一个 `Waker` 类型的值,就可以用来唤醒特定的的任务。 -## 使用 Waker 来唤醒任务 \ No newline at end of file +## 使用 Waker 来唤醒任务 +对于 `Future` 来说,第一次被 `poll` 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 `poll` 进而继续往下执行,该通知就是通过 `Waker` 类型完成的。 + +`Waker` 提供了一个 `wake()` 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 `Future` 再次进行 `poll` 操作。 + +#### 构建一个定时器 +下面一起来实现一个简单的定时器 `Future` 。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 `Future` 。 + +注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 `cargo new --lib timer_future` 来创建一个新工程,在 `lib` 包的根路径 `src/lib.rs` 中添加以下内容: +```rust +use std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + thread, + time::Duration, +}; +``` + +继续来实现 `Future` 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 `Future` ,由于是多线程环境,我们需要使用 `Arc>` 来作为一个共享状态,用于在新线程和 `Future` 定时器间共享。 + +```rust +pub struct TimerFuture { + shared_state: Arc>, +} + +/// 在Future和等待的线程间共享状态 +struct SharedState { + /// 定时(睡眠)是否结束 + completed: bool, + + /// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务 + waker: Option, +} +``` + +下面给出 `Future` 的具体实现: +```rust +impl Future for TimerFuture { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // 通过检查共享状态,来确定定时器是否已经完成 + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.completed { + Poll::Ready(()) + } else { + // 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作, + // + // 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。 + // 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次, + // 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务 + shared_state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} +``` + +代码很简单,只要新线程设置了 `shared_state.completed = true` ,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 `Waker` ,这样新线程就可以使用它来唤醒当前的任务。 + +最后,再来创建一个 API 用于构建定时器和启动计时线程: +```rust +impl TimerFuture { + /// 创建一个新的`TimeFuture`,在指定的时间结束后,该`Future`可以完成 + pub fn new(duration: Duration) -> Self { + let shared_state = Arc::new(Mutex::new(SharedState { + completed: false, + waker: None, + })); + + // 创建新线程 + let thread_shared_state = shared_state.clone(); + thread::spawn(move || { + // 睡眠指定时间实现计时功能 + thread::sleep(duration); + let mut shared_state = thread_shared_state.lock().unwrap(); + // 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了 + shared_state.completed = true; + if let Some(waker) = shared_state.waker.take() { + waker.wake() + } + }); + + TimerFuture { shared_state } + } +} +``` + +至此,一个简单的定时器 `Future` 就已创建成功,那么该如何使用它呢?相信部分爱动脑筋的读者已经猜到了:我们需要创建一个执行器,才能让程序动起来。 + +## 执行器Executor +Rust的 `Future` 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 `async` 函数中使用 `.await` 来调用另一个 `async` 函数,但是这个只能解决 `async` 内部的问题,那么这些最外层的 `async` 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 `executor` 。 + +执行器会管理一批 `Future` (最外层的 `ascyn` 函数),然后通过不停地 `poll` 推动它们直到完成。 最开始,执行器会先 `poll` 一次 `Future` ,后面就不会主动去 `poll` 了,而是等待 `Future` 通过调用 `wake` 函数来通知它可以继续,它才会继续去 `poll` 。这种**wake 通知然后 poll**的方式会不断重复,直到 `Future` 完成。 + +#### 构建执行器 +下面我们将实现一个简单的执行器,它可以同时并发运行多个 `Future` 。例子中,需要用到 `futures` 包的 `ArcWake` 特征,它可以提供一个方便的途径去构建一个 `Waker` 。编辑 `Cargo.tom` ,添加下面依赖: +```rust +[dependencies] +futures = "0.3" +``` + +在之前的内容中,我们在 `src/lib.rs` 中创建了定时器 `Future` ,现在在 `src/main.js` 中来创建程序的主体内容,开始之前,先引入所需的包: +```rust +use { + futures::{ + future::{BoxFuture, FutureExt}, + task::{waker_ref, ArcWake}, + }, + std::{ + future::Future, + sync::mpsc::{sync_channel, Receiver, SyncSender}, + sync::{Arc, Mutex}, + task::{Context, Poll}, + time::Duration, + }, + // 引入之前实现的定时器模块 + timer_future::TimerFuture, +}; +``` + +执行器需要从一个消息通道( `channel` )中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器 `poll` 。 + +```rust +/// 任务执行器,负责从通道中接收任务然后执行 +struct Executor { + ready_queue: Receiver>, +} + +/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中 +#[derive(Clone)] +struct Spawner { + task_sender: SyncSender>, +} + +/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll` +struct Task { + /// 进行中的Future,在未来的某个时间点会被完成 + /// + /// 按理来说`Mutext`在这里是多余的,因为我们只有一个线程来执行任务。但是由于 + /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被夸线程修改。因此 + /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。 + /// + /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell` + future: Mutex>>, + + /// 可以将该任务自身放回到任务通道中,等待执行器的poll + task_sender: SyncSender>, +} + +fn new_executor_and_spawner() -> (Executor, Spawner) { + // 任务通道允许的最大缓冲数(任务队列的最大长度) + // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用 + const MAX_QUEUED_TASKS: usize = 10_000; + let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); + (Executor { ready_queue }, Spawner { task_sender }) +} +``` + +下面再来添加一个方法用于生成 `Future` , 然后将它放入任务通道中: +```rust +impl Spawner { + fn spawn(&self, future: impl Future + 'static + Send) { + let future = future.boxed(); + let task = Arc::new(Task { + future: Mutex::new(Some(future)), + task_sender: self.task_sender.clone(), + }); + self.task_sender.send(task).expect("任务队列已满"); + } +} +``` + +在执行器 `poll` 一个 `Future` 之前,首先需要调用 `wake` 方法进行唤醒,然后再由 `Waker` 负责调度该任务并将其放入任务通道中。创建 `Waker` 的最简单的方式就是实现 `ArcWake` 特征,先来为我们的任务实现 `ArcWake` 特征,这样它们就能被转变成 `Waker` 然后被唤醒: +```rust +impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc) { + // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll` + let cloned = arc_self.clone(); + arc_self + .task_sender + .send(cloned) + .expect("任务队列已满"); + } +} +``` + +当任务实现了 `ArcWake` 特征后,它就变成了 `Waker` ,在调用 `wake()` 对其唤醒后会将任务复制一份所有权( `Arc` ),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 `poll` 执行: +```rust +impl Executor { + fn run(&self) { + while let Ok(task) = self.ready_queue.recv() { + // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它 + let mut future_slot = task.future.lock().unwrap(); + if let Some(mut future) = future_slot.take() { + // 基于任务自身创建一个 `LocalWaker` + let waker = waker_ref(&task); + let context = &mut Context::from_waker(&*waker); + // `BoxFuture`是`Pin + Send + 'static>>`的类型别名 + // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>` + if future.as_mut().poll(context).is_pending() { + // Future还没执行完,因此将它放回任务中,等待下次被poll + *future_slot = Some(future); + } + } + } + } +} +``` + +恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 `Future` : +```rust +fn main() { + let (executor, spawner) = new_executor_and_spawner(); + + // 生成一个任务 + spawner.spawn(async { + println!("howdy!"); + // 创建定时器Future,并等待它完成 + TimerFuture::new(Duration::new(2, 0)).await; + println!("done!"); + }); + + // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来 + drop(spawner); + + // 运行执行器直到任务队列为空 + // 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印 `done!` + executor.run(); +} +``` + +## 执行器和系统IO +前面我们一起看过一个使用 `Future` 从 `Socket` 中异步读取数据的例子: +```rust +pub struct SocketRead<'a> { + socket: &'a Socket, +} + +impl SimpleFuture for SocketRead<'_> { + type Output = Vec; + + fn poll(&mut self, wake: fn()) -> Poll { + if self.socket.has_data_to_read() { + // socket有数据,写入buffer中并返回 + Poll::Ready(self.socket.read_buf()) + } else { + // socket中还没数据 + // + // 注册一个`wake`函数,当数据可用时,该函数会被调用, + // 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据 + self.socket.set_readable_callback(wake); + Poll::Pending + } + } +} +``` + +该例子中,`Future` 将从 `Socket` 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 `Future` 。当数据准备好后,会调用 `wake()` 函数将该 `Future` 的任务放入任务通道中,等待执行器的 `poll` 。 + +关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决: + +- `set_readable_callback` 方法到底是怎么工作的?怎么才能知道 `socket` 中的数据已经可以被读取了? + +关于第二点,其中一个简单粗暴的方法就是使用一个新线程不停的检查 `socket` 中是否有了数据,当有了后,就调用 `wake()` 函数。该方法确实可以满足需求,但是性能着实太低了,需要为每个阻塞的 `Future` 都创建一个单独的线程! + +在现实世界中,该问题往往是通过操作系统提供的 `IO` 服务来完成,例如 `linux` 、`FreeBSD` 和 `Macos` 中的 **`epoll`** ,`Windows` 中的 **`IOCP`**, `Fuchisa`中的 **`ports`** 等(可以通过 Rust 的跨平台包 `mio` 来使用它们)。使用它们,允许一个线程同时阻塞地去等待多个异步IO事件,一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码: +```rust +struct IoBlocker { + /* ... */ +} + +struct Event { + // Event的唯一ID,该事件发生后,就会被监听起来 + id: usize, + + // 一组需要等待或者已发生的信号 + signals: Signals, +} + +impl IoBlocker { + /// 创建异步IO事件的集合,这些事件是阻塞等待的 + fn new() -> Self { /* ... */ } + + /// 对指定的IO事件表示兴趣 + fn add_io_event_interest( + &self, + + /// 事件所绑定的socket + io_object: &IoObject, + + event: Event, + ) { /* ... */ } + + /// 进入阻塞,直到某个事件出现 + fn block(&self) -> Event { /* ... */ } +} + +let mut io_blocker = IoBlocker::new(); +io_blocker.add_io_event_interest( + &socket_1, + Event { id: 1, signals: READABLE }, +); +io_blocker.add_io_event_interest( + &socket_2, + Event { id: 2, signals: READABLE | WRITABLE }, +); +let event = io_blocker.block(); + +// 当socket的数据可以读取时,打印 "Socket 1 is now READABLE" +println!("Socket {:?} is now {:?}", event.id, event.signals); +``` + +这样,我们只需要一个执行器线程,它会接收IO事件并将其分发到对应的 `Waker` 中,接着后者会唤醒相关的任务,最终通过执行器 `poll` 后,任务可以顺利的继续执行, 这种IO读取流程可以不停的循环,直到 `socket` 关闭。 \ No newline at end of file