|
|
@ -1,7 +1,7 @@
|
|
|
|
# 深入 Tokio 背后的异步原理
|
|
|
|
# 深入 Tokio 背后的异步原理
|
|
|
|
在经过多个章节的深入学习后,Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!?
|
|
|
|
在经过多个章节的深入学习后,Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!?
|
|
|
|
|
|
|
|
|
|
|
|
但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 async 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。
|
|
|
|
但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 `async` 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。
|
|
|
|
|
|
|
|
|
|
|
|
## Future
|
|
|
|
## Future
|
|
|
|
先来回顾一下 `async fn` 异步函数 :
|
|
|
|
先来回顾一下 `async fn` 异步函数 :
|
|
|
@ -118,7 +118,7 @@ use std::time::{Duration, Instant};
|
|
|
|
enum MainFuture {
|
|
|
|
enum MainFuture {
|
|
|
|
// 初始化,但永远不会被 poll
|
|
|
|
// 初始化,但永远不会被 poll
|
|
|
|
State0,
|
|
|
|
State0,
|
|
|
|
// 等待 `Delay` 运行,例如 `future.awai` 代码行
|
|
|
|
// 等待 `Delay` 运行,例如 `future.await` 代码行
|
|
|
|
State1(Delay),
|
|
|
|
State1(Delay),
|
|
|
|
// Future 执行完成
|
|
|
|
// Future 执行完成
|
|
|
|
Terminated,
|
|
|
|
Terminated,
|
|
|
@ -236,12 +236,12 @@ impl MiniTokio {
|
|
|
|
|
|
|
|
|
|
|
|
鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。
|
|
|
|
鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。
|
|
|
|
|
|
|
|
|
|
|
|
回到在上面的代码中,mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要通知 -> 运行机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`。
|
|
|
|
回到在上面的代码中,mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要 `通知 -> 运行` 机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`。
|
|
|
|
|
|
|
|
|
|
|
|
## Waker
|
|
|
|
## Waker
|
|
|
|
一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。
|
|
|
|
一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。
|
|
|
|
|
|
|
|
|
|
|
|
再来看下 `Future::pol` 的定义:
|
|
|
|
再来看下 `Future::poll` 的定义:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context)
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context)
|
|
|
|
-> Poll<Self::Output>;
|
|
|
|
-> Poll<Self::Output>;
|
|
|
@ -327,7 +327,7 @@ impl Future for Delay {
|
|
|
|
|
|
|
|
|
|
|
|
在返回 `Poll::Pending` 之前,先调用了 `cx.waker().wake_by_ref()` ,由于此时我们还没有模拟计时资源,因此这里直接调用了 `wake` 进行通知,这样做会导致当前的 `Future` 被立即再次调度执行。
|
|
|
|
在返回 `Poll::Pending` 之前,先调用了 `cx.waker().wake_by_ref()` ,由于此时我们还没有模拟计时资源,因此这里直接调用了 `wake` 进行通知,这样做会导致当前的 `Future` 被立即再次调度执行。
|
|
|
|
|
|
|
|
|
|
|
|
由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU,因为这种执行 -> 立即通知再调度 -> 执行的方式会造成一个非常繁忙的循环。
|
|
|
|
由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU,因为这种 `执行 -> 立即通知再调度 -> 执行` 的方式会造成一个非常繁忙的循环。
|
|
|
|
|
|
|
|
|
|
|
|
#### 处理 wake 通知
|
|
|
|
#### 处理 wake 通知
|
|
|
|
下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。
|
|
|
|
下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。
|
|
|
@ -423,7 +423,7 @@ impl MiniTokio {
|
|
|
|
|
|
|
|
|
|
|
|
impl Task {
|
|
|
|
impl Task {
|
|
|
|
fn poll(self: Arc<Self>) {
|
|
|
|
fn poll(self: Arc<Self>) {
|
|
|
|
// 基于 Task 实例创建一个 waker, 它使用了之前的 `AarcWake`
|
|
|
|
// 基于 Task 实例创建一个 waker, 它使用了之前的 `ArcWake`
|
|
|
|
let waker = task::waker(self.clone());
|
|
|
|
let waker = task::waker(self.clone());
|
|
|
|
let mut cx = Context::from_waker(&waker);
|
|
|
|
let mut cx = Context::from_waker(&waker);
|
|
|
|
|
|
|
|
|
|
|
@ -436,7 +436,7 @@ impl Task {
|
|
|
|
|
|
|
|
|
|
|
|
// 使用给定的 future 来生成新的任务
|
|
|
|
// 使用给定的 future 来生成新的任务
|
|
|
|
//
|
|
|
|
//
|
|
|
|
// 新的任务会被推到 `sender` 中,接着该消息通道的接收段就可以获取该任务,然后执行
|
|
|
|
// 新的任务会被推到 `sender` 中,接着该消息通道的接收端就可以获取该任务,然后执行
|
|
|
|
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
|
|
|
|
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
|
|
|
|
where
|
|
|
|
where
|
|
|
|
F: Future<Output = ()> + Send + 'static,
|
|
|
|
F: Future<Output = ()> + Send + 'static,
|
|
|
@ -492,14 +492,14 @@ async fn main() {
|
|
|
|
|
|
|
|
|
|
|
|
首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。
|
|
|
|
首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。
|
|
|
|
|
|
|
|
|
|
|
|
在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方。
|
|
|
|
在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方法。
|
|
|
|
|
|
|
|
|
|
|
|
当实现一个 `Future` 时,很关键的一点就是要假设每次 `poll` 调用都会应用到一个不同的 `Waker` 实例上。因此 `poll` 函数必须要使用一个新的 `waker` 去更新替代之前的 `waker`。
|
|
|
|
当实现一个 `Future` 时,很关键的一点就是要假设每次 `poll` 调用都会应用到一个不同的 `Waker` 实例上。因此 `poll` 函数必须要使用一个新的 `waker` 去更新替代之前的 `waker`。
|
|
|
|
|
|
|
|
|
|
|
|
我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker` !
|
|
|
|
我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker` !
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
这一段大家可能会看得云里雾里的,没办法,原文就饶老绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑:
|
|
|
|
这一段大家可能会看得云里雾里的,没办法,原文就饶来绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
use std::future::Future;
|
|
|
|
use std::future::Future;
|
|
|
|
use std::pin::Pin;
|
|
|
|
use std::pin::Pin;
|
|
|
@ -525,7 +525,8 @@ impl Future for Delay {
|
|
|
|
let mut waker = waker.lock().unwrap();
|
|
|
|
let mut waker = waker.lock().unwrap();
|
|
|
|
|
|
|
|
|
|
|
|
// 检查之前存储的 `waker` 是否跟当前任务的 `waker` 相匹配.
|
|
|
|
// 检查之前存储的 `waker` 是否跟当前任务的 `waker` 相匹配.
|
|
|
|
// 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后 // 存储中的 waker 被该任务进行了更新。
|
|
|
|
// 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后
|
|
|
|
|
|
|
|
// 存储的 waker 被该任务进行了更新。
|
|
|
|
// 这种情况一旦发生,`Context` 包含的 `waker` 将不同于存储的 `waker`。
|
|
|
|
// 这种情况一旦发生,`Context` 包含的 `waker` 将不同于存储的 `waker`。
|
|
|
|
// 因此我们必须对存储的 `waker` 进行更新
|
|
|
|
// 因此我们必须对存储的 `waker` 进行更新
|
|
|
|
if !waker.will_wake(cx.waker()) {
|
|
|
|
if !waker.will_wake(cx.waker()) {
|
|
|
@ -550,7 +551,7 @@ impl Future for Delay {
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已尽完成
|
|
|
|
// 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已经完成
|
|
|
|
// 若计时已完成,则当前 Future 就可以完成并返回 `Poll::Ready`
|
|
|
|
// 若计时已完成,则当前 Future 就可以完成并返回 `Poll::Ready`
|
|
|
|
if Instant::now() >= self.when {
|
|
|
|
if Instant::now() >= self.when {
|
|
|
|
Poll::Ready(())
|
|
|
|
Poll::Ready(())
|
|
|
@ -603,7 +604,7 @@ async fn delay(dur: Duration) {
|
|
|
|
## 总结
|
|
|
|
## 总结
|
|
|
|
在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘:
|
|
|
|
在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘:
|
|
|
|
|
|
|
|
|
|
|
|
- 在 Rust 中,`ascyn` 是惰性的,直到执行器 `poll` 它们时,才会开始执行
|
|
|
|
- 在 Rust 中,`async` 是惰性的,直到执行器 `poll` 它们时,才会开始执行
|
|
|
|
- `Waker` 是 `Future` 被执行的关键,它可以链接起 `Future` 任务和执行器
|
|
|
|
- `Waker` 是 `Future` 被执行的关键,它可以链接起 `Future` 任务和执行器
|
|
|
|
- 当资源没有准备时,会返回一个 `Poll::Pending`
|
|
|
|
- 当资源没有准备时,会返回一个 `Poll::Pending`
|
|
|
|
- 当资源准备好时,会通过 `waker.wake` 发出通知
|
|
|
|
- 当资源准备好时,会通过 `waker.wake` 发出通知
|
|
|
|