add异步编程底层探秘

pull/271/head
sunface 3 years ago
parent 1432a1bba1
commit d6d00ebf7a

@ -1,7 +1,7 @@
# 底层探秘: Future执行与任务调度
在上个章节,我们知道了 `Future` 是异步函数返回的值,它需要执行器( `executor` )才能进行运行,而本章节,我们一起来看看它背后的奥秘
# 底层探秘: Future执行与任务调度
异步编程背后到底藏有什么秘密究竟是哪只幕后之手在操纵这一切如果你对这些感兴趣就继续看下去否则可以直接跳过因为本章节的内容对于一个API工程师并没有太多帮助
如果大家只对如何使用感兴趣,对 `Future` 背后是如何运行的不感兴趣,可以直接跳到下个章节,但是如果你希望能深入理解 Rust 的 `async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。
但是如果你希望能深入理解 `Rust``async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。
## Future 特征
`Future` 特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 `Future` 恰恰是异步函数的返回值和被执行的关键。
@ -140,7 +140,7 @@ where
}
```
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用`Future`特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的`Future`特征有何不同之处。
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 `Future` 特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 `Future` 特征有何不同之处。
```rust
trait Future {
type Output;
@ -153,11 +153,325 @@ trait Future {
}
```
首先这里多了一个`Pin`,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的`Future`,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于`async/await`来说,`Pin`是不可或缺的关键特性。
首先这里多了一个 `Pin` ,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的 `Future` ,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于 `async/await` 来说,`Pin` 是不可或缺的关键特性。
其次,从`wake: fn()`变成了`&mut Context<'_>`。意味着`wake`函数可以携带数据了为何要携带数据考虑一个真实世界的场景一个复杂应用例如web服务器可能有数千连接同时在线那么同时就有数千`Future`在被同时管理着,如果不能携带数据,当一个`Future`调用`wake`后,执行器该如何知道是哪个`Future`调用了`wake`,然后进一步去`poll`对应的`Future`?没有办法!那之前的例子为啥就可以使用没有携带数据的`wake` 因为足够简单,不存在歧义性。
其次,从 `wake: fn()` 变成了 `&mut Context<'_>` 。意味着 `wake` 函数可以携带数据了为何要携带数据考虑一个真实世界的场景一个复杂应用例如web服务器可能有数千连接同时在线那么同时就有数千 `Future` 在被同时管理着,如果不能携带数据,当一个 `Future` 调用 `wake` 后,执行器该如何知道是哪个 `Future` 调用了 `wake` ,然后进一步去 `poll` 对应的 `Future` ?没有办法!那之前的例子为啥就可以使用没有携带数据的 `wake` 因为足够简单,不存在歧义性。
总之,在正式场景要进行`wake`,就必须携带上数据。 而`Context`类型通过提供一个`Waker`类型的值,就可以用来唤醒特定的的任务。
总之,在正式场景要进行 `wake` ,就必须携带上数据。 而 `Context` 类型通过提供一个 `Waker` 类型的值,就可以用来唤醒特定的的任务。
## 使用 Waker 来唤醒任务
## 使用 Waker 来唤醒任务
对于 `Future` 来说,第一次被 `poll` 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 `poll` 进而继续往下执行,该通知就是通过 `Waker` 类型完成的。
`Waker` 提供了一个 `wake()` 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 `Future` 再次进行 `poll` 操作。
#### 构建一个定时器
下面一起来实现一个简单的定时器 `Future` 。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 `Future`
注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 `cargo new --lib timer_future` 来创建一个新工程,在 `lib` 包的根路径 `src/lib.rs` 中添加以下内容:
```rust
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
```
继续来实现 `Future` 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 `Future` ,由于是多线程环境,我们需要使用 `Arc<Mutex<T>>` 来作为一个共享状态,用于在新线程和 `Future` 定时器间共享。
```rust
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// 在Future和等待的线程间共享状态
struct SharedState {
/// 定时(睡眠)是否结束
completed: bool,
/// 当睡眠结束后,线程可以用`waker`通知`TimerFuture`来唤醒任务
waker: Option<Waker>,
}
```
下面给出 `Future` 的具体实现:
```rust
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 通过检查共享状态,来确定定时器是否已经完成
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
//
// 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
// 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
// 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
```
代码很简单,只要新线程设置了 `shared_state.completed = true` ,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 `Waker` ,这样新线程就可以使用它来唤醒当前的任务。
最后,再来创建一个 API 用于构建定时器和启动计时线程:
```rust
impl TimerFuture {
/// 创建一个新的`TimeFuture`,在指定的时间结束后,该`Future`可以完成
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 创建新线程
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
// 睡眠指定时间实现计时功能
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 通知执行器定时器已经完成,可以继续`poll`对应的`Future`了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
```
至此,一个简单的定时器 `Future` 就已创建成功,那么该如何使用它呢?相信部分爱动脑筋的读者已经猜到了:我们需要创建一个执行器,才能让程序动起来。
## 执行器Executor
Rust的 `Future` 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 `async` 函数中使用 `.await` 来调用另一个 `async` 函数,但是这个只能解决 `async` 内部的问题,那么这些最外层的 `async` 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 `executor`
执行器会管理一批 `Future` (最外层的 `ascyn` 函数),然后通过不停地 `poll` 推动它们直到完成。 最开始,执行器会先 `poll` 一次 `Future` ,后面就不会主动去 `poll` 了,而是等待 `Future` 通过调用 `wake` 函数来通知它可以继续,它才会继续去 `poll` 。这种**wake 通知然后 poll**的方式会不断重复,直到 `Future` 完成。
#### 构建执行器
下面我们将实现一个简单的执行器,它可以同时并发运行多个 `Future` 。例子中,需要用到 `futures` 包的 `ArcWake` 特征,它可以提供一个方便的途径去构建一个 `Waker` 。编辑 `Cargo.tom` ,添加下面依赖:
```rust
[dependencies]
futures = "0.3"
```
在之前的内容中,我们在 `src/lib.rs` 中创建了定时器 `Future` ,现在在 `src/main.js` 中来创建程序的主体内容,开始之前,先引入所需的包:
```rust
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
},
// 引入之前实现的定时器模块
timer_future::TimerFuture,
};
```
执行器需要从一个消息通道( `channel` )中拉取事件,然后运行它们。当一个任务准备好后(可以继续执行),它会将自己放入消息通道中,然后等待执行器 `poll`
```rust
/// 任务执行器,负责从通道中接收任务然后执行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// 一个Future它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
/// 进行中的Future在未来的某个时间点会被完成
///
/// 按理来说`Mutext`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
/// Rust并不聪明它无法知道`Future`只会在一个线程内被修改,并不会被夸线程修改。因此
/// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
///
/// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 可以将该任务自身放回到任务通道中等待执行器的poll
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// 任务通道允许的最大缓冲数(任务队列的最大长度)
// 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
```
下面再来添加一个方法用于生成 `Future` , 然后将它放入任务通道中:
```rust
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
```
在执行器 `poll` 一个 `Future` 之前,首先需要调用 `wake` 方法进行唤醒,然后再由 `Waker` 负责调度该任务并将其放入任务通道中。创建 `Waker` 的最简单的方式就是实现 `ArcWake` 特征,先来为我们的任务实现 `ArcWake` 特征,这样它们就能被转变成 `Waker` 然后被唤醒:
```rust
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("任务队列已满");
}
}
```
当任务实现了 `ArcWake` 特征后,它就变成了 `Waker` ,在调用 `wake()` 对其唤醒后会将任务复制一份所有权( `Arc` ),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 `poll` 执行:
```rust
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// 获取一个future若它还没有完成(仍然是Some不是None)则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个 `LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
// Future还没执行完因此将它放回任务中等待下次被poll
*future_slot = Some(future);
}
}
}
}
}
```
恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 `Future`
```rust
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 生成一个任务
spawner.spawn(async {
println!("howdy!");
// 创建定时器Future并等待它完成
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// drop掉任务这样执行器就知道任务已经完成不会再有新的任务进来
drop(spawner);
// 运行执行器直到任务队列为空
// 任务运行后,会先打印`howdy!`, 暂停2秒接着打印 `done!`
executor.run();
}
```
## 执行器和系统IO
前面我们一起看过一个使用 `Future``Socket` 中异步读取数据的例子:
```rust
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// socket有数据写入buffer中并返回
Poll::Ready(self.socket.read_buf())
} else {
// socket中还没数据
//
// 注册一个`wake`函数,当数据可用时,该函数会被调用,
// 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
```
该例子中,`Future` 将从 `Socket` 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 `Future` 。当数据准备好后,会调用 `wake()` 函数将该 `Future` 的任务放入任务通道中,等待执行器的 `poll`
关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:
- `set_readable_callback` 方法到底是怎么工作的?怎么才能知道 `socket` 中的数据已经可以被读取了?
关于第二点,其中一个简单粗暴的方法就是使用一个新线程不停的检查 `socket` 中是否有了数据,当有了后,就调用 `wake()` 函数。该方法确实可以满足需求,但是性能着实太低了,需要为每个阻塞的 `Future` 都创建一个单独的线程!
在现实世界中,该问题往往是通过操作系统提供的 `IO` 服务来完成,例如 `linux` 、`FreeBSD` 和 `Macos` 中的 **`epoll`** `Windows` 中的 **`IOCP`**, `Fuchisa`中的 **`ports`** 等(可以通过 Rust 的跨平台包 `mio` 来使用它们)。使用它们允许一个线程同时阻塞地去等待多个异步IO事件一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码
```rust
struct IoBlocker {
/* ... */
}
struct Event {
// Event的唯一ID该事件发生后就会被监听起来
id: usize,
// 一组需要等待或者已发生的信号
signals: Signals,
}
impl IoBlocker {
/// 创建异步IO事件的集合这些事件是阻塞等待的
fn new() -> Self { /* ... */ }
/// 对指定的IO事件表示兴趣
fn add_io_event_interest(
&self,
/// 事件所绑定的socket
io_object: &IoObject,
event: Event,
) { /* ... */ }
/// 进入阻塞,直到某个事件出现
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// 当socket的数据可以读取时打印 "Socket 1 is now READABLE"
println!("Socket {:?} is now {:?}", event.id, event.signals);
```
这样我们只需要一个执行器线程它会接收IO事件并将其分发到对应的 `Waker` 中,接着后者会唤醒相关的任务,最终通过执行器 `poll` 后,任务可以顺利的继续执行, 这种IO读取流程可以不停的循环直到 `socket` 关闭。
Loading…
Cancel
Save