diff --git a/src/tokio/async.md b/src/tokio/async.md index d1fb2c12..88dd5ee8 100644 --- a/src/tokio/async.md +++ b/src/tokio/async.md @@ -1,10 +1,13 @@ # 深入 Tokio 背后的异步原理 -在经过多个章节的深入学习后,Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!? + +在经过多个章节的深入学习后,Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!? 但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 `async` 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。 ## Future + 先来回顾一下 `async fn` 异步函数 : + ```rust use tokio::net::TcpStream; @@ -18,6 +21,7 @@ async fn my_async_fn() { ``` 接着对它进行调用获取一个返回值,再在返回值上调用 `.await`: + ```rust #[tokio::main] async fn main() { @@ -37,6 +41,7 @@ async fn main() { `Future` 是一个实现了 [`std::future::Future`](https://doc.rust-lang.org/std/future/trait.Future.html) 特征的值,该值包含了一系列异步计算过程,而这个过程直到 `.await` 调用时才会被执行。 `std::future::Future` 的定义如下所示: + ```rust use std::pin::Pin; use std::task::{Context, Poll}; @@ -52,12 +57,13 @@ pub trait Future { 代码中有几个关键点: - [关联类型](https://course.rs/basic/trait/advance-trait.html#关联类型) `Output` 是 `Future` 执行完成后返回的值的类型 -- `Pin` 类型是在异步函数中进行借用的关键,在[这里]((https://course.rs/async/pin-unpin.html))有非常详细的介绍 +- `Pin` 类型是在异步函数中进行借用的关键,在[这里](<(https://course.rs/async/pin-unpin.html)>)有非常详细的介绍 -和其它语言不同,Rust中的 `Future` 不代表一个发生在后台的计算,而是 `Future` 就代表了计算本身,因此 +和其它语言不同,Rust 中的 `Future` 不代表一个发生在后台的计算,而是 `Future` 就代表了计算本身,因此 `Future` 的所有者有责任去推进该计算过程的执行,例如通过 `Future::poll` 函数。听上去好像还挺复杂?但是大家不必担心,因为这些都在 Tokio 中帮你自动完成了 :) #### 实现 Future + 下面来一起实现个五脏俱全的 `Future`,它将:1. 等待某个特定时间点的到来 2. 在标准输出打印文本 3. 生成一个字符串 ```rust @@ -106,9 +112,11 @@ async fn main() { 以上代码很清晰的解释了如何自定义一个 `Future`,并指定它如何通过 `poll` 一步一步执行,直到最终完成返回 "done" 字符串。 #### async fn 作为 Future + 大家有没有注意到,上面代码我们在 `main` 函数中初始化一个 `Future` 并使用 `.await` 对其进行调用执行,如果你是在 `fn main` 中这么做,是会报错的。 原因是 `.await` 只能用于 `async fn` 函数中,因此我们将 `main` 函数声明成 `async fn main` 同时使用 `#[tokio::main]` 进行了标注,此时 `async fn main` 生成的代码类似下面: + ```rust use std::future::Future; use std::pin::Pin; @@ -167,15 +175,18 @@ impl Future for MainFuture { 同时可以看到:当一个 `Future` 由其它 `Future` 组成时,调用外层 `Future` 的 `poll` 函数会同时调用一次内部 `Future` 的 `poll` 函数。 -## 执行器( Excecutor ) +## 执行器( Excecutor ) + `async fn` 返回 `Future` ,而后者需要通过被不断的 `poll` 才能往前推进状态,同时该 `Future` 还能包含其它 `Future` ,那么问题来了谁来负责调用最外层 `Future` 的 `poll` 函数? 回一下之前的内容,为了运行一个异步函数,我们必须使用 `tokio::spawn` 或 通过 `#[tokio::main]` 标注的 `async fn main` 函数。它们有一个非常重要的作用:将最外层 `Future` 提交给 Tokio 的执行器。该执行器负责调用 `poll` 函数,然后推动 `Future` 的执行,最终直至完成。 #### mini tokio -为了更好理解相关的内容,我们一起来实现一个迷你版本的 Tokio,完整的代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/mini-tokio/src/main.rs)。 + +为了更好理解相关的内容,我们一起来实现一个迷你版本的 Tokio,完整的代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/mini-tokio/src/main.rs)。 先来看一段基础代码: + ```rust use std::collections::VecDeque; use std::future::Future; @@ -210,7 +221,7 @@ impl MiniTokio { tasks: VecDeque::new(), } } - + /// 生成一个 Future并放入 mini-tokio 实例的任务队列中 fn spawn(&mut self, future: F) where @@ -218,11 +229,11 @@ impl MiniTokio { { self.tasks.push_back(Box::pin(future)); } - + fn run(&mut self) { let waker = task::noop_waker(); let mut cx = Context::from_waker(&waker); - + while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { self.tasks.push_back(task); @@ -232,16 +243,18 @@ impl MiniTokio { } ``` -以上代码运行了一个 `async` 语句块 `mini_tokio.spawn(async {...})`, 还创建了一个 `Delay` 实例用于等待所需的时间。看上去相当不错,但这个实现有一个 **重大缺陷**:我们的执行器永远也不会休眠。执行器会持续的循环遍历所有的 `Future` ,然后不停的 `poll` 它们,但是事实上,大多数 `poll` 都是没有用的,因为此时 `Future` 并没有准备好,因此会继续返回 `Poll::Pending` ,最终这个循环遍历会让你的CPU疲于奔命,真打工人! +以上代码运行了一个 `async` 语句块 `mini_tokio.spawn(async {...})`, 还创建了一个 `Delay` 实例用于等待所需的时间。看上去相当不错,但这个实现有一个 **重大缺陷**:我们的执行器永远也不会休眠。执行器会持续的循环遍历所有的 `Future` ,然后不停的 `poll` 它们,但是事实上,大多数 `poll` 都是没有用的,因为此时 `Future` 并没有准备好,因此会继续返回 `Poll::Pending` ,最终这个循环遍历会让你的 CPU 疲于奔命,真打工人! 鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。 回到在上面的代码中,mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要 `通知 -> 运行` 机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`。 ## Waker + 一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。 再来看下 `Future::poll` 的定义: + ```rust fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll; @@ -252,7 +265,9 @@ fn poll(self: Pin<&mut Self>, cx: &mut Context) 准确来说,当 `Future` 阻塞等待的资源已经准备好时(例如 socket 中有了可读取的数据),该资源可以调用 `wake()` 方法,来通知执行器可以继续调用该 `Future` 的 `poll` 函数来推进任务的执行。 #### 发送 wake 通知 + 现在,为 `Delay` 添加下 `Waker` 支持: + ```rust use std::future::Future; use std::pin::Pin; @@ -300,10 +315,11 @@ impl Future for Delay { 当然,现在的实现还较为粗糙,等会我们会来进一步优化,在此之前,先来看看如何监听这个 `wake` 通知。 > 当 Future 会返回 `Poll::Pending` 时,一定要确保 `wake` 能被正常调用,否则会导致任务永远被挂起,再也不会被执行器 `poll`。 -> +> > **忘记在返回 `Poll::Pending` 时调用 `wake` 是很多难以发现 bug 的潜在源头!** 再回忆下最早实现的 `Delay` 代码: + ```rust impl Future for Delay { type Output = &'static str; @@ -330,6 +346,7 @@ impl Future for Delay { 由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU,因为这种 `执行 -> 立即通知再调度 -> 执行` 的方式会造成一个非常繁忙的循环。 #### 处理 wake 通知 + 下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。 为了实现这一点,我们将使用消息通道来排队存储这些被唤醒并等待调度的任务。有一点需要注意,从消息通道接收消息的线程(执行器所在的线程)和发送消息的线程(唤醒任务时所在的线程)可能是不同的,因此消息( `Waker` )必须要实现 `Send`和 `Sync`,才能跨线程使用。 @@ -337,11 +354,13 @@ impl Future for Delay { > 关于 `Send` 和 `Sync` 的具体讲解见[这里](https://course.rs/advance/concurrency-with-threads/send-sync.html) 基于以上理由,我们选择使用来自于 `crossbeam` 的消息通道,因为标准库中的消息通道不是 `Sync` 的。在 `Cargo.toml` 中添加以下依赖: + ```toml crossbeam = "0.8" ``` 再来更新下 `MiniTokio` 结构体: + ```rust use crossbeam::channel; use std::sync::Arc; @@ -359,6 +378,7 @@ struct Task { `Waker` 实现了 `Sync` 特征,同时还可以被克隆,当 `wake` 被调用时,任务就会被调度执行。 为了实现上述的目的,我们引入了消息通道,当 `waker.wake()` 函数被调用时,任务会被发送到该消息通道中: + ```rust use std::sync::{Arc, Mutex}; @@ -379,11 +399,13 @@ impl Task { 接下来,我们需要让 `std::task::Waker` 能准确的找到所需的调度函数 关联起来,对此标准库中提供了一个底层的 API [`std::task::RawWakerVTable`](https://doc.rust-lang.org/std/task/struct.RawWakerVTable.html) 可以用于手动的访问 `vtable`,这种实现提供了最大的灵活性,但是需要大量 `unsafe` 的代码。 因此我们选择更加高级的实现:由 `futures` 包提供的 [`ArcWake`](https://docs.rs/futures/0.3.19/futures/task/trait.ArcWake.html) 特征,只要简单实现该特征,就可以将我们的 `Task` 转变成一个 `waker`。在 `Cargo.toml` 中添加以下包: + ```toml futures = "0.3" ``` 然后为我们的任务 `Task` 实现 `ArcWake`: + ```rust use futures::task::{self, ArcWake}; use std::sync::Arc; @@ -395,6 +417,7 @@ impl ArcWake for Task { ``` 当之前的计时器线程调用 `waker.wake()` 时,所在的任务会被推入到消息通道中。因此接下来,我们需要实现接受端的功能,然后 `MiniTokio::run()` 函数中执行该任务: + ```rust impl MiniTokio { // 从消息通道中接收任务,然后通过 poll 来执行 @@ -460,12 +483,14 @@ impl Task { > 注意,Task::poll 和执行器调用的 poll 是完全不同的,大家别搞混了 - ## 一些遗留问题 + 至此,我们的程序已经差不多完成,还剩几个遗留问题需要解决下。 #### 在异步函数中生成异步任务 + 之前实现 `Delay Future` 时,我们提到有几个问题需要解决。Rust 的异步模型允许一个 Future 在执行过程中可以跨任务迁移: + ```rust use futures::future::poll_fn; use std::future::Future; @@ -489,7 +514,6 @@ async fn main() { } ``` - 首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。 在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方法。 @@ -498,8 +522,8 @@ async fn main() { 我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker` ! - 这一段大家可能会看得云里雾里的,没办法,原文就饶来绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑: + ```rust use std::future::Future; use std::pin::Pin; @@ -560,7 +584,7 @@ impl Future for Delay { // // `Future` 特征要求当 `Pending` 被返回时,那我们要确保当资源准备好时,必须调用 `waker` 以通/// 知执行器。 在我们的例子中,会通过生成的计时线程来保证 // - // 如果忘记调用 waker, 那等待我们的将是深渊:该任务将被永远的挂起,无法再执行 + // 如果忘记调用 waker, 那等待我们的将是深渊:该任务将被永远的挂起,无法再执行 Poll::Pending } } @@ -570,9 +594,11 @@ impl Future for Delay { 这着实有些复杂(原文。。),但是简单来看就是:在每次 `poll` 调用时,都会检查 `Context` 中提供的 `waker` 和我们之前记录的 `waker` 是否匹配。若匹配,就什么都不用做,若不匹配,那之前存储的就必须进行更新。 #### Notify -我们之前证明了如何用手动编写的 `waker` 来实现 `Delay Future`。 `Waker` 是Rust异步编程的基石,因此绝大多数时候,我们并不需要直接去使用它。例如,在 `Delay` 的例子中, 可以使用 [`tokio::sync::Notify`](https://docs.rs/tokio/1.16.0/tokio/sync/struct.Notify.html) 去实现。 + +我们之前证明了如何用手动编写的 `waker` 来实现 `Delay Future`。 `Waker` 是 Rust 异步编程的基石,因此绝大多数时候,我们并不需要直接去使用它。例如,在 `Delay` 的例子中, 可以使用 [`tokio::sync::Notify`](https://docs.rs/tokio/1.16.0/tokio/sync/struct.Notify.html) 去实现。 该 `Notify` 提供了一个基础的任务通知机制,它会处理这些 `waker` 的细节,包括确保两次 `waker` 的匹配: + ```rust use tokio::sync::Notify; use std::sync::Arc; @@ -602,6 +628,7 @@ async fn delay(dur: Duration) { 当使用 `Notify` 后,我们就可以轻松的实现如上的 `delay` 函数。 ## 总结 + 在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘: - 在 Rust 中,`async` 是惰性的,直到执行器 `poll` 它们时,才会开始执行 diff --git a/src/tokio/bridging-with-sync.md b/src/tokio/bridging-with-sync.md index b86e34b9..1cf34c89 100644 --- a/src/tokio/bridging-with-sync.md +++ b/src/tokio/bridging-with-sync.md @@ -1,5 +1,7 @@ # 异步跟同步共存 -一些异步程序例如 tokio指南 章节中的绝大多数例子,它们整个程序都是异步的,包括程序入口 `main` 函数: + +一些异步程序例如 tokio 指南 章节中的绝大多数例子,它们整个程序都是异步的,包括程序入口 `main` 函数: + ```rust #[tokio::main] async fn main() { @@ -14,9 +16,11 @@ async fn main() { 因此本章节的目标很纯粹:如何在同步代码中使用一小部分异步代码。 ## `#[tokio::main]` 的展开 -在 Rust 中, `main` 函数不能是异步的,有同学肯定不愿意了,我们在之前章节..不对,就在开头,你还用到了 `async fn main` 的声明方式,怎么就不能异步了呢? + +在 Rust 中, `main` 函数不能是异步的,有同学肯定不愿意了,我们在之前章节..不对,就在开头,你还用到了 `async fn main` 的声明方式,怎么就不能异步了呢? 其实,`#[tokio::main]` 该宏仅仅是提供语法糖,目的是让大家可以更简单、更一致的去写异步代码,它会将你写下的`async fn main` 函数替换为: + ```rust fn main() { tokio::runtime::Builder::new_multi_thread() @@ -31,11 +35,12 @@ fn main() { 注意到上面的 `block_on` 方法了嘛?在我们自己的同步代码中,可以使用它开启一个 `async/await` 世界。 -## mini-redis的同步接口 -在下面,我们将一起构建一个同步的 `mini-redis` ,为了实现这一点,需要将 `Runtime` 对象存储起来,然后利用上面提到的 `block_on` 方法。 +## mini-redis 的同步接口 +在下面,我们将一起构建一个同步的 `mini-redis` ,为了实现这一点,需要将 `Runtime` 对象存储起来,然后利用上面提到的 `block_on` 方法。 首先,创建一个文件 `src/blocking_client.rs`,然后使用下面代码将异步的 `Client` 结构体包裹起来: + ```rust use tokio::net::ToSocketAddrs; use tokio::runtime::Runtime; @@ -73,7 +78,6 @@ pub fn connect(addr: T) -> crate::Result { 在构建 `Runtime` 的过程中还有一个 [`enable_all`](https://docs.rs/tokio/1.16.1/tokio/runtime/struct.Builder.html#method.enable_all) 方法调用,它可以开启 `Tokio` 运行时提供的 IO 和定时器服务。 - > 由于 `current_thread` 运行时并不生成新的线程,只是运行在已有的主线程上,因此只有当 `block_on` 被调用后,该运行时才能执行相应的操作。一旦 `block_on` 返回,那运行时上所有生成的任务将再次冻结,直到 `block_on` 的再次调用。 > > 如果这种模式不符合使用场景的需求,那大家还是需要用 `multi_thread` 运行时来代替。事实上,在 tokio 之前的章节中,我们默认使用的就是 `multi_thread` 模式。 @@ -109,6 +113,7 @@ impl BlockingClient { 这代码看上去挺长,实际上很简单,通过 `block_on` 将异步形式的 `Client` 的方法变成同步调用的形式。例如 `BlockingClient` 的 `get` 方法实际上是对内部的异步 `get` 方法的同步调用。 与上面的平平无奇相比,下面的代码将更有趣,因为它将 `Client` 转变成一个 `Subscriber` 对象: + ```rust /// 下面的客户端可以进入 pub/sub (发布/订阅) 模式 /// @@ -155,10 +160,13 @@ impl BlockingSubscriber { 由上可知,`subscribe` 方法会使用运行时将一个异步的 `Client` 转变成一个异步的 `Subscriber`,此外,`Subscriber` 结构体有一个非异步的方法 `get_subscribed`,对于这种方法,只需直接调用即可,而无需使用运行时。 ## 其它方法 + 上面介绍的是最简单的方法,但是,如果只有这一种, tokio 也不会如此大名鼎鼎。 #### runtime.spawn + 可以通过 `Runtime` 的 `spawn` 方法来创建一个基于该运行时的后台任务: + ```rust use tokio::runtime::Builder; use tokio::time::{sleep, Duration}; @@ -197,6 +205,7 @@ async fn my_bg_task(i: u64) { ``` 运行该程序,输出如下: + ```console Task 0 sleeping for 1000 ms. Task 1 sleeping for 950 ms. @@ -221,7 +230,7 @@ Task 1 stopping. Task 0 stopping. ``` -在此例中,我们生成了10个后台任务在运行时中运行,然后等待它们的完成。作为一个例子,想象一下在图形渲染应用( GUI )中,有时候需要通过网络访问远程服务来获取一些数据,那上面的这种模式就非常适合,因为这些网络访问比较耗时,而且不会影响图形的主体渲染,因此可以在主线程中渲染图形,然后使用其它线程来运行 Tokio 的运行时,并通过该运行时使用异步的方式完成网络访问,最后将这些网络访问的结果发送到 GUI 进行数据渲染,例如一个进度条。 +在此例中,我们生成了 10 个后台任务在运行时中运行,然后等待它们的完成。作为一个例子,想象一下在图形渲染应用( GUI )中,有时候需要通过网络访问远程服务来获取一些数据,那上面的这种模式就非常适合,因为这些网络访问比较耗时,而且不会影响图形的主体渲染,因此可以在主线程中渲染图形,然后使用其它线程来运行 Tokio 的运行时,并通过该运行时使用异步的方式完成网络访问,最后将这些网络访问的结果发送到 GUI 进行数据渲染,例如一个进度条。 还有一点很重要,在本例子中只能使用 `multi_thread` 运行时。如果我们使用了 `current_thread`,你会发现主线程的耗时任务会在后台任务开始之前就完成了。因为在 `current_thread` 模式下,生成的任务只会在 `block_on` 期间才执行。 @@ -231,7 +240,9 @@ Task 0 stopping. - 通过共享变量的方式,例如 `Mutex`,这种方式非常适合实现 GUI 的进度条: GUI 在每个渲染帧读取该变量即可。 #### 发送消息 + 在同步代码中使用异步的另一个方法就是生成一个运行时,然后使用消息传递的方式跟它进行交互。这个方法虽然更啰嗦一些,但是相对于之前的两种方法更加灵活: + ```rust use tokio::runtime::Builder; use tokio::sync::mpsc; diff --git a/src/tokio/channels.md b/src/tokio/channels.md index f246ddff..13b65c66 100644 --- a/src/tokio/channels.md +++ b/src/tokio/channels.md @@ -1,18 +1,22 @@ # 消息传递 + 迄今为止,你已经学了不少关于 Tokio 的并发编程的内容,是时候见识下真正的挑战了,接下来,我们一起来实现下客户端这块儿的功能。 首先,将之前实现的 `src/main.rs `文件中的[服务器端代码](https://github.com/tokio-rs/website/blob/master/tutorial-code/shared-state/src/main.rs)放入到一个 bin 文件中,等下可以直接通过该文件来运行我们的服务器: + ```console mkdir src/bin mv src/main.rs src/bin/server.rs ``` 接着创建一个新的 bin 文件,用于包含我们即将实现的客户端代码: + ```console touch src/bin/client.rs ``` 由于不再使用 `main.rs` 作为程序入口,我们需要使用以下命令来运行指定的 bin 文件: + ```rust cargo run --bin server ``` @@ -22,7 +26,9 @@ cargo run --bin server 万事俱备,只欠代码,一起来看看客户端该如何实现。 ## 错误的实现 + 如果想要同时运行两个 redis 命令,我们可能会为每一个命令生成一个任务,例如: + ```rust use mini_redis::client; @@ -55,13 +61,15 @@ async fn main() { 这个不行,那个也不行,是不是没有办法解决了?还记得我们上一章节提到过几次的消息传递,但是一直没有看到它的庐山真面目吗?现在可以来看看了。 ## 消息传递 + 之前章节我们提到可以创建一个专门的任务 `C1` (消费者 Consumer) 和通过消息传递来管理共享的资源,这里的共享资源就是 `client` 。若任务 `P1` (生产者 Producer) 想要发出 Redis 请求,首先需要发送信息给 `C1`,然后 `C1` 会发出请求给服务器,在获取到结果后,再将结果返回给 `P1`。 在这种模式下,只需要建立一条连接,然后由一个统一的任务来管理 `client` 和该连接,这样之前的 `get` 和 `set` 请求也将不存在资源共享的问题。 同时,`P1` 和 `C1` 进行通信的消息通道是有缓冲的,当大量的消息发送给 `C1` 时,首先会放入消息通道的缓冲区中,当 `C1` 处理完一条消息后,再从该缓冲区中取出下一条消息进行处理,这种方式跟消息队列( mq ) 非常类似,可以实现更高的吞吐。而且这种方式还有利于实现连接池,例如不止一个 `P` 和 `C` 时,多个 `P` 可以往消息通道中发送消息,同时多个 `C`,其中每个 `C` 都维护一条连接,并从消息通道获取消息。 -## Tokio的消息通道( channel ) +## Tokio 的消息通道( channel ) + Tokio 提供了多种消息通道,可以满足不同场景的需求: - [`mpsc`](https://docs.rs/tokio/1.15.0/tokio/sync/mpsc/index.html), 多生产者,单消费者模式 @@ -76,7 +84,9 @@ Tokio 提供了多种消息通道,可以满足不同场景的需求: 在下面的代码中,我们将使用 `mpsc` 和 `oneshot`, 本章节完整的代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/channels/src/main.rs)。 ## 定义消息类型 + 在大多数场景中使用消息传递时,都是多个发送者向一个任务发送消息,该任务在处理完后,需要将响应内容返回给相应的发送者。例如我们的例子中,任务需要将 `GET` 和 `SET` 命令处理的结果返回。首先,我们需要定一个 `Command` 枚举用于代表命令: + ```rust use bytes::Bytes; @@ -93,7 +103,9 @@ enum Command { ``` ## 创建消息通道 + 在 `src/bin/client.rs` 的 `main` 函数中,创建一个 `mpsc` 消息通道: + ```rust use tokio::sync::mpsc; @@ -139,7 +151,9 @@ async fn main() { 在我们的例子中,接收者是在管理 redis 连接的任务中,当该任务发现所有发送者都关闭时,它知道它的使命可以完成了,因此它会关闭 redis 连接。 ## 生成管理任务 + 下面,我们来一起创建一个管理任务,它会管理 redis 的连接,当然,首先需要创建一条到 redis 的连接: + ```rust use mini_redis::client; // 将消息通道接收者 rx 的所有权转移到管理任务中 @@ -167,6 +181,7 @@ let manager = tokio::spawn(async move { 如上所示,当从消息通道接收到一个命令时,该管理任务会将此命令通过 redis 连接发送到服务器。 现在,让两个任务发送命令到消息通道,而不是像最开始报错的那样,直接发送命令到各自的 redis 连接: + ```rust // 由于有两个任务,因此我们需要两个发送者 let tx2 = tx.clone(); @@ -191,6 +206,7 @@ let t2 = tokio::spawn(async move { ``` 在 `main` 函数的末尾,我们让 3 个任务,按照需要的顺序开始运行: + ```rust t1.await.unwrap(); t2.await.unwrap(); @@ -198,6 +214,7 @@ manager.await.unwrap(); ``` ## 接收响应消息 + 最后一步,就是让发出命令的任务从管理任务那里获取命令执行的结果。为了完成这个目标,我们将使用 `oneshot` 消息通道,因为它针对一发一收的使用类型做过特别优化,且特别适用于此时的场景:接收一条从管理任务发送的结果消息。 ```rust @@ -209,6 +226,7 @@ let (tx, rx) = oneshot::channel(); 使用方式跟 `mpsc` 很像,但是它并没有缓存长度,因为只能发送一条,接收一条,还有一点不同:你无法对返回的两个句柄进行 `clone`。 为了让管理任务将结果准确的返回到发送者手中,这个管道的发送端必须要随着命令一起发送, 然后发出命令的任务保留管道的发送端。一个比较好的实现就是将管道的发送端放入 `Command` 的数据结构中,同时使用一个别名来代表该发送端: + ```rust use tokio::sync::oneshot; use bytes::Bytes; @@ -232,6 +250,7 @@ type Responder = oneshot::Sender>; ``` 下面,更新发送命令的代码: + ```rust let t1 = tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); @@ -266,6 +285,7 @@ let t2 = tokio::spawn(async move { ``` 最后,更新管理任务: + ```rust while let Some(cmd) = rx.recv().await { match cmd { @@ -290,9 +310,11 @@ while let Some(cmd) = rx.recv().await { 本章的完整代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/channels/src/main.rs)。 ## 对消息通道进行限制 + 无论何时使用消息通道,我们都需要对缓存队列的长度进行限制,这样系统才能优雅的处理各种负载状况。如果不限制,假设接收端无法及时处理消息,那消息就会迅速堆积,最终可能会导致内存消耗殆尽,就算内存没有消耗完,也可能会导致整体性能的大幅下降。 Tokio 在设计时就考虑了这种状况,例如 `async` 操作在 Tokio 中是惰性的: + ```rust loop { async_op(); diff --git a/src/tokio/frame.md b/src/tokio/frame.md index 2cf3c099..d6e7b50b 100644 --- a/src/tokio/frame.md +++ b/src/tokio/frame.md @@ -1,5 +1,7 @@ # 解析数据帧 + 现在,鉴于大家已经掌握了 Tokio 的基本 I/O 用法,我们可以开始实现 `mini-redis` 的帧 `frame`。通过帧可以将字节流转换成帧组成的流。每个帧就是一个数据单元,例如客户端发送的一次请求就是一个帧。 + ```rust use bytes::Bytes; @@ -14,6 +16,7 @@ enum Frame { ``` 可以看到帧除了数据之外,并不具备任何语义。命令解析和实现会在更高的层次进行(相比帧解析层)。我们再来通过 HTTP 的帧来帮大家加深下相关的理解: + ```rust enum HttpFrame { RequestHead { @@ -34,6 +37,7 @@ enum HttpFrame { ``` 为了实现 `mini-redis` 的帧,我们需要一个 `Connection` 结构体,里面包含了一个 `TcpStream` 以及对帧进行读写的方法: + ```rust use tokio::net::TcpStream; use mini_redis::{Frame, Result}; @@ -45,7 +49,7 @@ struct Connection { impl Connection { /// 从连接读取一个帧 - /// + /// /// 如果遇到EOF,则返回 None pub async fn read_frame(&mut self) -> Result> @@ -62,9 +66,10 @@ impl Connection { } ``` -关于Redis协议的说明,可以看看[官方文档](https://redis.io/topics/protocol),`Connection` 代码的完整实现见[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs). +关于 Redis 协议的说明,可以看看[官方文档](https://redis.io/topics/protocol),`Connection` 代码的完整实现见[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs). ## 缓冲读取(Buffered Reads) + `read_frame` 方法会等到一个完整的帧都读取完毕后才返回,与之相比,它底层调用的`TcpStream::read` 只会返回任意多的数据(填满传入的缓冲区 buffer ),它可能返回帧的一部分、一个帧、多个帧,总之这种读取行为是不确定的。 当 `read_frame` 的底层调用 `TcpStream::read` 读取到部分帧时,会将数据先缓冲起来,接着继续等待并读取数据。如果读到多个帧,那第一个帧会被返回,然后剩下的数据依然被缓冲起来,等待下一次 `read_frame` 被调用。 @@ -94,6 +99,7 @@ impl Connection { ``` 接下来,实现 `read_frame` 方法: + ```rust use tokio::io::AsyncReadExt; use bytes::Buf; @@ -130,9 +136,11 @@ pub async fn read_frame(&mut self) `read_frame` 内部使用循环的方式读取数据,直到一个完整的帧被读取到时,才会返回。当然,当远程的对端关闭了连接后,也会返回。 #### `Buf` 特征 + 在上面的 `read_frame` 方法中,我们使用了 `read_buf` 来读取 socket 中的数据,该方法的参数是来自 [`bytes`](https://docs.rs/bytes/) 包的 `BufMut`。 可以先来考虑下该如何使用 `read()` 和 `Vec` 来实现同样的功能 : + ```rust use tokio::net::TcpStream; @@ -155,6 +163,7 @@ impl Connection { ``` 下面是相应的 `read_frame` 方法: + ```rust use mini_redis::{Frame, Result}; @@ -203,6 +212,7 @@ pub async fn read_frame(&mut self) 与 `Vec` 相反, `BytesMut` 和 `BufMut` 就没有这个问题,它们无需被初始化,而且 `BytesMut` 还会阻止我们读取未初始化的内存。 ## 帧解析 + 在理解了该如何读取数据后, 再来看看该如何通过两个部分解析出一个帧: - 确保有一个完整的帧已经被写入了缓冲区,找到该帧的最后一个字节所在的位置 @@ -251,18 +261,19 @@ fn parse_frame(&mut self) 值得一提的是, `Frame::check` 使用了 `Buf` 的字节迭代风格的 API。例如,为了解析一个帧,首先需要检查它的第一个字节,该字节用于说明帧的类型。这种首字节检查是通过 `Buf::get_u8` 函数完成的,该函数会获取游标所在位置的字节,然后将游标位置向右移动一个字节。 ## 缓冲写入(Buffered writes) + 关于帧操作的另一个 API 是 `write_frame(frame)` 函数,它会将一个完整的帧写入到 socket 中。 每一次写入,都会触发一次或数次系统调用,当程序中有大量的连接和写入时,系统调用的开销将变得非常高昂,具体可以看看 SyllaDB 团队写过的一篇[性能调优文章](https://www.scylladb.com/2022/01/12/async-rust-in-practice-performance-pitfalls-profiling/)。 为了降低系统调用的次数,我们需要使用一个写入缓冲区,当写入一个帧时,首先会写入该缓冲区,然后等缓冲区数据足够多时,再集中将其中的数据写入到 socket 中,这样就将多次系统调用优化减少到一次。 -还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个bulk,通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。所以我们不能将其帧体数据写入到缓冲区中,因为数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。 - +还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个 bulk,通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。所以我们不能将其帧体数据写入到缓冲区中,因为数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。 为了实现缓冲写,我们将使用 [`BufWriter`](https://docs.rs/tokio/1/tokio/io/struct.BufWriter.html) 结构体。该结构体实现了 `AsyncWrite` 特征,当 `write` 方法被调用时,不会直接写入到 socket 中,而是先写入到缓冲区中。当缓冲区被填满时,其中的内容会自动刷到(写入到)内部的 socket 中,然后再将缓冲区清空。当然,其中还存在某些优化,通过这些优化可以绕过缓冲区直接访问 socket。 由于篇幅有限,我们不会实现完整的 `write_frame` 函数,想要看完整代码可以访问[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs#L159-L184)。 首先,更新下 `Connection` 的结构体: + ```rust use tokio::io::BufWriter; use tokio::net::TcpStream; @@ -284,6 +295,7 @@ impl Connection { ``` 接着来实现 `write_frame` 函数: + ```rust use tokio::io::{self, AsyncWriteExt}; use mini_redis::Frame; @@ -334,4 +346,5 @@ async fn write_frame(&mut self, frame: &Frame) 在函数结束前,我们还额外的调用了一次 `self.stream.flush().await`,原因是缓冲区可能还存在数据,因此需要手动刷一次数据:`flush` 的调用会将缓冲区中剩余的数据立刻写入到 socket 中。 -当然,当帧比较小的时候,每写一次帧就 flush 一次的模式性能开销会比较大,此时我们可以选择在 `Connection` 中实现 `flush` 函数,然后将等帧积累多个后,再一次性在 `Connection` 中进行 flush。当然,对于我们的例子来说,简洁性是非常重要的,因此选了将 `flush` 放入到 `write_frame` 中。 \ No newline at end of file +当然,当帧比较小的时候,每写一次帧就 flush 一次的模式性能开销会比较大,此时我们可以选择在 `Connection` 中实现 `flush` 函数,然后将等帧积累多个后,再一次性在 `Connection` 中进行 flush。当然,对于我们的例子来说,简洁性是非常重要的,因此选了将 `flush` 放入到 `write_frame` 中。 + diff --git a/src/tokio/getting-startted.md b/src/tokio/getting-startted.md index 5a998d73..d2459f01 100644 --- a/src/tokio/getting-startted.md +++ b/src/tokio/getting-startted.md @@ -1,24 +1,30 @@ -# tokio初印象 -又到了喜闻乐见的初印象环节,这个环节决定了你心中的那24盏灯最终是全绿还是全灭。 +# tokio 初印象 + +又到了喜闻乐见的初印象环节,这个环节决定了你心中的那 24 盏灯最终是全绿还是全灭。 在本文中,我们将看看本专题的学习目标、`tokio`该怎么引入以及如何实现一个 `Hello Tokio` 项目,最终留灯还是灭灯的决定权留给各位看官。但我提前说好,如果你全灭了,但却找不到更好的,未来还是得回来真香 :P ## 专题目标 + 通过 API 学项目无疑是无聊的,因此我们采用一个与众不同的方式:边学边练,在本专题的最后你将拥有一个 `redis` 客户端和服务端,当然不会实现一个完整版本的 `redis` ,只会提供基本的功能和部分常用的命令。 #### mini-redis + `redis` 的项目源码可以在[这里访问](https://github.com/sunface/rust-course/tree/main/pratice/mini-redis),本项目是从[官方地址](https://github.com/tokio-rs/mini-redis) `fork` 而来,在未来会提供注释和文档汉化。 再次声明:该项目仅仅用于学习目的,因此它的文档注释非常全,但是它完全无法作为 `redis` 的替代品。 ## 环境配置 + 首先,我们假定你已经安装了 Rust 和相关的工具链,例如 `cargo`。其中 Rust 版本的最低要求是 `1.45.0`,建议使用最新版 `1.58`: + ```shell sunfei@sunface $ rustc --version rustc 1.58.0 (02072b482 2022-01-11) ``` 接下来,安装 `mini-redis` 的服务器端,它可以用来测试我们后面将要实现的 `redis` 客户端: + ```shell $ cargo install mini-redis ``` @@ -26,31 +32,37 @@ $ cargo install mini-redis > 如果下载失败,也可以通过[这个地址](https://github.com/sunface/rust-course/tree/main/pratice/mini-redis)下载源码,然后在本地通过 `cargo run`运行。 下载成功后,启动服务端: + ```shell $ mini-redis-server ``` 然后,再使用客户端测试下刚启动的服务端: + ```shell $ mini-redis-cli set foo 1 OK -$ mini-redis-cli get foo +$ mini-redis-cli get foo "1" ``` 不得不说,还挺好用的,先自我陶醉下 :) 此时,万事俱备,只欠东风,接下来是时候亮"箭"了:实现我们的 `Hello Tokio` 项目。 ## Hello Tokio -与简单无比的 `Hello World` 有所不同(简单?还记得本书开头时,湖畔边的那个多国语言版本的`你好,世界`嘛~~),`Hello Tokio` 它承载着"非常艰巨"的任务,那就是向刚启动的 `redis` 服务器写入一个 `key=hello, value=world` ,然后再读取出来,嗯,使用 `mini-redis` 客户端 :) + +与简单无比的 `Hello World` 有所不同(简单?还记得本书开头时,湖畔边的那个多国语言版本的`你好,世界`嘛~~),`Hello Tokio` 它承载着"非常艰巨"的任务,那就是向刚启动的 `redis` 服务器写入一个 `key=hello, value=world` ,然后再读取出来,嗯,使用 `mini-redis` 客户端 :) #### 分析未到,代码先行 + 在详细讲解之前,我们先来看看完整的代码,让大家有一个直观的印象。首先,创建一个新的 `Rust` 项目: + ```shell $ cargo new my-redis $ cd my-redis ``` 然后在 `Cargo.toml` 中添加相关的依赖: + ```toml [dependencies] tokio = { version = "1", features = ["full"] } @@ -58,6 +70,7 @@ mini-redis = "0.4" ``` 接下来,使用以下代码替换 `main.rs` 中的内容: + ```rust use mini_redis::{client, Result}; @@ -81,6 +94,7 @@ async fn main() -> Result<()> { 不知道你之前启动的 `mini-redis-server` 关闭没有,如果关了,记得重新启动下,否则我们的代码就是意大利空气炮。 最后,运行这个项目: + ```shell $ cargo run 从服务器端获取到结果=Some(b"world") @@ -89,6 +103,7 @@ $ cargo run Perfect, 代码成功运行,是时候来解释下其中蕴藏的至高奥秘了。 ## 原理解释 + 代码篇幅虽然不长,但是还是有不少值得关注的地方,接下来我们一起来看看。 ```rust @@ -100,6 +115,7 @@ let mut client = client::connect("127.0.0.1:6379").await?; 特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是**同步的代码编写方式**,唯一能说明异步的点就是 `.await`。 #### 什么是异步编程 + 大部分计算机程序都是按照代码编写的顺序来执行的:先执行第一行,然后第二行,以此类推(当然,还要考虑流程控制,例如循环)。当进行同步编程时,一旦程序遇到一个操作无法被立即完成,它就会进入阻塞状态,直到该操作完成为止。 因此同步编程非常符合我们人类的思维习惯,是一个顺其自然的过程,被几乎每一个程序员所喜欢(本来想说所有,但我不敢打包票,毕竟总有特立独行之士)。例如,当建立 TCP 连接时,当前线程会被阻塞,直到等待该连接建立完成,然后才往下继续进行。 @@ -109,7 +125,9 @@ let mut client = client::connect("127.0.0.1:6379").await?; 好在 Rust 为我们提供了 `async/await` 的异步编程特性,让我们可以像写同步代码那样去写异步的代码,也让这个世界美好依旧。 #### 编译时绿色线程 + 一个函数可以通过`async fn`的方式被标记为异步函数: + ```rust use mini_redis::Result; use mini_redis::client::Client; @@ -126,6 +144,7 @@ pub async fn connect(addr: T) -> Result { > async/await 的原理就算大家不理解,也不妨碍使用 `tokio` 写出能用的服务,但是如果想要更深入的用好,强烈建议认真读下本书的 [`async/await` 异步编程章节](https://course.rs/async/intro.html),你会对 Rust 的异步编程有一个全新且深刻的认识。 由于 `async` 会返回一个 `Future`,因此我们还需要配合使用 `.await` 来让该 `Future` 运行起来,最终获得返回值: + ```rust async fn say_to_world() -> String { String::from("world") @@ -145,6 +164,7 @@ async fn main() { ``` 上面代码输出如下: + ```shell hello world @@ -153,12 +173,14 @@ world 而大家可能很好奇 `async fn` 到底返回什么吧?它实际上返回的是一个实现了 `Future` 特征的匿名类型: `impl Future`。 #### async main + 在代码中,使用了一个与众不同的 `main` 函数 : `async fn main` ,而且是用 `#[tokio::main]` 属性进行了标记。异步 `main` 函数有以下意义: - `.await` 只能在 `async` 函数中使用,如果是以前的 `fn main`,那它内部是无法直接使用 `async` 函数的!这个会极大的限制了我们的使用场景 - 异步运行时本身需要初始化 - + 因此 `#[tokio::main]` 宏在将 `async fn main` 隐式的转换为 `fn main` 的同时还对整个异步运行时进行了初始化。例如以下代码: + ```rust #[tokio::main] async fn main() { @@ -167,6 +189,7 @@ async fn main() { ``` 将被转换成: + ```rust fn main() { let mut rt = tokio::runtime::Runtime::new().unwrap(); @@ -179,19 +202,22 @@ fn main() { 最终,Rust 编译器就愉快地执行这段代码了。 ## cargo feature + 在引入 `tokio` 包时,我们在 `Cargo.toml` 文件中添加了这么一行: + ```toml tokio = { version = "1", features = ["full"] } ``` -里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 [Cargo详解专题](https://course.rs/cargo/intro.html) 有介绍,这里就简单进行说明, +里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 [Cargo 详解专题](https://course.rs/cargo/intro.html) 有介绍,这里就简单进行说明, `Tokio` 有很多功能和特性,例如 `TCP`,`UDP`,`Unix sockets`,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入。 而这里为了演示的方便,我们使用 `full` ,表示直接引入所有的特性。 ## 总结 -大家对 `tokio` 的初印象如何?可否24灯全绿通过? + +大家对 `tokio` 的初印象如何?可否 24 灯全绿通过? 总之,`tokio` 做的事情其实是细雨润无声的,在大多数时候,我们并不能感觉到它的存在,但是它确实是异步编程中最重要的一环(或者之一),深入了解它对我们的未来之路会有莫大的帮助。 diff --git a/src/tokio/graceful-shutdown.md b/src/tokio/graceful-shutdown.md index a3ca95f2..e460110d 100644 --- a/src/tokio/graceful-shutdown.md +++ b/src/tokio/graceful-shutdown.md @@ -1,7 +1,8 @@ # 优雅的关闭 -如果你的服务是一个小说阅读网站,那大概率用不到优雅关闭的,简单粗暴的关闭服务器,然后用户再次请求时获取一个错误就是了。但如果是一个web服务或数据库服务呢?当前的连接很可能在做着重要的事情,一旦关闭会导致数据的丢失甚至错误,此时,我们就需要优雅的关闭(graceful shutdown)了。 -要让一个异步应用优雅的关闭往往需要做到3点: +如果你的服务是一个小说阅读网站,那大概率用不到优雅关闭的,简单粗暴的关闭服务器,然后用户再次请求时获取一个错误就是了。但如果是一个 web 服务或数据库服务呢?当前的连接很可能在做着重要的事情,一旦关闭会导致数据的丢失甚至错误,此时,我们就需要优雅的关闭(graceful shutdown)了。 + +要让一个异步应用优雅的关闭往往需要做到 3 点: - 找出合适的关闭时机 - 通知程序的每一个子部分开始关闭 @@ -9,11 +10,12 @@ 在本文的下面部分,我们一起来看看该如何做到这三点。如果想要进一步了解在真实项目中该如何使用,大家可以看看 mini-redis 的完整代码实现,特别是 [`src/server.rs`](https://github.com/tokio-rs/mini-redis/blob/master/src/server.rs) 和 [`src/shutdown.rs`](https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs)。 - ## 找出合适的关闭时机 + 一般来说,何时关闭是取决于应用自身的,但是一个常用的关闭准则就是当应用收到来自于操作系统的关闭信号时。例如通过 `ctrl + c` 来关闭正在运行的命令行程序。 为了检测来自操作系统的关闭信号,`Tokio` 提供了一个 `tokio::signal::ctrl_c` 函数,它将一直睡眠直到收到对应的信号: + ```rust use tokio::signal; @@ -34,9 +36,11 @@ async fn main() { ``` ## 通知程序的每一个部分开始关闭 + 大家看到这个标题,不知道会想到用什么技术来解决问题,反正我首先想到的是,真的很像广播哎。。 事实上也是如此,最常见的通知程序各个部分关闭的方式就是使用一个广播消息通道。关于如何实现,其实也不复杂:应用中的每个任务都持有一个广播消息通道的接收端,当消息被广播到该通道时,每个任务都可以收到该消息,并关闭自己: + ```rust let next_frame = tokio::select! { res = self.connection.read_frame() => res?, @@ -47,7 +51,6 @@ let next_frame = tokio::select! { }; ``` - 在 `mini-redis` 中,当收到关闭消息时,任务会立即结束,但在实际项目中,这种方式可能会过于理想,例如当我们向文件或数据库写入数据时,立刻终止任务可能会导致一些无法预料的错误,因此,在结束前做一些收尾工作会是非常好的选择。 除此之外,还有两点值得注意: @@ -56,6 +59,7 @@ let next_frame = tokio::select! { - 还可以使用 [`watch channel`](https://docs.rs/tokio/1.16.1/tokio/sync/watch/index.html) 实现同样的效果,与之前的方式相比,这两种方法并没有太大的区别 ## 等待各个部分的结束 + 在之前章节,我们讲到过一个 [`mpsc`](https://docs.rs/tokio/1/tokio/sync/mpsc/index.html) 消息通道有一个重要特性:当所有发送端都 `drop` 时,消息通道会自动关闭,此时继续接收消息就会报错。 大家发现没?这个特性特别适合优雅关闭的场景:主线程持有消息通道的接收端,然后每个代码部分拿走一个发送端,当该部分结束时,就 `drop` 掉发送端,因此所有发送端被 `drop` 也就意味着所有的部分都已关闭,此时主线程的接收端就会收到错误,进而结束。 diff --git a/src/tokio/intro.md b/src/tokio/intro.md index 06f1508e..5a7c043f 100644 --- a/src/tokio/intro.md +++ b/src/tokio/intro.md @@ -1,6 +1,8 @@ -# Tokio使用指南 +# Tokio 使用指南 + 在上一个章节中,我们提到了 Rust 异步编程的限制,其中之一就是你必须引入社区提供的异步运行时,其中最有名的就是 `tokio`。 在本章中,我们一起来看看 `tokio` 到底有什么优势,以及该如何使用它。 -> 本章在内容上大量借鉴和翻译了tokio官方文档[Tokio Tutorial](https://tokio.rs/tokio/tutorial), 但是重新组织了内容形式并融入了很多自己的见解和感悟,给大家提供更好的可读性和知识扩展性 \ No newline at end of file +> 本章在内容上大量借鉴和翻译了 tokio 官方文档[Tokio Tutorial](https://tokio.rs/tokio/tutorial), 但是重新组织了内容形式并融入了很多自己的见解和感悟,给大家提供更好的可读性和知识扩展性 + diff --git a/src/tokio/io.md b/src/tokio/io.md index 58cff8dd..cb07c535 100644 --- a/src/tokio/io.md +++ b/src/tokio/io.md @@ -1,4 +1,5 @@ # I/O + 本章节中我们将深入学习 Tokio 中的 I/O 操作,了解它的原理以及该如何使用。 Tokio 中的 I/O 操作和 `std` 在使用方式上几无区别,最大的区别就是前者是异步的,例如 Tokio 的读写特征分别是 `AsyncRead` 和 `AsyncWrite`: @@ -7,10 +8,13 @@ Tokio 中的 I/O 操作和 `std` 在使用方式上几无区别,最大的区 - 还有数据结构也实现了它们:`Vec`、`&[u8]`,这样就可以直接使用这些数据结构作为读写器( reader / writer) ## AsyncRead 和 AsyncWrite + 这两个特征为字节流的异步读写提供了便利,通常我们会使用 `AsyncReadExt` 和 `AsyncWriteExt` 提供的工具方法,这些方法都使用 `async` 声明,且需要通过 `.await` 进行调用, #### async fn read - `AsyncReadExt::read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。 + +`AsyncReadExt::read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。 + ```rust use tokio::fs::File; use tokio::io::{self, AsyncReadExt}; @@ -31,7 +35,9 @@ async fn main() -> io::Result<()> { 需要注意的是:当 `read` 返回 `Ok(0)` 时,意味着字节流( stream )已经关闭,在这之后继续调用 `read` 会立刻完成,依然获取到返回值 `Ok(0)`。 例如,字节流如果是 `TcpStream` 类型,那 `Ok(0)` 说明该**连接的读取端已经被关闭**(写入端关闭,会报其它的错误)。 #### async fn read_to_end + `AsyncReadExt::read_to_end` 方法会从字节流中读取所有的字节,直到遇到 `EOF` : + ```rust use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; @@ -48,7 +54,9 @@ async fn main() -> io::Result<()> { ``` #### async fn write + `AsyncWriteExt::write` 异步方法会尝试将缓冲区的内容写入到写入器( `writer` )中,同时返回写入的字节数: + ```rust use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; @@ -67,7 +75,9 @@ async fn main() -> io::Result<()> { 上面代码很清晰,但是大家可能会疑惑 `b"some bytes"` 是什么意思。这种写法可以将一个 `&str` 字符串转变成一个字节数组:`&[u8;10]`,然后 `write` 方法又会将这个 `&[u8;10]` 的数组类型隐式强转为数组切片: `&[u8]`。 #### async fn write_all + `AsyncWriteExt::write_all` 将缓冲区的内容全部写入到写入器中: + ```rust use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; @@ -84,9 +94,11 @@ async fn main() -> io::Result<()> { 以上只是部分方法,实际上还有一些实用的方法由于篇幅有限无法列出,大家可以通过 [API 文档](https://docs.rs/tokio/latest/tokio/io/index.html) 查看完整的列表。 ## 实用函数 -另外,和标准库一样, `tokio::io` 模块包含了多个实用的函数或API,可以用于处理标准输入/输出/错误等。 + +另外,和标准库一样, `tokio::io` 模块包含了多个实用的函数或 API,可以用于处理标准输入/输出/错误等。 例如,`tokio::io::copy` 异步的将读取器( `reader` )中的内容拷贝到写入器( `writer` )中。 + ```rust use tokio::fs::File; use tokio::io; @@ -104,6 +116,7 @@ async fn main() -> io::Result<()> { 还记得我们之前提到的字节数组 `&[u8]` 实现了 `AsyncRead` 吗?正因为这个原因,所以这里可以直接将 `&u8` 用作读取器。 ## 回声服务( Echo ) + 就如同写代码必写 `hello, world`,实现 web 服务器,往往会选择实现一个回声服务。该服务会将用户的输入内容直接返回给用户,就像回声壁一样。 具体来说,就是从用户建立的 TCP 连接的 socket 中读取到数据,然后立刻将同样的数据写回到该 socket 中。因此客户端会收到和自己发送的数据一模一样的回复。 @@ -111,12 +124,15 @@ async fn main() -> io::Result<()> { 下面我们将使用两种稍有不同的方法实现该回声服务。 #### 使用 `io::copy()` + 先来创建一个新的 bin 文件,用于运行我们的回声服务: + ```console touch src/bin/echo-server-copy.rs ``` 然后可以通过以下命令运行它(跟上一章节的方式相同): + ```console cargo run --bin echo-server-copy ``` @@ -144,6 +160,7 @@ async fn main() -> io::Result<()> { ``` 下面,来看看重头戏 `io::copy` ,它有两个参数:一个读取器,一个写入器,然后将读取器中的数据直接拷贝到写入器中,类似的实现代码如下: + ```rust io::copy(&mut socket, &mut socket).await ``` @@ -151,11 +168,13 @@ io::copy(&mut socket, &mut socket).await 这段代码相信大家一眼就能看出问题,由于我们的读取器和写入器都是同一个 socket,因此需要对其进行两次可变借用,这明显违背了 Rust 的借用规则。 ##### 分离读写器 + 显然,使用同一个 socket 是不行的,为了实现目标功能,必须将 `socket` 分离成一个读取器和写入器。 任何一个读写器( reader + writer )都可以使用 `io::split` 方法进行分离,最终返回一个读取器和写入器,这两者可以独自的使用,例如可以放入不同的任务中。 例如,我们的回声客户端可以这样实现,以实现同时并发读写: + ```rust use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; @@ -195,12 +214,12 @@ async fn main() -> io::Result<()> { - [`TcpStream::split`](https://docs.rs/tokio/1.15.0/tokio/net/struct.TcpStream.html#method.split)会获取字节流的引用,然后将其分离成一个读取器和写入器。但由于使用了引用的方式,它们俩必须和 `split` 在同一个任务中。 优点就是,这种实现没有性能开销,因为无需 `Arc` 和 `Mutex`。 - [`TcpStream::into_split`](https://docs.rs/tokio/1.15.0/tokio/net/struct.TcpStream.html#method.into_split)还提供了一种分离实现,分离出来的结果可以在任务间移动,内部是通过 `Arc` 实现 - 再来分析下我们的使用场景,由于 `io::copy()` 调用时所在的任务和 `split` 所在的任务是同一个,因此可以使用性能最高的 `TcpStream::split`: + ```rust tokio::spawn(async move { let (mut rd, mut wr) = socket.split(); - + if io::copy(&mut rd, &mut wr).await.is_err() { eprintln!("failed to copy"); } @@ -210,7 +229,9 @@ tokio::spawn(async move { 使用 `io::copy` 实现的完整代码见[此处](https://github.com/tokio-rs/website/blob/master/tutorial-code/io/src/echo-server-copy.rs)。 #### 手动拷贝 + 程序员往往拥有一颗手动干翻一切的心,因此如果你不想用 `io::copy` 来简单实现,还可以自己手动去拷贝数据: + ```rust use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; @@ -251,12 +272,15 @@ async fn main() -> io::Result<()> { 建议这段代码放入一个和之前 `io::copy` 不同的文件中 `src/bin/echo-server.rs` , 然后使用 `cargo run --bin echo-server` 运行。 下面一起来看看这段代码有哪些值得注意的地方。首先,由于使用了 `write_all` 和 `read` 方法,需要先将对应的特征引入到当前作用域内: + ```rust use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; ``` ##### 在堆上分配缓冲区 + 在上面代码中,我们需要将数据从 `socket` 中读取到一个缓冲区 `buffer` 中: + ```rust let mut buf = vec![0; 1024]; ``` @@ -266,6 +290,7 @@ let mut buf = vec![0; 1024]; 在之前,我们提到过一个数据如果想在 `.await` 调用过程中存在,那它必须存储在当前任务内。在我们的代码中,`buf` 会在 `.await` 调用过程中被使用,因此它必须要存储在任务内。 若该缓冲区数组创建在栈上,那每条连接所对应的任务的内部数据结构看上去可能如下所示: + ```rust struct Task { task: enum { @@ -289,11 +314,13 @@ struct Task { 但是再怎么优化,任务的结构体至少也会跟其中的栈数组一样大,因此通常情况下,使用堆上的缓冲区会高效实用的多。 > 当任务因为调度在线程间移动时,存储在栈上的数据需要进行保存和恢复,过大的栈上变量会带来不小的数据拷贝开销 -> +> > 因此,存储大量数据的变量最好放到堆上 -##### 处理EOF +##### 处理 EOF + 当 TCP 连接的读取端关闭后,再调用 `read` 方法会返回 `Ok(0)`。此时,再继续下去已经没有意义,因此我们需要退出循环。忘记在 EOF 时退出读取循环,是网络编程中一个常见的 bug : + ```rust loop { match socket.read(&mut buf).await { @@ -303,4 +330,5 @@ loop { } ``` -大家不妨深入思考下,如果没有退出循环会怎么样?之前我们提到过,一旦读取端关闭后,那后面的 `read` 调用就会立即返回 `Ok(0)`,而不会阻塞等待,因此这种无阻塞循环会最终导致 CPU 立刻跑到 100% ,并将一直持续下去,直到程序关闭。 \ No newline at end of file +大家不妨深入思考下,如果没有退出循环会怎么样?之前我们提到过,一旦读取端关闭后,那后面的 `read` 调用就会立即返回 `Ok(0)`,而不会阻塞等待,因此这种无阻塞循环会最终导致 CPU 立刻跑到 100% ,并将一直持续下去,直到程序关闭。 + diff --git a/src/tokio/overview.md b/src/tokio/overview.md index ef9d62bc..f21e194c 100644 --- a/src/tokio/overview.md +++ b/src/tokio/overview.md @@ -1,7 +1,9 @@ -# tokio概览 -对于 Async Rust,最最重要的莫过于底层的异步运行时,它提供了执行器、任务调度、异步API等核心服务。简单来说,使用 Rust 提供的 `async/.await` 特性编写的异步代码要运行起来,就必须依赖于异步运行时,否则这些代码将毫无用处。 +# tokio 概览 + +对于 Async Rust,最最重要的莫过于底层的异步运行时,它提供了执行器、任务调度、异步 API 等核心服务。简单来说,使用 Rust 提供的 `async/.await` 特性编写的异步代码要运行起来,就必须依赖于异步运行时,否则这些代码将毫无用处。 ## 异步运行时 + Rust 语言本身只提供了异步编程所需的基本特性,例如 `async/.await` 关键字,标准库中的 `Future` 特征,官方提供的 `futures` 实用库,这些特性单独使用没有任何用处,因此我们需要一个运行时来将这些特性实现的代码运行起来。 异步运行时是由 Rust 社区提供的,它们的核心是一个 `reactor` 和一个或多个 `executor`(执行器): @@ -18,6 +20,7 @@ Rust 语言本身只提供了异步编程所需的基本特性,例如 `async/. 但是,大浪淘沙,留下的才是金子,随着时间的流逝,`tokio`越来越亮眼,无论是性能、功能还是社区、文档,它在各个方面都异常优秀,时至今日,可以说已成为事实上的标准。 #### 异步运行时的兼容性 + 为何选择异步运行时这么重要?不仅仅是它们在功能、性能上存在区别,更重要的是当你选择了一个,往往就无法切换到另外一个,除非异步代码很少。 使用异步运行时,往往伴随着对它相关的生态系统的深入使用,因此耦合性会越来越强,直至最后你很难切换到另一个运行时,例如 `tokio` 和 `async-std` ,就存在这种问题。 @@ -25,18 +28,21 @@ Rust 语言本身只提供了异步编程所需的基本特性,例如 `async/. 如果你实在有这种需求,可以考虑使用 [`async-compat`](https://github.com/smol-rs/async-compat),该包提供了一个中间层,用于兼容 `tokio` 和其它运行时。 #### 结论 + 相信大家看到现在,心中应该有一个结论了。首先,运行时之间的不兼容性,让我们必须提前选择一个运行时,并且在未来坚持用下去,那这个运行时就应该是最优秀、最成熟的那个,`tokio` 几乎成了不二选择,当然 `tokio` 也有自己的问题:更难上手和运行时之间的兼容性。 如果你只用 `tokio` ,那兼容性自然不是问题,至于难以上手,Rust 这么难,我们都学到现在了,何况区区一个异步运行时,在本书的帮忙下,这些都不再是个问题:) -## tokio简介 -tokio是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱歉,走错片场了。`tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成: +## tokio 简介 + +tokio 是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱歉,走错片场了。`tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成: - 多线程版本的异步运行时,可以运行使用 `async/.await` 编写的代码 -- 标准库中阻塞API的异步版本,例如`thread::sleep`会阻塞当前线程,`tokio`中就提供了相应的异步实现版本 +- 标准库中阻塞 API 的异步版本,例如`thread::sleep`会阻塞当前线程,`tokio`中就提供了相应的异步实现版本 - 构建异步编程所需的生态,甚至还提供了 [`tracing`](https://github.com/tokio-rs/tracing) 用于日志和分布式追踪, 提供 [`console`](https://github.com/tokio-rs/console) 用于 Debug 异步编程 ### 优势 + 下面一起来看看使用 `tokio` 能给你提供哪些优势。 **高性能** @@ -48,7 +54,7 @@ tokio是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱 **高可靠** -Rust 语言的安全可靠性顺理成章的影响了 `tokio` 的可靠性,曾经有一个调查给出了令人乍舌的[结论](https://www.zdnet.com/article/microsoft-70-percent-of-all-security-bugs-are-memory-safety-issues/):软件系统70%的高危漏洞都是由内存不安全性导致的。 +Rust 语言的安全可靠性顺理成章的影响了 `tokio` 的可靠性,曾经有一个调查给出了令人乍舌的[结论](https://www.zdnet.com/article/microsoft-70-percent-of-all-security-bugs-are-memory-safety-issues/):软件系统 70%的高危漏洞都是由内存不安全性导致的。 在 Rust 提供的安全性之外,`tokio` 还致力于提供一致性的行为表现:无论你何时运行系统,它的预期表现和性能都是一致的,例如不会出现莫名其妙的请求延迟或响应时间大幅增加。 @@ -63,14 +69,16 @@ Rust 语言的安全可靠性顺理成章的影响了 `tokio` 的可靠性,曾 `tokio` 支持你灵活的定制自己想要的运行时,例如你可以选择多线程 + 任务盗取模式的复杂运行时,也可以选择单线程的轻量级运行时。总之,几乎你的每一种需求在 `tokio` 中都能寻找到支持(画外音:强大的灵活性需要一定的复杂性来换取,并不是免费的午餐)。 ### 劣势 + 虽然 `tokio` 对于大多数需要并发的项目都是非常适合的,但是确实有一些场景它并不适合使用: -- 并行运行CPU密集型的任务,`tokio` 非常适合于IO密集型任务,这些IO任务的绝大多数时间都用于阻塞等待IO的结果,而不是刷刷刷的单烤CPU。如果你的应用是CPU密集型(例如并行计算),建议使用 [`rayon`](https://github.com/rayon-rs/rayon),当然,对于其中的IO任务部分,你依然可以混用 `tokio` -- 读取大量的文件, 读取文件的瓶颈主要在于操作系统,因为OS没有提供异步文件读取接口,大量的并发并不会提升文件读取的并行性能,反而可能会造成不可忽视的性能损耗,因此建议使用线程(或线程池)的方式 -- 发送HTTP请求,`tokio` 的优势是给予你并发处理大量任务的能力,对于这种轻量级 HTTP 请求场景,`tokio` 除了增加你的代码复杂性,并无法带来什么额外的优势。因此,对于这种场景,你可以使用 [`reqwest`](https://github.com/seanmonstar/reqwest) 库,它会更加简单易用。 - +- 并行运行 CPU 密集型的任务,`tokio` 非常适合于 IO 密集型任务,这些 IO 任务的绝大多数时间都用于阻塞等待 IO 的结果,而不是刷刷刷的单烤 CPU。如果你的应用是 CPU 密集型(例如并行计算),建议使用 [`rayon`](https://github.com/rayon-rs/rayon),当然,对于其中的 IO 任务部分,你依然可以混用 `tokio` +- 读取大量的文件, 读取文件的瓶颈主要在于操作系统,因为 OS 没有提供异步文件读取接口,大量的并发并不会提升文件读取的并行性能,反而可能会造成不可忽视的性能损耗,因此建议使用线程(或线程池)的方式 +- 发送 HTTP 请求,`tokio` 的优势是给予你并发处理大量任务的能力,对于这种轻量级 HTTP 请求场景,`tokio` 除了增加你的代码复杂性,并无法带来什么额外的优势。因此,对于这种场景,你可以使用 [`reqwest`](https://github.com/seanmonstar/reqwest) 库,它会更加简单易用。 ## 总结 + 离开三方开源社区提供的异步运行时, `async/await` 什么都不是,甚至还不如一堆破铜烂铁,除非你选择根据自己的需求手撸一个。 -而 `tokio` 就是那颗皇冠上的夜明珠,也是值得我们投入时间去深入学习的开源库,它的设计原理和代码实现都异常优秀,在之后的章节中,我们将对其进行深入学习和剖析,敬请期待。 \ No newline at end of file +而 `tokio` 就是那颗皇冠上的夜明珠,也是值得我们投入时间去深入学习的开源库,它的设计原理和代码实现都异常优秀,在之后的章节中,我们将对其进行深入学习和剖析,敬请期待。 + diff --git a/src/tokio/select.md b/src/tokio/select.md index 9bbb27ea..e8f99e45 100644 --- a/src/tokio/select.md +++ b/src/tokio/select.md @@ -1,8 +1,11 @@ # select! + 在实际使用时,一个重要的场景就是同时等待多个异步操作的结果,并且对其结果进行进一步处理,在本章节,我们来看看,强大的 `select!` 是如何帮助咱们更好的控制多个异步操作并发执行的。 ## tokio::select! + `select!` 允许同时等待多个计算操作,然后当其中一个操作完成时就退出等待: + ```rust use tokio::sync::oneshot; @@ -37,6 +40,7 @@ async fn main() { 需要注意,任何一个 `select` 分支完成后,都会继续执行后面的代码,没被执行的分支会被丢弃( `dropped` )。 #### 取消 + 对于 `Async Rust` 来说,释放( drop )掉一个 `Future` 就意味着取消任务。从上一章节可以得知, `async` 操作会返回一个 `Future`,而后者是惰性的,直到被 `poll` 调用时,才会被执行。一旦 `Future` 被释放,那操作将无法继续,因为所有相关的状态都被释放。 对于 Tokio 的 `oneshot` 的接收端来说,它在被释放时会发送一个关闭通知到发送端,因此发送端可以通过释放任务的方式来终止正在执行的任务。 @@ -86,6 +90,7 @@ async fn main() { 上面代码的重点就在于 `tx1.closed` 所在的分支,一旦发送端被关闭,那该分支就会被执行,然后 `select` 会退出,并清理掉还没执行的第一个分支 `val = some_operation()` ,这其中 `some_operation` 返回的 `Future` 也会被清理,根据之前的内容,`Future` 被清理那相应的任务会立即取消,因此 `some_operation` 会被取消,不再执行。 #### Future 的实现 + 为了更好的理解 `select` 的工作原理,我们来看看如果使用 `Future` 该如何实现。当然,这里是一个简化版本,在实际中,`select!` 会包含一些额外的功能,例如一开始会随机选择一个分支进行 `poll`。 ```rust @@ -139,9 +144,10 @@ async fn main() { 但是仔细观察我们之前的代码,里面并没有任何的 `wake` 调用!事实上,这是因为参数 `cx` 被传入了内层的 `poll` 调用。 只要内部的 `Future` 实现了唤醒并且返回了 `Poll::Pending`,那 `MySelect` 也等于实现了唤醒! - ## 语法 + 目前来说,`select!` 最多可以支持 64 个分支,每个分支形式如下: + ```rust <模式> = => <结果处理>, ``` @@ -155,6 +161,7 @@ async fn main() { 由于 `select!` 使用的是一个 `async` 表达式,因此我们可以定义一些更复杂的计算。 例如从在分支中进行 TCP 连接: + ```rust use tokio::net::TcpStream; use tokio::sync::oneshot; @@ -180,6 +187,7 @@ async fn main() { ``` 再比如,在分支中进行 TCP 监听: + ```rust use tokio::net::TcpListener; use tokio::sync::oneshot; @@ -217,7 +225,9 @@ async fn main() -> io::Result<()> { 分支中接收连接的循环会一直运行,直到遇到错误才停止,或者当 `rx` 中有值时,也会停止。 `_` 表示我们并不关心这个值,这样使用唯一的目的就是为了结束第一分支中的循环。 ## 返回值 + `select!` 还能返回一个值: + ```rust async fn computation1() -> String { // .. 计算 @@ -241,6 +251,7 @@ async fn main() { 需要注意的是,此时 `select!` 的所有分支必须返回一样的类型,否则编译器会报错! ## 错误传播 + 在 Rust 中使用 `?` 可以对错误进行传播,但是在 `select!` 中,`?` 如何工作取决于它是在分支中的 `async` 表达式使用还是在结果处理的代码中使用: - 在分支中 `async` 表达式使用会将该表达式的结果变成一个 `Result` @@ -281,9 +292,10 @@ async fn main() -> io::Result<()> { 与之不同的是,结果处理中的 `res?;` 会让 `main` 函数直接结束并返回一个 `Result`,可以看出,这里 `?` 的用法跟我们平时的用法并无区别。 - ## 模式匹配 + 既然是模式匹配,我们需要再来回忆下 `select!` 的分支语法形式: + ```rust <模式> = => <结果处理>, ``` @@ -319,9 +331,11 @@ async fn main() { 上面代码中,`rx` 通道关闭后,`recv()` 方法会返回一个 `None`,可以看到没有任何模式能够匹配这个 `None`,那为何不会报错?秘密就在于 `else` 上:当使用模式去匹配分支时,若之前的所有分支都无法被匹配,那 `else` 分支将被执行。 ## 借用 + 当在 Tokio 中生成( spawn )任务时,其 async 语句块必须拥有其中数据的所有权。而 `select!` 并没有这个限制,它的每个分支表达式可以直接借用数据,然后进行并发操作。只要遵循 Rust 的借用规则,多个分支表达式可以不可变的借用同一个数据,或者在一个表达式可变的借用某个数据。 来看个例子,在这里我们同时向两个 TCP 目标发送同样的数据: + ```rust use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; @@ -355,7 +369,7 @@ async fn race( 如果你把连接过程放在了结果处理中,那连接失败会直接从 `race` 函数中返回,而不是继续执行另一个分支中的连接! -还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用,这当然ok,但是若是两次可变借用,那编译器会立即进行报错。但是转折来了:当在结果处理中进行两次可变借用时,却不会报错,大家可以思考下为什么,提示下:思考下分支在执行完成后会发生什么? +还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用,这当然 ok,但是若是两次可变借用,那编译器会立即进行报错。但是转折来了:当在结果处理中进行两次可变借用时,却不会报错,大家可以思考下为什么,提示下:思考下分支在执行完成后会发生什么? ```rust use tokio::sync::oneshot; @@ -386,7 +400,9 @@ async fn main() { 例如以上代码,就在两个分支的结果处理中分别进行了可变借用,并不会报错。原因就在于:`select!`会保证只有一个分支的结果处理会被运行,然后在运行结束后,另一个分支会被直接丢弃。 ## 循环 + 来看看该如何在循环中使用 `select!`,顺便说一句,跟循环一起使用是最常见的使用方式。 + ```rust use tokio::sync::mpsc; @@ -416,6 +432,7 @@ async fn main() { 老生常谈的一句话:`select!` 中哪个分支先被执行是无法确定的,因此不要依赖于分支执行的顺序!想象一下,在异步编程场景,若 `select!` 按照分支的顺序来执行会如何:若 `rx1` 中总是有数据,那每次循环都只会去处理第一个分支,后面两个分支永远不会被执行。 #### 恢复之前的异步操作 + ```rust async fn action() { // 一些异步逻辑 @@ -423,11 +440,11 @@ async fn action() { #[tokio::main] async fn main() { - let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); - + let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); + let operation = action(); tokio::pin!(operation); - + loop { tokio::select! { _ = &mut operation => break, @@ -450,6 +467,7 @@ async fn main() { 还有一个就是我们使用了 `tokio::pin!`,具体的细节这里先不介绍,值得注意的点是:如果要在一个引用上使用 `.await`,那么引用的值就必须是不能移动的或者实现了 `Unpin`,关于 `Pin` 和 `Unpin` 可以参见[这里](https://course.rs/async/pin-unpin.html)。 一旦移除 `tokio::pin!` 所在行的代码,然后试图编译,就会获得以下错误: + ```console error[E0599]: no method named `poll` found for struct `std::pin::Pin<&mut &mut impl std::future::Future>` @@ -476,6 +494,7 @@ error[E0599]: no method named `poll` found for struct 虽然我们已经学了很多关于 `Future` 的知识,但是这个错误依然不太好理解。但是它不难解决:当你试图在**一个引用上调用 `.await` 然后遇到了 `Future 未实现` 这种错误时**,往往只需要将对应的 `Future` 进行固定即可: ` tokio::pin!(operation);`。 #### 修改一个分支 + 下面一起来看看一个稍微复杂一些的 `loop` 循环,首先,我们拥有: - 一个消息通道可以传递 `i32` 类型的值 @@ -503,17 +522,17 @@ async fn action(input: Option) -> Option { #[tokio::main] async fn main() { let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); - + let mut done = false; let operation = action(None); tokio::pin!(operation); - + tokio::spawn(async move { let _ = tx.send(1).await; let _ = tx.send(3).await; let _ = tx.send(2).await; }); - + loop { tokio::select! { res = &mut operation, if !done => { @@ -541,6 +560,7 @@ async fn main() { 当第二个分支收到一个偶数时,`done` 会被修改为 `false`,且 `operation` 被设置了值。 此后再一次循环时,第一个分支会被执行,且 `operation` 返回一个 `Some(2)`,因此会触发 `return` ,最终结束循环并返回。 这段代码引入了一个新的语法: `if !done`,在解释之前,先看看去掉后会如何: + ```console thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace @@ -555,8 +575,9 @@ note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace 那大家肯定有疑问了,既然 `operation` 不能再被调用了,我们该如何在有偶数值时,再回到第一个分支对其进行调用呢?答案就是 `operation.set(action(Some(v)));`,该操作会重新使用新的参数设置 `operation`。 ## spawn 和 select! 的一些不同 + 学到现在,相信大家对于 `tokio::spawn` 和 `select!` 已经非常熟悉,它们的共同点就是都可以并发的运行异步操作。 -然而它们使用的策略大相径庭。 +然而它们使用的策略大相径庭。 `tokio::spawn` 函数会启动新的任务来运行一个异步操作,每个任务都是一个独立的对象可以单独被 Tokio 调度运行,因此两个不同的任务的调度都是独立进行的,甚至于它们可能会运行在两个不同的操作系统线程上。鉴于此,生成的任务和生成的线程有一个相同的限制:不允许对外部环境中的值进行借用。 diff --git a/src/tokio/shared-state.md b/src/tokio/shared-state.md index 34bed3d5..3c96085f 100644 --- a/src/tokio/shared-state.md +++ b/src/tokio/shared-state.md @@ -1,7 +1,9 @@ # 共享状态 + 上一章节中,咱们搭建了一个异步的 redis 服务器,并成功的提供了服务,但是其隐藏了一个巨大的问题:状态(数据)无法在多个连接之间共享,下面一起来看看该如何解决。 ## 解决方法 + 好在 Tokio 十分强大,上面问题对应的解决方法也不止一种: - 使用 `Mutex` 来保护数据的共享访问 @@ -12,19 +14,23 @@ 在上面的描述中,说实话第二种方法及其适用的场景并不是很好理解,但没关系,在后面章节会进行详细介绍。 ## 添加 `bytes` 依赖包 + 在上一节中,我们使用 `Vec` 来保存目标数据,但是它有一个问题,对它进行克隆时会将底层数据也整个复制一份,效率很低,但是克隆操作对于我们在多连接间共享数据又是必不可少的。 因此这里咱们新引入一个 `bytes` 包,它包含一个 `Bytes` 类型,当对该类型的值进行克隆时,就不再会克隆底层数据。事实上,`Bytes` 是一个引用计数类型,跟 `Arc` 非常类似,或者准确的说,`Bytes` 就是基于 `Arc` 实现的,但相比后者`Bytes` 提供了一些额外的能力。 -在 `Cargo.toml` 的 `[dependencies]` 中引入 `bytes` : +在 `Cargo.toml` 的 `[dependencies]` 中引入 `bytes` : + ```console bytes = "1" ``` ## 初始化 HashMap + 由于 `HashMap` 会在多个任务甚至多个线程间共享,再结合之前的选择,最终我们决定使用 `>>` 的方式对其进行包裹。 但是,大家先来畅想一下使用它进行包裹后的类型长什么样? 大概,可能,长这样:`Arc>>`,天哪噜,一不小心,你就遇到了 Rust 的阴暗面:类型大串烧。可以想象,如果要在代码中到处使用这样的类型,可读性会极速下降,因此我们需要一个[类型别名](https://course.rs/advance/custom-type.html#类型别名type-alias)( type alias )来简化下: + ```rust use bytes::Bytes; use std::collections::HashMap; @@ -64,6 +70,7 @@ async fn main() { ``` #### 为何使用 `std::sync::Mutex` + 上面代码还有一点非常重要,那就是我们使用了 `std::sync::Mutex` 来保护 `HashMap`,而不是使用 `tokio::sync::Mutex`。 在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 `tokio::sync::Mutex` ,而真相是:Tokio 提供的异步锁只应该在跨多个 `.await`调用时使用,而且 Tokio 的 `Mutex` 实际上内部使用的也是 `std::sync::Mutex`。 @@ -74,9 +81,10 @@ async fn main() { - 锁竞争不多的情况下,使用 `std::sync::Mutex` - 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 [`parking_lot::Mutex`](https://docs.rs/parking_lot/0.10.2/parking_lot/type.Mutex.html) - ## 更新 `process()` + `process()` 函数不再初始化 `HashMap`,取而代之的是它使用了 `HashMap` 的一个 `handle` 作为参数: + ```rust use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; @@ -92,7 +100,7 @@ async fn process(socket: TcpStream, db: Db) { let mut db = db.lock().unwrap(); db.insert(cmd.key().to_string(), cmd.value().clone()); Frame::Simple("OK".to_string()) - } + } Get(cmd) => { let db = db.lock().unwrap(); if let Some(value) = db.get(cmd.key()) { @@ -110,11 +118,12 @@ async fn process(socket: TcpStream, db: Db) { ``` ## 任务、线程和锁竞争 + 当竞争不多的时候,使用阻塞性的锁去保护共享数据是一个正确的选择。当一个锁竞争触发后,当前正在执行任务(请求锁)的线程会被阻塞,并等待锁被前一个使用者释放。这里的关键就是:**锁竞争不仅仅会导致当前的任务被阻塞,还会导致执行任务的线程被阻塞,因此该线程准备执行的其它任务也会因此被阻塞!** 默认情况下,Tokio 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来。当然,你也可以使用 [**current_thread**](https://docs.rs/tokio/1.15.0/tokio/runtime/index.html#current-thread-scheduler) 运行时设置,在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争。 -> current_thread 是一个轻量级、单线程的运行时,当任务数不多或连接数不多时是一个很好的选择。例如你想在一个异步客户端库的基础上提供给用户同步的API访问时,该模式就很适用 +> current_thread 是一个轻量级、单线程的运行时,当任务数不多或连接数不多时是一个很好的选择。例如你想在一个异步客户端库的基础上提供给用户同步的 API 访问时,该模式就很适用 当同步锁的竞争变成一个问题时,使用 Tokio 提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项: @@ -123,6 +132,7 @@ async fn process(socket: TcpStream, db: Db) { - 重构代码以避免锁 在我们的例子中,由于每一个 `key` 都是独立的,因此对锁进行分片将成为一个不错的选择: + ```rust type ShardedDb = Arc>>>>; @@ -135,18 +145,21 @@ fn new_sharded_db(num_shards: usize) -> ShardedDb { } ``` -在这里,我们创建了 N 个不同的存储实例,每个实例都会存储不同的分片数据,例如我们有`a-i`共9个不同的 `key`, 可以将存储分成3个实例,那么第一个实例可以存储 `a-c`,第二个`d-f`,以此类推。在这种情况下,访问 `b` 时,只需要锁住第一个实例,此时二、三实例依然可以正常访问,因此锁被成功的分片了。 +在这里,我们创建了 N 个不同的存储实例,每个实例都会存储不同的分片数据,例如我们有`a-i`共 9 个不同的 `key`, 可以将存储分成 3 个实例,那么第一个实例可以存储 `a-c`,第二个`d-f`,以此类推。在这种情况下,访问 `b` 时,只需要锁住第一个实例,此时二、三实例依然可以正常访问,因此锁被成功的分片了。 在分片后,使用给定的 key 找到对应的值就变成了两个步骤:首先,使用 `key` 通过特定的算法寻找到对应的分片,然后再使用该 `key` 从分片中查询到值: + ```rust let shard = db[hash(key) % db.len()].lock().unwrap(); shard.insert(key, value); ``` -这里我们使用 `hash` 算法来进行分片,但是该算法有个缺陷:分片的数量不能变,一旦变了后,那之前落入分片1 的`key`很可能将落入到其它分片中,最终全部乱掉。此时你可以考虑[dashmap](https://docs.rs/dashmap),它提供了更复杂、更精妙的支持分片的`hash map`。 +这里我们使用 `hash` 算法来进行分片,但是该算法有个缺陷:分片的数量不能变,一旦变了后,那之前落入分片 1 的`key`很可能将落入到其它分片中,最终全部乱掉。此时你可以考虑[dashmap](https://docs.rs/dashmap),它提供了更复杂、更精妙的支持分片的`hash map`。 ## 在 `.await` 期间持有锁 + 在某些时候,你可能会不经意写下这种代码: + ```rust use std::sync::{Mutex, MutexGuard}; @@ -159,6 +172,7 @@ async fn increment_and_do_stuff(mutex: &Mutex) { ``` 如果你要 `spawn` 一个任务来执行上面的函数的话,会报错: + ```console error: future cannot be sent between threads safely --> src/lib.rs:13:5 @@ -187,7 +201,9 @@ note: future is not `Send` as this value is used across an await 错误的原因在于 `std::sync::MutexGuard` 类型并没有实现 `Send` 特征,这意味着你不能将一个 `Mutex` 锁发送到另一个线程,因为 `.await` 可能会让任务转移到另一个线程上执行,这个之前也介绍过。 #### 提前释放锁 + 要解决这个问题,就必须重构代码,让 `Mutex` 锁在 `.await` 被调用前就被释放掉。 + ```rust // 下面的代码可以工作! async fn increment_and_do_stuff(mutex: &Mutex) { @@ -203,6 +219,7 @@ async fn increment_and_do_stuff(mutex: &Mutex) { > 大家可能已经发现,很多错误都是因为 `.await` 引起的,其实你只要记住,在 `.await` 执行期间,任务可能会在线程间转移,那么这些错误将变得很好理解,不必去死记硬背 但是下面的代码不工作: + ```rust use std::sync::{Mutex, MutexGuard}; @@ -222,7 +239,9 @@ async fn increment_and_do_stuff(mutex: &Mutex) { 再来看看其它解决方法: #### 重构代码:在 `.await` 期间不持有锁 + 之前的代码其实也是为了在 `.await` 期间不持有锁,但是我们还有更好的实现方式,例如,你可以把 `Mutex` 放入一个结构体中,并且只在该结构体的非异步方法中使用该锁: + ```rust use std::sync::Mutex; @@ -244,9 +263,11 @@ async fn increment_and_do_stuff(can_incr: &CanIncrement) { ``` #### 使用异步任务和通过消息传递来管理状态 + 该方法常常用于共享的资源是 I/O 类型的资源时,我们在下一章节将详细介绍。 #### 使用 Tokio 提供的异步锁 + Tokio 提供的锁最大的优点就是:它可以在 `.await` 执行期间被持有,而且不会有任何问题。但是代价就是,这种异步锁的性能开销会更高,因此如果可以,使用之前的两种方法来解决会更好。 ```rust diff --git a/src/tokio/spawning.md b/src/tokio/spawning.md index e6f43326..a48f5732 100644 --- a/src/tokio/spawning.md +++ b/src/tokio/spawning.md @@ -1,7 +1,9 @@ # 创建异步任务 + 同志们,抓稳了,我们即将换挡提速,通向 `mini-redis` 服务端的高速之路已经开启。 不过在开始之前,先来做点收尾工作:上一章节中,我们实现了一个简易的 `mini-redis` 客户端并支持了 `SET`/`GET` 操作, 现在将该[代码](https://course.rs/tokio/getting-startted.html#分析未到代码先行)移动到 `example` 文件夹下,因为我们这个章节要实现的是服务器,后面可以用之前客户端示例对我们的服务器端进行测试: + ```shell $ mkdir -p examples $ mv src/main.rs examples/hello-redis.rs @@ -10,6 +12,7 @@ $ mv src/main.rs examples/hello-redis.rs 然后再重新创建一个空的 `src/main.rs` 文件,至此换挡已经完成,提速正式开始。 ## 接收 sockets + 作为服务器端,最基础的工作无疑是接收外部进来的 TCP 连接,可以通过 `tokio::net::TcpListener` 来完成。 > Tokio 中大多数类型的名称都和标准库中对应的同步类型名称相同,而且,如果没有特殊原因,Tokio 的 API 名称也和标准库保持一致,只不过用 `async fn` 取代 `fn` 来声明函数。 @@ -49,11 +52,13 @@ async fn process(socket: TcpStream) { ``` 现在运行我们的简单服务器 : + ```shel cargo run ``` 此时服务器会处于循环等待以接收连接的状态,接下来在一个新的终端窗口中启动上一章节中的 `redis` 客户端,由于相关代码已经放入 `examples` 文件夹下,因此我们可以使用 `-- example` 来指定运行该客户端示例: + ```shell $ cargo run --example hello-redis ``` @@ -61,6 +66,7 @@ $ cargo run --example hello-redis 此时,客户端的输出是: `Error: "unimplemented"`, 同时服务器端打印出了客户端发来的由 **redis 命令和数据** 组成的数据帧: `GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])`。 ## 生成任务 + 上面的服务器,如果你仔细看,它其实一次只能接受和处理一条 TCP 连接,只有等当前的处理完并结束后,才能开始接收下一条连接。原因在于 `loop` 循环中的 `await` 会导致当前任务进入阻塞等待,也就是 `loop` 循环会被阻塞。 而这显然不是我们想要的,服务器能并发地处理多条连接的请求,才是正确的打开姿势,下面来看看如何实现真正的并发。 @@ -68,6 +74,7 @@ $ cargo run --example hello-redis > 关于并发和并行,在[多线程章节中](https://course.rs/advance/concurrency-with-threads/concurrency-parallelism.html)有详细的解释 为了并发的处理连接,需要为每一条进来的连接都生成( `spawn` )一个新的任务, 然后在该任务中处理连接: + ```rust use tokio::net::TcpListener; @@ -87,9 +94,11 @@ async fn main() { ``` #### 任务 + 一个 Tokio 任务是一个异步的绿色线程,它们通过 `tokio::spawn` 进行创建,该函数会返回一个 `JoinHandle` 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。 `spawn` 函数的参数是一个 `async` 语句块,该语句块甚至可以返回一个值,然后调用者可以通过 `JoinHandle` 句柄获取该值: + ```rust #[tokio::main] async fn main() { @@ -106,10 +115,12 @@ async fn main() { 任务是调度器管理的执行单元。`spawn`生成的任务会首先提交给调度器,然后由它负责调度执行。需要注意的是,执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动。 -任务在 Tokio 中远比看上去要更轻量,例如创建一个任务仅仅需要一次64字节大小的内存分配。因此应用程序在生成任务上,完全不应该有任何心理负担,除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。 +任务在 Tokio 中远比看上去要更轻量,例如创建一个任务仅仅需要一次 64 字节大小的内存分配。因此应用程序在生成任务上,完全不应该有任何心理负担,除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。 #### `'static` 约束 + 当使用 Tokio 创建一个任务时,该任务类型的生命周期必须是 `'static`。意味着,在任务中不能使用外部数据的引用: + ```rust use tokio::task; @@ -124,6 +135,7 @@ async fn main() { ``` 上面代码中,`spawn` 出的任务引用了外部环境中的变量 `v` ,导致以下报错: + ```console error[E0373]: async block may outlive the current function, but it borrows `v`, which is owned by the current function @@ -155,22 +167,24 @@ help: to force the async block to take ownership of `v` (and any other 原因在于:默认情况下,变量并不是通过 `move` 的方式转移进 `async` 语句块的, `v` 变量的所有权依然属于 `main` 函数,因为任务内部的 `println!` 是通过借用的方式使用了 `v`,但是这种借用并不能满足 `'static` 生命周期的要求。 -在报错的同时,Rust编译器还给出了相当有帮助的提示:为 `async` 语句块使用 `move` 关键字,这样就能将 `v` 的所有权从 `main` 函数转移到新创建的任务中。 +在报错的同时,Rust 编译器还给出了相当有帮助的提示:为 `async` 语句块使用 `move` 关键字,这样就能将 `v` 的所有权从 `main` 函数转移到新创建的任务中。 但是 `move` 有一个问题,一个数据只能被一个任务使用,如果想要多个任务使用一个数据,就有些强人所难。不知道还有多少同学记得 [`Arc`](../advance/smart-pointer/rc-arc.md),它可以轻松解决该问题,还是线程安全的。 -在上面的报错中,还有一句很奇怪的信息`function requires argument type to outlive 'static`, 函数要求参数类型的生命周期必须比 `'static` 长,问题是 `'static` 已经活得跟整个程序一样久了,难道函数的参数还能活得更久?大家可能会觉得编译器秀逗了,毕竟其它语言编译器也有秀逗的时候:) +在上面的报错中,还有一句很奇怪的信息`function requires argument type to outlive 'static`, 函数要求参数类型的生命周期必须比 `'static` 长,问题是 `'static` 已经活得跟整个程序一样久了,难道函数的参数还能活得更久?大家可能会觉得编译器秀逗了,毕竟其它语言编译器也有秀逗的时候:) 先别急着给它扣帽子,虽然我有时候也想这么做。。原因是它说的是类型必须活得比 `'static` 长,而不是值。当我们说一个值是 `'static` 时,意味着它将永远存活。这个很重要,因为编译器无法知道新创建的任务将存活多久,所以唯一的办法就是让任务永远存活。 如果大家对于 `'&static` 和 `T: 'static` 较为模糊,强烈建议回顾下[该章节](https://course.rs/advance/lifetime/static.html)。 #### Send 约束 + `tokio::spawn` 生成的任务必须实现 `Send` 特征,因为当这些任务在 `.await` 执行过程中发生阻塞时,Tokio 调度器会将任务在线程间移动。 **一个任务要实现 `Send` 特征,那它在 `.await` 调用的过程中所持有的全部数据都必须实现 `Send` 特征**。当 `.await` 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将`.await`之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 `Send` 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。 例如以下代码可以工作: + ```rust use tokio::task::yield_now; use std::rc::Rc; @@ -192,6 +206,7 @@ async fn main() { ``` 但是下面代码就不行: + ```rust use tokio::task::yield_now; use std::rc::Rc; @@ -218,13 +233,14 @@ async fn main() { 这里有一个很重要的点,代码注释里有讲到,但是我们再重复一次: `rc` 是否会保存到任务状态中,取决于 `.await` 的调用是否处于它的作用域中,上面代码中,就算你注释掉 `println!` 函数,该报错依然会报错,因为 `rc` 的作用域直到 `async` 的末尾才结束! 下面是相应的报错,在下一章节,我们还会继续深入讨论该错误: + ```shell error: future cannot be sent between threads safely --> src/main.rs:6:5 | 6 | tokio::spawn(async { | ^^^^^^^^^^^^ future created by async block is not `Send` - | + | ::: [..]spawn.rs:127:21 | 127 | T: Future + Send + 'static, @@ -248,10 +264,12 @@ note: future is not `Send` as this value is used across an await | - `rc` is later dropped here ``` -## 使用HashMap存储数据 +## 使用 HashMap 存储数据 + 现在,我们可以继续前进了,下面来实现 `process` 函数,它用于处理进入的命令。相应的值将被存储在 `HashMap` 中: 通过 `SET` 命令存值,通过 `GET` 命令来取值。 同时,我们将使用循环的方式在同一个客户端连接中处理多次连续的请求: + ```rust use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; diff --git a/src/tokio/stream.md b/src/tokio/stream.md index 9343e003..134db44a 100644 --- a/src/tokio/stream.md +++ b/src/tokio/stream.md @@ -1,9 +1,11 @@ # Stream + 大家有没有想过, Rust 中的迭代器在迭代时能否异步进行?若不可以,是不是有相应的解决方案? -以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步的去执行是很常见的需求,好在 Tokio 为我们提供了 `stream`,我们可以在异步函数中对其进行迭代,甚至和迭代器 `Iterator` 一样,`stream` 还能使用适配器,例如 `map` ! Tokio 在 [`StreamExt`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html) 特征上定义了常用的适配器。 +以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步的去执行是很常见的需求,好在 Tokio 为我们提供了 `stream`,我们可以在异步函数中对其进行迭代,甚至和迭代器 `Iterator` 一样,`stream` 还能使用适配器,例如 `map` ! Tokio 在 [`StreamExt`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html) 特征上定义了常用的适配器。 要使用 `stream` ,目前还需要手动引入对应的包: + ```rust tokio-stream = "0.1" ``` @@ -11,14 +13,16 @@ tokio-stream = "0.1" > stream 没有放在 `tokio` 包的原因在于标准库中的 `Stream` 特征还没有稳定,一旦稳定后,`stream` 将移动到 `tokio` 中来 ## 迭代 + 目前, Rust 语言还不支持异步的 `for` 循环,因此我们需要 `while let` 循环和 [`StreamExt::next()`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html#method.next) 一起使用来实现迭代的目的: + ```rust use tokio_stream::StreamExt; #[tokio::main] async fn main() { let mut stream = tokio_stream::iter(&[1, 2, 3]); - + while let Some(v) = stream.next().await { println!("GOT = {:?}", v); } @@ -28,9 +32,11 @@ async fn main() { 和迭代器 `Iterator` 类似,`next()` 方法返回一个 `Option`,其中 `T` 是从 `stream` 中获取的值的类型。若收到 `None` 则意味着 `stream` 迭代已经结束。 #### mini-redis 广播 + 下面我们来实现一个复杂一些的 mini-redis 客户端,完整代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/streams/src/main.rs)。 在开始之前,首先启动一下完整的 mini-redis 服务器端: + ```console $ mini-redis-server ``` @@ -92,6 +98,7 @@ async fn main() -> mini_redis::Result<()> { 大家可以去掉 `pin!` 的调用,然后观察下报错,若以后你遇到这种错误,可以尝试使用下 `pin!`。 此时,可以运行下我们的客户端代码看看效果(别忘了先启动前面提到的 mini-redis 服务端): + ```console got = Ok(Message { channel: "numbers", content: b"1" }) got = Ok(Message { channel: "numbers", content: b"two" }) @@ -104,6 +111,7 @@ got = Ok(Message { channel: "numbers", content: b"6" }) 在了解了 `stream` 的基本用法后,我们再来看看如何使用适配器来扩展它。 ## 适配器 + 在前面章节中,我们了解了迭代器有[两种适配器](https://course.rs/advance/functional-programing/iterator.html#消费者与适配器): - 迭代器适配器,会将一个迭代器转变成另一个迭代器,例如 `map`,`filter` 等 @@ -112,6 +120,7 @@ got = Ok(Message { channel: "numbers", content: b"6" }) 与迭代器类似,`stream` 也有适配器,例如一个 `stream` 适配器可以将一个 `stream` 转变成另一个 `stream` ,例如 `map`、`take` 和 `filter`。 在之前的客户端中,`subscribe` 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。 + ```rust let messages = subscriber .into_stream() @@ -119,6 +128,7 @@ let messages = subscriber ``` 这里关键就在于 `take` 适配器,它会限制 `stream` 只能生成最多 `n` 条消息。运行下看看结果: + ```console got = Ok(Message { channel: "numbers", content: b"1" }) got = Ok(Message { channel: "numbers", content: b"two" }) @@ -126,6 +136,7 @@ got = Ok(Message { channel: "numbers", content: b"3" }) ``` 程序终于可以正常结束了。现在,让我们过滤 `stream` 中的消息,只保留数字类型的值: + ```rust let messages = subscriber .into_stream() @@ -137,6 +148,7 @@ let messages = subscriber ``` 运行后输出: + ```console got = Ok(Message { channel: "numbers", content: b"1" }) got = Ok(Message { channel: "numbers", content: b"3" }) @@ -146,6 +158,7 @@ got = Ok(Message { channel: "numbers", content: b"6" }) 需要注意的是,适配器的顺序非常重要,`.filter(...).take(3)` 和 `.take(3).filter(...)` 的结果可能大相径庭,大家可以自己尝试下。 现在,还有一件事要做,咱们的消息被不太好看的 `Ok(...)` 所包裹,现在通过 `map` 适配器来简化下: + ```rust let messages = subscriber .into_stream() @@ -170,7 +183,9 @@ got = b"6" 想要学习更多的适配器,可以看看 [`StreamExt`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html) 特征。 ## 实现 Stream 特征 + 如果大家还没忘记 `Future` 特征,那 `Stream` 特征相信你也会很快记住,因为它们非常类似: + ```rust use std::pin::Pin; use std::task::{Context, Poll}; @@ -179,7 +194,7 @@ pub trait Stream { type Item; fn poll_next( - self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll>; @@ -192,6 +207,7 @@ pub trait Stream { `Stream::poll_next()` 函数跟 `Future::poll` 很相似,区别就是前者为了从 `stream` 收到多个值需要重复的进行调用。 就像在 [`深入async`](https://course.rs/tokio/async.html) 章节提到的那样,当一个 `stream` 没有做好返回一个值的准备时,它将返回一个 `Poll::Pending` ,同时将任务的 `waker` 进行注册。一旦 `stream` 准备好后, `waker` 将被调用。 通常来说,如果想要手动实现一个 `Stream`,需要组合 `Future` 和其它 `Stream`。下面,还记得在[`深入async`](https://course.rs/tokio/async.html) 中构建的 `Delay Future` 吗?现在让我们来更进一步,将它转换成一个 `stream`,每 10 毫秒生成一个值,总共生成 3 次: + ```rust use tokio_stream::Stream; use std::pin::Pin; @@ -228,9 +244,11 @@ impl Stream for Interval { ``` #### async-stream + 手动实现 `Stream` 特征实际上是相当麻烦的事,不幸地是,Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。 作为替代方案,[`async-stream`](https://docs.rs/async-stream/latest/async_stream/) 包提供了一个 `stream!` 宏,它可以将一个输入转换成 `stream`,使用这个包,上面的代码可以这样实现: + ```rust use async_stream::stream; use std::time::{Duration, Instant}; @@ -247,6 +265,3 @@ stream! { ``` 嗯,看上去还是相当不错的,代码可读性大幅提升! - - -