Merge pull request #376 from 1132719438/main

Some fixes in async and tokio chapter
pull/385/head
Sunface 3 years ago committed by GitHub
commit 6c72394c16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,12 +1,12 @@
# async/await 和 Stream流处理 # async/await 和 Stream流处理
在入门章节中,我们简单学习了该如何使用 `async/.await` 同时在后面也了解了一些底层原理,现在是时候继续继续深入了。 在入门章节中,我们简单学习了该如何使用 `async/.await` 同时在后面也了解了一些底层原理,现在是时候继续深入了。
`async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。 `async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
有两种方式可以使用`async` `async fn`用于声明函数,`async { ... }`用于声明语句块,它们会返回一个实现 `Future` 特征的值: 有两种方式可以使用`async` `async fn`用于声明函数,`async { ... }`用于声明语句块,它们会返回一个实现 `Future` 特征的值:
```rust ```rust
// `foo()`返回一个`Future<Output = u8>`, // `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().wait`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值 // 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
async fn foo() -> u8 { 5 } async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> { fn bar() -> impl Future<Output = u8> {
@ -124,7 +124,7 @@ fn move_block() -> impl Future<Output = ()> {
## Stream流处理 ## Stream流处理
`Stream` 特征类似于 `Future` 特征,但是前者可以在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。 `Stream` 特征类似于 `Future` 特征,但是前者在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。
```rust ```rust
trait Stream { trait Stream {
// Stream生成的值的类型 // Stream生成的值的类型

@ -25,9 +25,9 @@ async fn enjoy_book_and_music() -> (Book, Music) {
} }
``` ```
看上去像模像样,嗯,在某些语言中也许可以,但是 Rust 不行。因为在某些语言中,`Future`一旦创建就开始运行,等到返回的时候,基本就可以同时结束并返回了。 但是 Rust 中的 `Future` 是惰性的,直到调用 `.await` 时,才会开始运行。而那两个 `await` 由于在代码中有先后顺序,因此它们是连续运行的。 看上去像模像样,嗯,在某些语言中也许可以,但是 Rust 不行。因为在某些语言中,`Future`一旦创建就开始运行,等到返回的时候,基本就可以同时结束并返回了。 但是 Rust 中的 `Future` 是惰性的,直到调用 `.await` 时,才会开始运行。而那两个 `await` 由于在代码中有先后顺序,因此它们是顺序运行的。
为了正确的并发两个 `Future` 我们来试试 `futures::join!` 宏: 为了正确的并发运行两个 `Future` 我们来试试 `futures::join!` 宏:
```rust ```rust
use futures::join; use futures::join;
@ -107,7 +107,7 @@ async fn race_tasks() {
#### default 和 complete #### default 和 complete
`select!`还支持 `default``complete` 分支: `select!`还支持 `default``complete` 分支:
- `complete` 分支当所有的 `Future``Stream` 完成后才会被执行,它往往配合`loop`使用,`loop`用于循环完成所有的 `Future` - `complete` 分支当所有的 `Future``Stream` 完成后才会被执行,它往往配合`loop`使用,`loop`用于循环完成所有的 `Future`
- `default`分支,若没有任何 `Future``Stream` 处于 `Read` 状态, 则该分支会被立即执行 - `default`分支,若没有任何 `Future``Stream` 处于 `Ready` 状态, 则该分支会被立即执行
```rust ```rust
use futures::future; use futures::future;
@ -147,7 +147,7 @@ pin_mut!(t1, t2);
首先,`.fuse()`方法可以让 `Future` 实现 `FusedFuture` 特征, 而 `pin_mut!` 宏会为 `Future` 实现 `Unpin`特征,这两个特征恰恰是使用 `select` 所必须的: 首先,`.fuse()`方法可以让 `Future` 实现 `FusedFuture` 特征, 而 `pin_mut!` 宏会为 `Future` 实现 `Unpin`特征,这两个特征恰恰是使用 `select` 所必须的:
- `Unpin`,由于 `select` 不会通过拿走所有权的方式使用`Future`,而是通过可变引用的方式去使用,这样当 `select` 结束后,该 `Future` 若没有被完成,它的所有权还可以继续被其它代码使用。 - `Unpin`,由于 `select` 不会通过拿走所有权的方式使用`Future`,而是通过可变引用的方式去使用,这样当 `select` 结束后,该 `Future` 若没有被完成,它的所有权还可以继续被其它代码使用。
- `FusedFuture`的原因跟上面类似,当 `Future` 一旦完成后,那 `select` 就不能再对其进行轮询使用。`Fuse`意味着融化的意思,相当于 `Future` 一旦完成,就融化了,无法再被使用 - `FusedFuture`的原因跟上面类似,当 `Future` 一旦完成后,那 `select` 就不能再对其进行轮询使用。`Fuse`意味着熔断,相当于 `Future` 一旦完成,再次调用`poll`会直接返回`Poll::Pending`
只有实现了`FusedFuture``select` 才能配合 `loop` 一起使用。假如没有实现,就算一个 `Future` 已经完成了,它依然会被 `select` 不停的轮询执行。 只有实现了`FusedFuture``select` 才能配合 `loop` 一起使用。假如没有实现,就算一个 `Future` 已经完成了,它依然会被 `select` 不停的轮询执行。

@ -119,10 +119,10 @@ pub struct Pin<P> {
因此,一个类型如果不能被移动,它必须实现 `!Unpin` 特征。如果大家对 `Pin``Unpin` 还是模模糊糊,建议再重复看一遍之前的内容,理解它们对于我们后面要讲到的内容非常重要! 因此,一个类型如果不能被移动,它必须实现 `!Unpin` 特征。如果大家对 `Pin``Unpin` 还是模模糊糊,建议再重复看一遍之前的内容,理解它们对于我们后面要讲到的内容非常重要!
如果将 `Unpin` 与之前章节学过的 [`Send/Sync`](https://www.zhihu.com/question/303273488/answer/2309266713) 进行下对比,会发现它们都很像: 如果将 `Unpin` 与之前章节学过的 [`Send/Sync`](https://course.rs/advance/concurrency-with-threads/send-sync.html) 进行下对比,会发现它们都很像:
- 都是标记特征( marker trait ),该特征未定义任何行为,非常适用于标记 - 都是标记特征( marker trait ),该特征未定义任何行为,非常适用于标记
- 都可以通过!语法去除实现 - 都可以通过`!`语法去除实现
- 绝大多数情况都是自动实现, 无需我们的操心 - 绝大多数情况都是自动实现, 无需我们的操心
@ -310,22 +310,57 @@ error[E0277]: `PhantomPinned` cannot be unpinned
> 需要注意的是固定在栈上非常依赖于你写出的 `unsafe` 代码的正确性。我们知道 `&'a mut T` 可以固定的生命周期是 `'a` ,但是我们却不知道当生命周期 `'a` 结束后,该指针指向的数据是否会被移走。如果你的 `unsafe` 代码里这么实现了,那么就会违背 `Pin` 应该具有的作用! > 需要注意的是固定在栈上非常依赖于你写出的 `unsafe` 代码的正确性。我们知道 `&'a mut T` 可以固定的生命周期是 `'a` ,但是我们却不知道当生命周期 `'a` 结束后,该指针指向的数据是否会被移走。如果你的 `unsafe` 代码里这么实现了,那么就会违背 `Pin` 应该具有的作用!
> >
> 一个常见的错误就是忘记去遮蔽(shadow )初始的变量,因为你可以 `drop``Pin` ,然后在 `&'a mut T` 结束后去移动数据: > 一个常见的错误就是忘记去遮蔽(shadow )初始的变量,因为你可以 `drop``Pin` ,然后在 `&'a mut T` 结束后去移动数据:
> ```rust
```rust > fn main() {
pub fn main() { > let mut test1 = Test::new("test1");
let mut test1 = Test::new("test1"); > let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) }; > Test::init(test1_pin.as_mut());
Test::init(test1.as_mut()); >
> drop(test1_pin);
let mut test2 = Test::new("test2"); > println!(r#"test1.b points to "test1": {:?}..."#, test1.b);
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) }; >
Test::init(test2.as_mut()); > let mut test2 = Test::new("test2");
> mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref())); > println!("... and now it points nowhere: {:?}", test1.b);
std::mem::swap(test1.get_mut(), test2.get_mut()); > }
println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref())); > # use std::pin::Pin;
} > # use std::marker::PhantomPinned;
``` > # use std::mem;
> #
> # #[derive(Debug)]
> # struct Test {
> # a: String,
> # b: *const String,
> # _marker: PhantomPinned,
> # }
> #
> #
> # impl Test {
> # fn new(txt: &str) -> Self {
> # Test {
> # a: String::from(txt),
> # b: std::ptr::null(),
> # // This makes our type `!Unpin`
> # _marker: PhantomPinned,
> # }
> # }
> #
> # fn init<'a>(self: Pin<&'a mut Self>) {
> # let self_ptr: *const String = &self.a;
> # let this = unsafe { self.get_unchecked_mut() };
> # this.b = self_ptr;
> # }
> #
> # fn a<'a>(self: Pin<&'a Self>) -> &'a str {
> # &self.get_ref().a
> # }
> #
> # fn b<'a>(self: Pin<&'a Self>) -> &'a String {
> # assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
> # unsafe { &*(self.b) }
> # }
> # }
> ```
#### 固定到堆上 #### 固定到堆上
将一个 `!Unpin` 类型的值固定到堆上,会给予该值一个稳定的内存地址,它指向的堆中的值在 `Pin` 后是无法被移动的。而且与固定在栈上不同,我们知道堆上的值在整个生命周期内都会被稳稳地固定住。 将一个 `!Unpin` 类型的值固定到堆上,会给予该值一个稳定的内存地址,它指向的堆中的值在 `Pin` 后是无法被移动的。而且与固定在栈上不同,我们知道堆上的值在整个生命周期内都会被稳稳地固定住。
@ -412,7 +447,10 @@ execute_unpin_future(fut); // OK
- 若 `T: Unpin` ( Rust 类型的默认实现),那么 `Pin<'a, T>``&'a mut T` 完全相同,也就是 `Pin` 将没有任何效果, 该移动还是照常移动 - 若 `T: Unpin` ( Rust 类型的默认实现),那么 `Pin<'a, T>``&'a mut T` 完全相同,也就是 `Pin` 将没有任何效果, 该移动还是照常移动
- 绝大多数标准库类型都实现了 `Unpin` ,事实上,对于 Rust 中你能遇到的绝大多数类型,该结论依然成立 - 绝大多数标准库类型都实现了 `Unpin` ,事实上,对于 Rust 中你能遇到的绝大多数类型,该结论依然成立
,其中一个例外就是:`async/await` 生成的 `Future` 没有实现 `Unpin` ,其中一个例外就是:`async/await` 生成的 `Future` 没有实现 `Unpin`
- 你可以通过以下方法为自己的类型添加 `!Unpin` 约束1. 使用文中提到的 `std::marker::PhantomPinned` 2. 使用`nightly` 版本下的 `feature flag` - 你可以通过以下方法为自己的类型添加 `!Unpin` 约束:
- 使用文中提到的 `std::marker::PhantomPinned`
- 使用`nightly` 版本下的 `feature flag`
- 可以将值固定到栈上,也可以固定到堆上 - 可以将值固定到栈上,也可以固定到堆上
- 将 `!Unpin` 值固定到栈上需要使用 `unsafe` - 将 `!Unpin` 值固定到栈上需要使用 `unsafe`
- 将 `!Unpin` 值固定到堆上无需 `unsafe` ,可以通过 `Box::pin` 来简单的实现 - 将 `!Unpin` 值固定到堆上无需 `unsafe` ,可以通过 `Box::pin` 来简单的实现
- 当固定类型`T: !Unpin`时你需要保证数据从被固定到被drop这段时期内其内存不会变得非法或者被重用

@ -79,10 +79,10 @@ fn handle_connection(mut stream: TcpStream) {
运行以上代码,并从浏览器访问 `127.0.0.1:7878` 你将看到一条来自 `Ferris` 的问候。 运行以上代码,并从浏览器访问 `127.0.0.1:7878` 你将看到一条来自 `Ferris` 的问候。
在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `ascyn` 的异步Web服务器。 在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `async` 的异步Web服务器。
## 运行异步代码 ## 运行异步代码
一个 Web 服务器必须要能并发的处理大量来自用户的请求,也就是我们不能在处理上一个用户的请求后,再处理下一个用户的请求。上面的单线程版本可以修改为多线程甚至于线程池来实现并发处理,但是线程还是太重了,使用 `async` 实现 `Web` 服务器才是最适合的。 一个 Web 服务器必须要能并发的处理大量来自用户的请求,也就是我们不能在处理上一个用户的请求后,再处理下一个用户的请求。上面的单线程版本可以修改为多线程甚至于线程池来实现并发处理,但是线程还是太重了,使用 `async` 实现 `Web` 服务器才是最适合的。
首先将 `handle_connection` 修改为 `async` 实现: 首先将 `handle_connection` 修改为 `async` 实现:
```rust ```rust
@ -148,7 +148,7 @@ async fn handle_connection(mut stream: TcpStream) {
} }
``` ```
上面是全新实现的 `handle_connection` 它会在内部睡眠5秒模拟一次用户慢请求需要注意的是我们并没有使用 `std::thread::sleep` 进行睡眠, 原因是该函数并不适用于并发代码,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。 上面是全新实现的 `handle_connection` 它会在内部睡眠5秒模拟一次用户慢请求需要注意的是我们并没有使用 `std::thread::sleep` 进行睡眠,原因是该函数是阻塞的,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。
因此,光把函数变成 `async` 往往是不够的,还需要将它内部的代码也都变成异步兼容的,阻塞线程绝对是不可行的。 因此,光把函数变成 `async` 往往是不够的,还需要将它内部的代码也都变成异步兼容的,阻塞线程绝对是不可行的。
@ -222,7 +222,7 @@ async fn main() {
## 测试 handle_connection 函数 ## 测试 handle_connection 函数
对于测试 Web 服务器,使用集成测试往往是最简单的,但是在本例子中,将使用单元测试来测试连接处理函数的正确性。 对于测试 Web 服务器,使用集成测试往往是最简单的,但是在本例子中,将使用单元测试来测试连接处理函数的正确性。
为了保证单元测试的隔离性和确定性,我们使用 `Mock` 来替代 `TcpStream` 。首先,修改 `handle_connection` 的函数签名让测试更简单,之所以可以修改签名,原因在于 `async_std::net::TcpStream` 实际上并不是必须的,只要任何结构体实现了 `async_std::io::Read`, `async_std::io::Write``marker::Unpin` 就可以替代它: 为了保证单元测试的隔离性和确定性,我们使用 `MockTcpStream` 来替代 `TcpStream` 。首先,修改 `handle_connection` 的函数签名让测试更简单,之所以可以修改签名,原因在于 `async_std::net::TcpStream` 实际上并不是必须的,只要任何结构体实现了 `async_std::io::Read`, `async_std::io::Write``marker::Unpin` 就可以替代它:
```rust ```rust
use std::marker::Unpin; use std::marker::Unpin;
use async_std::io::{Read, Write}; use async_std::io::{Read, Write};

@ -1,7 +1,7 @@
# 深入 Tokio 背后的异步原理 # 深入 Tokio 背后的异步原理
在经过多个章节的深入学习后Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!? 在经过多个章节的深入学习后Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!?
但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 async 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。 但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 `async` 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。
## Future ## Future
先来回顾一下 `async fn` 异步函数 : 先来回顾一下 `async fn` 异步函数 :
@ -118,7 +118,7 @@ use std::time::{Duration, Instant};
enum MainFuture { enum MainFuture {
// 初始化,但永远不会被 poll // 初始化,但永远不会被 poll
State0, State0,
// 等待 `Delay` 运行,例如 `future.awai` 代码行 // 等待 `Delay` 运行,例如 `future.await` 代码行
State1(Delay), State1(Delay),
// Future 执行完成 // Future 执行完成
Terminated, Terminated,
@ -236,12 +236,12 @@ impl MiniTokio {
鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。 鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。
回到在上面的代码中mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要通知 -> 运行机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll` 回到在上面的代码中mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要 `通知 -> 运行` 机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`
## Waker ## Waker
一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。 一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。
再来看下 `Future::pol` 的定义: 再来看下 `Future::poll` 的定义:
```rust ```rust
fn poll(self: Pin<&mut Self>, cx: &mut Context) fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>; -> Poll<Self::Output>;
@ -327,7 +327,7 @@ impl Future for Delay {
在返回 `Poll::Pending` 之前,先调用了 `cx.waker().wake_by_ref()` ,由于此时我们还没有模拟计时资源,因此这里直接调用了 `wake` 进行通知,这样做会导致当前的 `Future` 被立即再次调度执行。 在返回 `Poll::Pending` 之前,先调用了 `cx.waker().wake_by_ref()` ,由于此时我们还没有模拟计时资源,因此这里直接调用了 `wake` 进行通知,这样做会导致当前的 `Future` 被立即再次调度执行。
由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU因为这种执行 -> 立即通知再调度 -> 执行的方式会造成一个非常繁忙的循环。 由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU因为这种 `执行 -> 立即通知再调度 -> 执行` 的方式会造成一个非常繁忙的循环。
#### 处理 wake 通知 #### 处理 wake 通知
下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。 下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。
@ -423,7 +423,7 @@ impl MiniTokio {
impl Task { impl Task {
fn poll(self: Arc<Self>) { fn poll(self: Arc<Self>) {
// 基于 Task 实例创建一个 waker, 它使用了之前的 `AarcWake` // 基于 Task 实例创建一个 waker, 它使用了之前的 `ArcWake`
let waker = task::waker(self.clone()); let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
@ -436,7 +436,7 @@ impl Task {
// 使用给定的 future 来生成新的任务 // 使用给定的 future 来生成新的任务
// //
// 新的任务会被推到 `sender` 中,接着该消息通道的接收就可以获取该任务,然后执行 // 新的任务会被推到 `sender` 中,接着该消息通道的接收就可以获取该任务,然后执行
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>) fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where where
F: Future<Output = ()> + Send + 'static, F: Future<Output = ()> + Send + 'static,
@ -492,14 +492,14 @@ async fn main() {
首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。 首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。
在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方。 在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake`
当实现一个 `Future` 时,很关键的一点就是要假设每次 `poll` 调用都会应用到一个不同的 `Waker` 实例上。因此 `poll` 函数必须要使用一个新的 `waker` 去更新替代之前的 `waker` 当实现一个 `Future` 时,很关键的一点就是要假设每次 `poll` 调用都会应用到一个不同的 `Waker` 实例上。因此 `poll` 函数必须要使用一个新的 `waker` 去更新替代之前的 `waker`
我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker` 我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker`
这一段大家可能会看得云里雾里的,没办法,原文就饶绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑: 这一段大家可能会看得云里雾里的,没办法,原文就饶绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑:
```rust ```rust
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
@ -525,7 +525,8 @@ impl Future for Delay {
let mut waker = waker.lock().unwrap(); let mut waker = waker.lock().unwrap();
// 检查之前存储的 `waker` 是否跟当前任务的 `waker` 相匹配. // 检查之前存储的 `waker` 是否跟当前任务的 `waker` 相匹配.
// 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后 // 存储中的 waker 被该任务进行了更新。 // 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后
// 存储的 waker 被该任务进行了更新。
// 这种情况一旦发生,`Context` 包含的 `waker` 将不同于存储的 `waker` // 这种情况一旦发生,`Context` 包含的 `waker` 将不同于存储的 `waker`
// 因此我们必须对存储的 `waker` 进行更新 // 因此我们必须对存储的 `waker` 进行更新
if !waker.will_wake(cx.waker()) { if !waker.will_wake(cx.waker()) {
@ -550,7 +551,7 @@ impl Future for Delay {
}); });
} }
// 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已完成 // 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已完成
// 若计时已完成,则当前 Future 就可以完成并返回 `Poll::Ready` // 若计时已完成,则当前 Future 就可以完成并返回 `Poll::Ready`
if Instant::now() >= self.when { if Instant::now() >= self.when {
Poll::Ready(()) Poll::Ready(())
@ -603,7 +604,7 @@ async fn delay(dur: Duration) {
## 总结 ## 总结
在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘: 在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘:
- 在 Rust 中,`ascyn` 是惰性的,直到执行器 `poll` 它们时,才会开始执行 - 在 Rust 中,`async` 是惰性的,直到执行器 `poll` 它们时,才会开始执行
- `Waker``Future` 被执行的关键,它可以链接起 `Future` 任务和执行器 - `Waker``Future` 被执行的关键,它可以链接起 `Future` 任务和执行器
- 当资源没有准备时,会返回一个 `Poll::Pending` - 当资源没有准备时,会返回一个 `Poll::Pending`
- 当资源准备好时,会通过 `waker.wake` 发出通知 - 当资源准备好时,会通过 `waker.wake` 发出通知

@ -49,10 +49,10 @@ async fn main() {
在上一节中,我们介绍了几个解决方法,但是它们大部分都不太适用于此时的情况,例如: 在上一节中,我们介绍了几个解决方法,但是它们大部分都不太适用于此时的情况,例如:
- `std::sync::Mutex` 无法被使用,这个问题在之前章节有详解介绍过,同步锁无法`.await` 调用过程中使用 - `std::sync::Mutex` 无法被使用,这个问题在之前章节有详解介绍过,同步锁无法跨越 `.await` 调用时使用
- 那么你可能会想,是不是可以使用 `tokio::sync:Mutex` ,答案是可以用,但是同时就只能运行一个请求。若客户端实现了 redis 的 [pipelining](https://redis.io/topics/pipelining), 那这个异步锁就会导致连接利用率不足 - 那么你可能会想,是不是可以使用 `tokio::sync:Mutex` ,答案是可以用,但是同时就只能运行一个请求。若客户端实现了 redis 的 [pipelining](https://redis.io/topics/pipelining), 那这个异步锁就会导致连接利用率不足
这个不行,那个也不行,是不是没有办法解决了?还记得我们上一章节提到过几次的消息传递,但是一直没有看到它的庐山真面吗?现在可以来看看了。 这个不行,那个也不行,是不是没有办法解决了?还记得我们上一章节提到过几次的消息传递,但是一直没有看到它的庐山真面吗?现在可以来看看了。
## 消息传递 ## 消息传递
之前章节我们提到可以创建一个专门的任务 `C1` (消费者 Consumer) 和通过消息传递来管理共享的资源,这里的共享资源就是 `client` 。若任务 `P1` (生产者 Producer) 想要发出 Redis 请求,首先需要发送信息给 `C1`,然后 `C1` 会发出请求给服务器,在获取到结果后,再将结果返回给 `P1` 之前章节我们提到可以创建一个专门的任务 `C1` (消费者 Consumer) 和通过消息传递来管理共享的资源,这里的共享资源就是 `client` 。若任务 `P1` (生产者 Producer) 想要发出 Redis 请求,首先需要发送信息给 `C1`,然后 `C1` 会发出请求给服务器,在获取到结果后,再将结果返回给 `P1`
@ -108,7 +108,7 @@ async fn main() {
一个任务可以通过此通道将命令发送给管理 redis 连接的任务,同时由于通道支持多个生产者,因此多个任务可以同时发送命令。创建该通道会返回一个发送和接收句柄,这两个句柄可以分别被使用,例如它们可以被移动到不同的任务中。 一个任务可以通过此通道将命令发送给管理 redis 连接的任务,同时由于通道支持多个生产者,因此多个任务可以同时发送命令。创建该通道会返回一个发送和接收句柄,这两个句柄可以分别被使用,例如它们可以被移动到不同的任务中。
通道的缓冲队列长度是 32意味着如果消息发送的比接收的快这些消息将被存储在缓冲队列中一旦存满了 32 条消息,使用`send(...).await`的发送者会**进入睡眠**,直到缓冲队列可以放入信息的消息(被接收者消费了)。 通道的缓冲队列长度是 32意味着如果消息发送的比接收的快这些消息将被存储在缓冲队列中一旦存满了 32 条消息,使用`send(...).await`的发送者会**进入睡眠**,直到缓冲队列可以放入的消息(被接收者消费了)。
```rust ```rust
use tokio::sync::mpsc; use tokio::sync::mpsc;

@ -64,7 +64,7 @@ impl Connection {
关于Redis协议的说明可以看看[官方文档](https://redis.io/topics/protocol)`Connection` 代码的完整实现见[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs). 关于Redis协议的说明可以看看[官方文档](https://redis.io/topics/protocol)`Connection` 代码的完整实现见[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs).
## 缓冲读取(Buffered Read) ## 缓冲读取(Buffered Reads)
`read_frame` 方法会等到一个完整的帧都读取完毕后才返回,与之相比,它底层调用的`TcpStream::read` 只会返回任意多的数据(填满传入的缓冲区 buffer ),它可能返回帧的一部分、一个帧、多个帧,总之这种读取行为是不确定的。 `read_frame` 方法会等到一个完整的帧都读取完毕后才返回,与之相比,它底层调用的`TcpStream::read` 只会返回任意多的数据(填满传入的缓冲区 buffer ),它可能返回帧的一部分、一个帧、多个帧,总之这种读取行为是不确定的。
`read_frame` 的底层调用 `TcpStream::read` 读取到部分帧时,会将数据先缓冲起来,接着继续等待并读取数据。如果读到多个帧,那第一个帧会被返回,然后剩下的数据依然被缓冲起来,等待下一次 `read_frame` 被调用。 `read_frame` 的底层调用 `TcpStream::read` 读取到部分帧时,会将数据先缓冲起来,接着继续等待并读取数据。如果读到多个帧,那第一个帧会被返回,然后剩下的数据依然被缓冲起来,等待下一次 `read_frame` 被调用。
@ -194,7 +194,7 @@ pub async fn read_frame(&mut self)
一旦缓冲区满了,还需要增加缓冲区的长度,这样才能继续写入数据。还有一点值得注意,在 `parse_frame` 方法的内部实现中,也需要通过游标来解析数据: `self.buffer[..self.cursor]`,通过这种方式,我们可以准确获取到目前已经读取的全部数据。 一旦缓冲区满了,还需要增加缓冲区的长度,这样才能继续写入数据。还有一点值得注意,在 `parse_frame` 方法的内部实现中,也需要通过游标来解析数据: `self.buffer[..self.cursor]`,通过这种方式,我们可以准确获取到目前已经读取的全部数据。
在网络编程中,通过字节数组和游标的方式读取数据是非常普遍的,因此 `bytes` 包提供了一个 `Buf` 特征,如果一个类型可以被读取数据,那么该类型需要实现 `Buf` 特征。与之对应,当一个类型可以被写入数据时,它需要实现 `ButMut` 。 在网络编程中,通过字节数组和游标的方式读取数据是非常普遍的,因此 `bytes` 包提供了一个 `Buf` 特征,如果一个类型可以被读取数据,那么该类型需要实现 `Buf` 特征。与之对应,当一个类型可以被写入数据时,它需要实现 `BufMut` 。
`T: BufMut` ( 特征约束,说明类型 `T` 实现了 `BufMut` 特征 ) 被传给 `read_buf()` 方法时,缓冲区 `T` 的内部游标会自动进行更新。正因为如此,在使用了 `BufMut` 版本的 `read_frame` 中,我们并不需要管理自己的游标。 `T: BufMut` ( 特征约束,说明类型 `T` 实现了 `BufMut` 特征 ) 被传给 `read_buf()` 方法时,缓冲区 `T` 的内部游标会自动进行更新。正因为如此,在使用了 `BufMut` 版本的 `read_frame` 中,我们并不需要管理自己的游标。
@ -255,7 +255,7 @@ fn parse_frame(&mut self)
为了降低系统调用的次数,我们需要使用一个写入缓冲区,当写入一个帧时,首先会写入该缓冲区,然后等缓冲区数据足够多时,再集中将其中的数据写入到 socket 中,这样就将多次系统调用优化减少到一次。 为了降低系统调用的次数,我们需要使用一个写入缓冲区,当写入一个帧时,首先会写入该缓冲区,然后等缓冲区数据足够多时,再集中将其中的数据写入到 socket 中,这样就将多次系统调用优化减少到一次。
还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个bulk通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。因此我们不能将其帧体数据写入到缓冲区中,因此数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。 还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个bulk通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。所以我们不能将其帧体数据写入到缓冲区中,因为数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。
为了实现缓冲写,我们将使用 [`BufWriter`](https://docs.rs/tokio/1/tokio/io/struct.BufWriter.html) 结构体。该结构体实现了 `AsyncWrite` 特征,当 `write` 方法被调用时,不会直接写入到 socket 中,而是先写入到缓冲区中。当缓冲区被填满时,其中的内容会自动刷到(写入到)内部的 socket 中,然后再将缓冲区清空。当然,其中还存在某些优化,通过这些优化可以绕过缓冲区直接访问 socket。 为了实现缓冲写,我们将使用 [`BufWriter`](https://docs.rs/tokio/1/tokio/io/struct.BufWriter.html) 结构体。该结构体实现了 `AsyncWrite` 特征,当 `write` 方法被调用时,不会直接写入到 socket 中,而是先写入到缓冲区中。当缓冲区被填满时,其中的内容会自动刷到(写入到)内部的 socket 中,然后再将缓冲区清空。当然,其中还存在某些优化,通过这些优化可以绕过缓冲区直接访问 socket。

@ -95,7 +95,7 @@ Perfect, 代码成功运行,是时候来解释下其中蕴藏的至高奥秘
let mut client = client::connect("127.0.0.1:6379").await?; let mut client = client::connect("127.0.0.1:6379").await?;
``` ```
[`client::connect`](https://docs.rs/mini-redis/0.4.1/mini_redis/client/fn.connect.html) 函数由`mini-redis` 包提供,它使用异步的方式跟指定的远程 `IP` 地址建立 TCP 长连接,一旦连接建立成功,那 `clien` 的赋值初始化也将完成。 [`client::connect`](https://docs.rs/mini-redis/0.4.1/mini_redis/client/fn.connect.html) 函数由`mini-redis` 包提供,它使用异步的方式跟指定的远程 `IP` 地址建立 TCP 长连接,一旦连接建立成功,那 `client` 的赋值初始化也将完成。
特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是**同步的代码编写方式**,唯一能说明异步的点就是 `.await` 特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是**同步的代码编写方式**,唯一能说明异步的点就是 `.await`
@ -133,7 +133,7 @@ async fn say_to_world() -> String {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// 此处的函数调用是惰性的,并不会执行 `say_world()` 函数体中的代码 // 此处的函数调用是惰性的,并不会执行 `say_to_world()` 函数体中的代码
let op = say_to_world(); let op = say_to_world();
// 首先打印出 "hello" // 首先打印出 "hello"
@ -184,7 +184,7 @@ fn main() {
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
``` ```
里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 `[Cargo详解专题]` 有介绍,这里就简单进行说明, 里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 [Cargo详解专题](https://course.rs/cargo/intro.html) 有介绍,这里就简单进行说明,
`Tokio` 有很多功能和特性,例如 `TCP``UDP``Unix sockets`,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入。 `Tokio` 有很多功能和特性,例如 `TCP``UDP``Unix sockets`,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入。

@ -7,10 +7,10 @@ Tokio 中的 I/O 操作和 `std` 在使用方式上几无区别,最大的区
- 还有数据结构也实现了它们:`Vec<u8>`、`&[u8]`,这样就可以直接使用这些数据结构作为读写器( reader / writer) - 还有数据结构也实现了它们:`Vec<u8>`、`&[u8]`,这样就可以直接使用这些数据结构作为读写器( reader / writer)
## AsyncRead 和 AsyncWrite ## AsyncRead 和 AsyncWrite
这两个特征为字节流的异步读写提供了便利的使用方法,这些方法都使用 `async` 声明,且需要通过 `.await` 进行调用, 这两个特征为字节流的异步读写提供了便利,通常我们会使用 `AsyncReadExt``AsyncWriteExt` 提供的工具方法,这些方法都使用 `async` 声明,且需要通过 `.await` 进行调用,
#### async fn read #### async fn read
`read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。 `AsyncReadExt::read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。
```rust ```rust
use tokio::fs::File; use tokio::fs::File;
use tokio::io::{self, AsyncReadExt}; use tokio::io::{self, AsyncReadExt};
@ -31,7 +31,7 @@ async fn main() -> io::Result<()> {
需要注意的是:当 `read` 返回 `Ok(0)` 时,意味着字节流( stream )已经关闭,在这之后继续调用 `read` 会立刻完成,依然获取到返回值 `Ok(0)`。 例如,字节流如果是 `TcpStream` 类型,那 `Ok(0)` 说明该**连接的读取端已经被关闭**(写入端关闭,会报其它的错误)。 需要注意的是:当 `read` 返回 `Ok(0)` 时,意味着字节流( stream )已经关闭,在这之后继续调用 `read` 会立刻完成,依然获取到返回值 `Ok(0)`。 例如,字节流如果是 `TcpStream` 类型,那 `Ok(0)` 说明该**连接的读取端已经被关闭**(写入端关闭,会报其它的错误)。
#### async fn read_to_end #### async fn read_to_end
方法会从字节流中读取所有的字节,直到遇到 `EOF` `AsyncReadExt::read_to_end` 方法会从字节流中读取所有的字节,直到遇到 `EOF`
```rust ```rust
use tokio::io::{self, AsyncReadExt}; use tokio::io::{self, AsyncReadExt};
use tokio::fs::File; use tokio::fs::File;
@ -48,7 +48,7 @@ async fn main() -> io::Result<()> {
``` ```
#### async fn write #### async fn write
`write` 异步方法会尝试将缓冲区的内容写入到写入器( `writer` )中,同时返回写入的字节数: `AsyncWriteExt::write` 异步方法会尝试将缓冲区的内容写入到写入器( `writer` )中,同时返回写入的字节数:
```rust ```rust
use tokio::io::{self, AsyncWriteExt}; use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File; use tokio::fs::File;
@ -67,7 +67,7 @@ async fn main() -> io::Result<()> {
上面代码很清晰,但是大家可能会疑惑 `b"some bytes"` 是什么意思。这种写法可以将一个 `&str` 字符串转变成一个字节数组:`&[u8;10]`,然后 `write` 方法又会将这个 `&[u8;10]` 的数组类型隐式强转为数组切片: `&[u8]` 上面代码很清晰,但是大家可能会疑惑 `b"some bytes"` 是什么意思。这种写法可以将一个 `&str` 字符串转变成一个字节数组:`&[u8;10]`,然后 `write` 方法又会将这个 `&[u8;10]` 的数组类型隐式强转为数组切片: `&[u8]`
#### async fn write_all #### async fn write_all
将缓冲区的内容全部写入到写入器中: `AsyncWriteExt::write_all` 将缓冲区的内容全部写入到写入器中:
```rust ```rust
use tokio::io::{self, AsyncWriteExt}; use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File; use tokio::fs::File;
@ -81,12 +81,12 @@ async fn main() -> io::Result<()> {
} }
``` ```
以上只是部分方法,实际上还有一些实用的方法由于篇幅有限无法列出,大家可以通过 API 文档查看完整的列表。 以上只是部分方法,实际上还有一些实用的方法由于篇幅有限无法列出,大家可以通过 [API 文档](https://docs.rs/tokio/latest/tokio/io/index.html) 查看完整的列表。
## 实用函数 ## 实用函数
另外,和标准库一样, `tokio::io` 模块包含了多个实用的函数或API可以用于处理标准输入/输出/错误等。 另外,和标准库一样, `tokio::io` 模块包含了多个实用的函数或API可以用于处理标准输入/输出/错误等。
例如,`tokio::io::copy` 异步的将读取器( `reader` )中的内容拷贝到写入器中。 例如,`tokio::io::copy` 异步的将读取器( `reader` )中的内容拷贝到写入器( `writer` )中。
```rust ```rust
use tokio::fs::File; use tokio::fs::File;
use tokio::io; use tokio::io;
@ -288,6 +288,10 @@ struct Task {
但是再怎么优化,任务的结构体至少也会跟其中的栈数组一样大,因此通常情况下,使用堆上的缓冲区会高效实用的多。 但是再怎么优化,任务的结构体至少也会跟其中的栈数组一样大,因此通常情况下,使用堆上的缓冲区会高效实用的多。
> 当任务因为调度在线程间移动时,存储在栈上的数据需要进行保存和恢复,过大的栈上变量会带来不小的数据拷贝开销
>
> 因此,存储大量数据的变量最好放到堆上
##### 处理EOF ##### 处理EOF
当 TCP 连接的读取端关闭后,再调用 `read` 方法会返回 `Ok(0)`。此时,再继续下去已经没有意义,因此我们需要退出循环。忘记在 EOF 时退出读取循环,是网络编程中一个常见的 bug : 当 TCP 连接的读取端关闭后,再调用 `read` 方法会返回 `Ok(0)`。此时,再继续下去已经没有意义,因此我们需要退出循环。忘记在 EOF 时退出读取循环,是网络编程中一个常见的 bug :
```rust ```rust

@ -30,7 +30,7 @@ Rust 语言本身只提供了异步编程所需的基本特性,例如 `async/.
如果你只用 `tokio` 那兼容性自然不是问题至于难以上手Rust 这么难,我们都学到现在了,何况区区一个异步运行时,在本书的帮忙下,这些都不再是个问题:) 如果你只用 `tokio` 那兼容性自然不是问题至于难以上手Rust 这么难,我们都学到现在了,何况区区一个异步运行时,在本书的帮忙下,这些都不再是个问题:)
## tokio简介 ## tokio简介
Tokio是一个纸醉金迷之地只要有钱就可以为所欲为抱歉走错片场了。`Tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成: tokio是一个纸醉金迷之地只要有钱就可以为所欲为抱歉走错片场了。`tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成:
- 多线程版本的异步运行时,可以运行使用 `async/.await` 编写的代码 - 多线程版本的异步运行时,可以运行使用 `async/.await` 编写的代码
- 标准库中阻塞API的异步版本例如`thread::sleep`会阻塞当前线程,`tokio`中就提供了相应的异步实现版本 - 标准库中阻塞API的异步版本例如`thread::sleep`会阻塞当前线程,`tokio`中就提供了相应的异步实现版本
@ -41,7 +41,7 @@ Tokio是一个纸醉金迷之地只要有钱就可以为所欲为
**高性能** **高性能**
因为快所以快,前者是 Rust 快,后者是 `tokio` 快。 `tokio` 在编写时充分利用了 Rust 提供的各种零抽象和高性能特性,而且贯彻了 Rust 的牛逼思想:如果你选择手写代码,那么最好的结果就是跟 `tokio` 一样快! 因为快所以快,前者是 Rust 快,后者是 `tokio` 快。 `tokio` 在编写时充分利用了 Rust 提供的各种零成本抽象和高性能特性,而且贯彻了 Rust 的牛逼思想:如果你选择手写代码,那么最好的结果就是跟 `tokio` 一样快!
以下是一张官方提供的性能参考图,大致能体现出 `tokio` 的性能之恐怖: 以下是一张官方提供的性能参考图,大致能体现出 `tokio` 的性能之恐怖:
<img alt="tokio performance" src="/img/tokio-01.png" class="center" /> <img alt="tokio performance" src="/img/tokio-01.png" class="center" />
@ -59,6 +59,7 @@ Rust 语言的安全可靠性顺理成章的影响了 `tokio` 的可靠性,曾
同时 `tokio` 遵循了标准库的命名规则,让熟悉标准库的用户可以很快习惯于 `tokio` 的语法,再借助于 Rust 强大的类型系统,用户可以轻松地编写和交付正确的代码。 同时 `tokio` 遵循了标准库的命名规则,让熟悉标准库的用户可以很快习惯于 `tokio` 的语法,再借助于 Rust 强大的类型系统,用户可以轻松地编写和交付正确的代码。
**使用灵活性** **使用灵活性**
`tokio` 支持你灵活的定制自己想要的运行时,例如你可以选择多线程 + 任务盗取模式的复杂运行时,也可以选择单线程的轻量级运行时。总之,几乎你的每一种需求在 `tokio` 中都能寻找到支持(画外音:强大的灵活性需要一定的复杂性来换取,并不是免费的午餐)。 `tokio` 支持你灵活的定制自己想要的运行时,例如你可以选择多线程 + 任务盗取模式的复杂运行时,也可以选择单线程的轻量级运行时。总之,几乎你的每一种需求在 `tokio` 中都能寻找到支持(画外音:强大的灵活性需要一定的复杂性来换取,并不是免费的午餐)。
### 劣势 ### 劣势

@ -54,7 +54,7 @@ async fn main() {
let (tx2, rx2) = oneshot::channel(); let (tx2, rx2) = oneshot::channel();
tokio::spawn(async { tokio::spawn(async {
// 等待 `some_peration` 的完成 // 等待 `some_operation` 的完成
// 或者处理 `oneshot` 的关闭通知 // 或者处理 `oneshot` 的关闭通知
tokio::select! { tokio::select! {
val = some_operation() => { val = some_operation() => {
@ -62,12 +62,10 @@ async fn main() {
} }
_ = tx1.closed() => { _ = tx1.closed() => {
// 收到了发送端发来的关闭信号 // 收到了发送端发来的关闭信号
// // `select` 即将结束,此时,正在进行的 `some_operation()` 任务会被取消,任务自动完成,
// `select` 即将结束,此时,正在进行的 `some_operation()` 任务会被取消,任务自动完成, // tx1 被释放 // tx1 被释放
} }
} }
}); });
tokio::spawn(async { tokio::spawn(async {
@ -85,10 +83,10 @@ async fn main() {
} }
``` ```
上面代码的重点就在于 `tx1.closed` 所在的分支,一旦发送端被关闭,那该分支就会被执行,然后 `select` 会退出,并清理掉还没执行的第一个分支 `val = some_operation()` ,这其中 `some_peration` 返回的 `Future` 也会被清理,根据之前的内容,`Future` 被清理那相应的任务会立即取消,因此 `some_oeration` 会被取消,不再执行。 上面代码的重点就在于 `tx1.closed` 所在的分支,一旦发送端被关闭,那该分支就会被执行,然后 `select` 会退出,并清理掉还没执行的第一个分支 `val = some_operation()` ,这其中 `some_operation` 返回的 `Future` 也会被清理,根据之前的内容,`Future` 被清理那相应的任务会立即取消,因此 `some_operation` 会被取消,不再执行。
#### Future 的实现 #### Future 的实现
为了更好的理解 `select` 的工作原理,我们来看看如果使用 `Future`。当然,这里是一个简化版本,在实际中,`select!` 会包含一些额外的功能,例如一开始会随机选择一个分支进行 `poll` 为了更好的理解 `select` 的工作原理,我们来看看如果使用 `Future` 该如何实现。当然,这里是一个简化版本,在实际中,`select!` 会包含一些额外的功能,例如一开始会随机选择一个分支进行 `poll`
```rust ```rust
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -150,7 +148,7 @@ async fn main() {
`select` 宏开始执行后,所有的分支会开始并发的执行。当任何一个**表达式**完成时,会将结果跟**模式**进行匹配。若匹配成功,则剩下的表达式会被释放。 `select` 宏开始执行后,所有的分支会开始并发的执行。当任何一个**表达式**完成时,会将结果跟**模式**进行匹配。若匹配成功,则剩下的表达式会被释放。
最常用的**模式**就是用变量名去匹配表达式返回的值,然该变量就可以在**结果处理**环节使用。 最常用的**模式**就是用变量名去匹配表达式返回的值,然该变量就可以在**结果处理**环节使用。
如果当前的模式不能匹配,剩余的 `async` 表达式将继续并发的执行,直到下一个完成。 如果当前的模式不能匹配,剩余的 `async` 表达式将继续并发的执行,直到下一个完成。
@ -245,7 +243,7 @@ async fn main() {
## 错误传播 ## 错误传播
在 Rust 中使用 `?` 可以对错误进行传播,但是在 `select!` 中,`?` 如何工作取决于它是在分支中的 `async` 表达式使用还是在结果处理的代码中使用: 在 Rust 中使用 `?` 可以对错误进行传播,但是在 `select!` 中,`?` 如何工作取决于它是在分支中的 `async` 表达式使用还是在结果处理的代码中使用:
- 在分支中 `ascyn` 表达式使用会将该表达式的结果变成一个 `Result` - 在分支中 `async` 表达式使用会将该表达式的结果变成一个 `Result`
- 在结果处理中使用,会将错误直接传播到 `select!` 之外 - 在结果处理中使用,会将错误直接传播到 `select!` 之外
```rust ```rust
@ -321,7 +319,7 @@ async fn main() {
上面代码中,`rx` 通道关闭后,`recv()` 方法会返回一个 `None`,可以看到没有任何模式能够匹配这个 `None`,那为何不会报错?秘密就在于 `else` 上:当使用模式去匹配分支时,若之前的所有分支都无法被匹配,那 `else` 分支将被执行。 上面代码中,`rx` 通道关闭后,`recv()` 方法会返回一个 `None`,可以看到没有任何模式能够匹配这个 `None`,那为何不会报错?秘密就在于 `else` 上:当使用模式去匹配分支时,若之前的所有分支都无法被匹配,那 `else` 分支将被执行。
## 借用 ## 借用
当在 Tokio 中生成( spawan )任务时,其 async 语句块必须拥有其中数据的所有权。而 `select!` 并没有这个限制,它的每个分支表达式可以直接借用数据,然后进行并发操作。只要遵循 Rust 的借用规则,多个分支表达式可以不可变的借用同一个数据,或者在一个表达式可变的借用某个数据。 当在 Tokio 中生成( spawn )任务时,其 async 语句块必须拥有其中数据的所有权。而 `select!` 并没有这个限制,它的每个分支表达式可以直接借用数据,然后进行并发操作。只要遵循 Rust 的借用规则,多个分支表达式可以不可变的借用同一个数据,或者在一个表达式可变的借用某个数据。
来看个例子,在这里我们同时向两个 TCP 目标发送同样的数据: 来看个例子,在这里我们同时向两个 TCP 目标发送同样的数据:
```rust ```rust
@ -357,7 +355,7 @@ async fn race(
如果你把连接过程放在了结果处理中,那连接失败会直接从 `race` 函数中返回,而不是继续执行另一个分支中的连接! 如果你把连接过程放在了结果处理中,那连接失败会直接从 `race` 函数中返回,而不是继续执行另一个分支中的连接!
还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用这当然ok但是若是两次可变借用那编译器会立即进行报错。但是转折来了当在结果处理中进行两次可变借用时却不会报错大家可以思考下为什么提示下思考下分支在执行成后会发生什么。 还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用这当然ok但是若是两次可变借用那编译器会立即进行报错。但是转折来了当在结果处理中进行两次可变借用时却不会报错大家可以思考下为什么提示下思考下分支在执行完成后会发生什么?
```rust ```rust
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -540,7 +538,7 @@ async fn main() {
当第一次循环开始时, 第一个分支会立即完成,因为 `operation` 的参数是 `None`。当第一个分支执行完成时,`done` 会变成 `true`,此时第一个分支的条件将无法被满足,开始执行第二个分支。 当第一次循环开始时, 第一个分支会立即完成,因为 `operation` 的参数是 `None`。当第一个分支执行完成时,`done` 会变成 `true`,此时第一个分支的条件将无法被满足,开始执行第二个分支。
当第二个分支收到一个偶数时,`done` 会被修改为 `false`,且 `operation` 被设置了值。 此后再一次循环时,第一个分支会被执行,且 `oertation` 返回一个 `Some(2)`,因此会触发 `return` ,最终结束循环并返回。 当第二个分支收到一个偶数时,`done` 会被修改为 `false`,且 `operation` 被设置了值。 此后再一次循环时,第一个分支会被执行,且 `operation` 返回一个 `Some(2)`,因此会触发 `return` ,最终结束循环并返回。
这段代码引入了一个新的语法: `if !done`,在解释之前,先看看去掉后会如何: 这段代码引入了一个新的语法: `if !done`,在解释之前,先看看去掉后会如何:
```console ```console
@ -556,10 +554,10 @@ note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
那大家肯定有疑问了,既然 `operation` 不能再被调用了,我们该如何在有偶数值时,再回到第一个分支对其进行调用呢?答案就是 `operation.set(action(Some(v)));`,该操作会重新使用新的参数设置 `operation` 那大家肯定有疑问了,既然 `operation` 不能再被调用了,我们该如何在有偶数值时,再回到第一个分支对其进行调用呢?答案就是 `operation.set(action(Some(v)));`,该操作会重新使用新的参数设置 `operation`
## spawan 和 select! 的一些不同 ## spawn 和 select! 的一些不同
学到现在,相信大家对于 `tokio::spawn``select!` 已经非常熟悉,它们的共同点就是都可以并发的运行异步操作。 学到现在,相信大家对于 `tokio::spawn``select!` 已经非常熟悉,它们的共同点就是都可以并发的运行异步操作。
然而它们使用的策略大相径庭。 然而它们使用的策略大相径庭。
`tokio::spawan` 函数会启动新的任务来运行一个异步操作,每个任务都是一个独立的对象可以单独被 Tokio 调度运行,因此两个不同的任务的调度都是独立进行的,甚至于它们可能会运行在两个不同的操作系统线程上。鉴于此,生成的任务和生成的线程有一个相同的限制:不允许对外部环境中的值进行借用。 `tokio::spawn` 函数会启动新的任务来运行一个异步操作,每个任务都是一个独立的对象可以单独被 Tokio 调度运行,因此两个不同的任务的调度都是独立进行的,甚至于它们可能会运行在两个不同的操作系统线程上。鉴于此,生成的任务和生成的线程有一个相同的限制:不允许对外部环境中的值进行借用。
`select!` 宏就不一样了,它在同一个任务中并发运行所有的分支,正是因为这样在同一个任务中,这些分支无法被同时运行。 `select!` 宏在单个任务中实现了多路复用的功能。 `select!` 宏就不一样了,它在同一个任务中并发运行所有的分支。正是因为这样,在同一个任务中,这些分支无法被同时运行。 `select!` 宏在单个任务中实现了多路复用的功能。

@ -1,5 +1,5 @@
# 共享状态 # 共享状态
上一章节中,咱们搭建了一个异步的 redis 服务器,并成功的提供了服务,但是其隐藏了一个巨大的问题:状态(数据)无法在多个连接之间共享,下面一起来看看该如何解决。 上一章节中,咱们搭建了一个异步的 redis 服务器,并成功的提供了服务,但是其隐藏了一个巨大的问题:状态(数据)无法在多个连接之间共享,下面一起来看看该如何解决。
## 解决方法 ## 解决方法
好在 Tokio 十分强大,上面问题对应的解决方法也不止一种: 好在 Tokio 十分强大,上面问题对应的解决方法也不止一种:
@ -24,7 +24,7 @@ bytes = "1"
## 初始化 HashMap ## 初始化 HashMap
由于 `HashMap` 会在多个任务甚至多个线程间共享,再结合之前的选择,最终我们决定使用 `<Arc<Mutext<T>>>` 的方式对其进行包裹。 由于 `HashMap` 会在多个任务甚至多个线程间共享,再结合之前的选择,最终我们决定使用 `<Arc<Mutext<T>>>` 的方式对其进行包裹。
但是,大家先来畅想一下使用它进行包后的类型长什么样? 大概,可能,长这样:`Arc<Mutex<HashMap<String, Bytes>>>`,天哪噜,一不小心,你就遇到了 Rust 的阴暗面:类型大串烧。可以想象,如果要在代码中到处使用这样的类型,可读性会极速下降,因此我们需要一个[类型别名](https://course.rs/advance/custom-type.html#类型别名type-alias)( type alias )来简化下: 但是,大家先来畅想一下使用它进行包后的类型长什么样? 大概,可能,长这样:`Arc<Mutex<HashMap<String, Bytes>>>`,天哪噜,一不小心,你就遇到了 Rust 的阴暗面:类型大串烧。可以想象,如果要在代码中到处使用这样的类型,可读性会极速下降,因此我们需要一个[类型别名](https://course.rs/advance/custom-type.html#类型别名type-alias)( type alias )来简化下:
```rust ```rust
use bytes::Bytes; use bytes::Bytes;
use std::collections::HashMap; use std::collections::HashMap;
@ -35,7 +35,7 @@ type Db = Arc<Mutex<HashMap<String, Bytes>>>;
此时,`Db` 就是一个类型别名,使用它就可以替代那一大串的东东,等下你就能看到功效。 此时,`Db` 就是一个类型别名,使用它就可以替代那一大串的东东,等下你就能看到功效。
接着,我们需要在 `main` 函数中对 `HashMap` 进行初始化,然后使用 `Arc` 克隆一份它的所有权并将其传入到生成的异步任务中。事实上在 Tokio 中,这里的 `Arc` 被称**handle**,或者更宽泛的说,`handle` 在 Tokio 中可以用来访问某个共享状态。 接着,我们需要在 `main` 函数中对 `HashMap` 进行初始化,然后使用 `Arc` 克隆一份它的所有权并将其传入到生成的异步任务中。事实上在 Tokio 中,这里的 `Arc` 被称为 **handle**,或者更宽泛的说,`handle` 在 Tokio 中可以用来访问某个共享状态。
```rust ```rust
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -66,11 +66,11 @@ async fn main() {
#### 为何使用 `std::sync::Mutex` #### 为何使用 `std::sync::Mutex`
上面代码还有一点非常重要,那就是我们使用了 `std::sync::Mutex` 来保护 `HashMap`,而不是使用 `tokio::sync::Mutex` 上面代码还有一点非常重要,那就是我们使用了 `std::sync::Mutex` 来保护 `HashMap`,而不是使用 `tokio::sync::Mutex`
在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 `tokio::sync::Mutex` 而真相是Tokio 提供的异步锁只应该使用在 `.await`调用过程中,而且 `Tokio``Mutex` 实际上内部使用的也是 `std::sync::Mutex` 在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 `tokio::sync::Mutex` 而真相是Tokio 提供的异步锁只应该在跨多个 `.await`调用时使用,而且 Tokio`Mutex` 实际上内部使用的也是 `std::sync::Mutex`
多补充几句,在异步代码中,关于锁的使用有以下经验之谈: 多补充几句,在异步代码中,关于锁的使用有以下经验之谈:
- 锁如果在 `.await` 过程中持有,应该使用 `Tokio` 提供的锁,原因是 `.await`的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为 `.await` 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁 - 锁如果在多个 `.await` 过程中持有,应该使用 Tokio 提供的锁,原因是 `.await`的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为 `.await` 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁
- 锁竞争不多的情况下,使用 `std::sync::Mutex` - 锁竞争不多的情况下,使用 `std::sync::Mutex`
- 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 [`parking_lot::Mutex`](https://docs.rs/parking_lot/0.10.2/parking_lot/type.Mutex.html) - 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 [`parking_lot::Mutex`](https://docs.rs/parking_lot/0.10.2/parking_lot/type.Mutex.html)
@ -112,11 +112,11 @@ async fn process(socket: TcpStream, db: Db) {
## 任务、线程和锁竞争 ## 任务、线程和锁竞争
当竞争不多的时候,使用阻塞性的锁去保护共享数据是一个正确的选择。当一个锁竞争触发后,当前正在执行任务(请求锁)的线程会被阻塞,并等待锁被前一个使用者释放。这里的关键就是:**锁竞争不仅仅会导致当前的任务被阻塞,还会导致执行任务的线程被阻塞,因此该线程准备执行的其它任务也会因此被阻塞!** 当竞争不多的时候,使用阻塞性的锁去保护共享数据是一个正确的选择。当一个锁竞争触发后,当前正在执行任务(请求锁)的线程会被阻塞,并等待锁被前一个使用者释放。这里的关键就是:**锁竞争不仅仅会导致当前的任务被阻塞,还会导致执行任务的线程被阻塞,因此该线程准备执行的其它任务也会因此被阻塞!**
默认情况下,`Tokio` 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来。当然,你也可以使用 [**current_thread**](https://docs.rs/tokio/1.15.0/tokio/runtime/index.html#current-thread-scheduler) 运行时设置,在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争。 默认情况下Tokio 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来。当然,你也可以使用 [**current_thread**](https://docs.rs/tokio/1.15.0/tokio/runtime/index.html#current-thread-scheduler) 运行时设置,在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争。
> current_thread 是一个轻量级、单线程的运行时当任务数不多或连接数不多时是一个很好的选择。例如你想在一个异步客户端库的基础上提供给用户同步的API访问时该模式就很适用 > current_thread 是一个轻量级、单线程的运行时当任务数不多或连接数不多时是一个很好的选择。例如你想在一个异步客户端库的基础上提供给用户同步的API访问时该模式就很适用
当同步锁的竞争变成一个问题时,使用 `Tokio` 提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项: 当同步锁的竞争变成一个问题时,使用 Tokio 提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项:
- 创建专门的任务并使用消息传递的方式来管理状态 - 创建专门的任务并使用消息传递的方式来管理状态
- 将锁进行分片 - 将锁进行分片
@ -158,7 +158,7 @@ async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
} // 锁在这里超出作用域 } // 锁在这里超出作用域
``` ```
如果你要 `spawn` 一个任务来执行上面的函数话,会报错: 如果你要 `spawn` 一个任务来执行上面的函数话,会报错:
```console ```console
error: future cannot be sent between threads safely error: future cannot be sent between threads safely
--> src/lib.rs:13:5 --> src/lib.rs:13:5

@ -109,7 +109,7 @@ async fn main() {
任务在 Tokio 中远比看上去要更轻量例如创建一个任务仅仅需要一次64字节大小的内存分配。因此应用程序在生成任务上完全不应该有任何心理负担除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。 任务在 Tokio 中远比看上去要更轻量例如创建一个任务仅仅需要一次64字节大小的内存分配。因此应用程序在生成任务上完全不应该有任何心理负担除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。
#### `'static` 约束 #### `'static` 约束
当使用 Tokio 创建一个任务时,该任务类型的生命周期必须 `'static`。意味着,在任务中不能使用外部数据的引用: 当使用 Tokio 创建一个任务时,该任务类型的生命周期必须 `'static`。意味着,在任务中不能使用外部数据的引用:
```rust ```rust
use tokio::task; use tokio::task;
@ -176,7 +176,7 @@ PS: 这一段内容Tokio 的原文有些逻辑混乱,我已经尽可能的
> 一个关于 `'static` 生命周期的常见误解就是它将永远存活(跟整个程序活得一样久),实际上并不是这样的。同样的,一个变量是 `'static` 并不意味着它存在内存泄漏的可能性。 > 一个关于 `'static` 生命周期的常见误解就是它将永远存活(跟整个程序活得一样久),实际上并不是这样的。同样的,一个变量是 `'static` 并不意味着它存在内存泄漏的可能性。
#### Send 约束 #### Send 约束
`tokio::spawn` 生成的任务必须实现 `Send` 特征,因为 Tokio 调度器会将任务在线程间进行移动,当这些任务在 `.await` 执行过程中发生阻塞时。 `tokio::spawn` 生成的任务必须实现 `Send` 特征,因为当这些任务在 `.await` 执行过程中发生阻塞时Tokio 调度器会将任务在线程间移动
**一个任务要实现 `Send` 特征,那它在 `.await` 调用的过程中所持有的全部数据都必须实现 `Send` 特征**。当 `.await` 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将`.await`之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 `Send` 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。 **一个任务要实现 `Send` 特征,那它在 `.await` 调用的过程中所持有的全部数据都必须实现 `Send` 特征**。当 `.await` 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将`.await`之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 `Send` 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。

@ -111,7 +111,7 @@ got = Ok(Message { channel: "numbers", content: b"6" })
与迭代器类似,`stream` 也有适配器,例如一个 `stream` 适配器可以将一个 `stream` 转变成另一个 `stream` ,例如 `map`、`take` 和 `filter` 与迭代器类似,`stream` 也有适配器,例如一个 `stream` 适配器可以将一个 `stream` 转变成另一个 `stream` ,例如 `map`、`take` 和 `filter`
在之前的客户端中,`subscribe` 订阅一直持续下去,知道程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。 在之前的客户端中,`subscribe` 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。
```rust ```rust
let messages = subscriber let messages = subscriber
.into_stream() .into_stream()
@ -228,7 +228,7 @@ impl Stream for Interval {
``` ```
#### async-stream #### async-stream
手动实现 `Stream` 特征实际上是相当麻烦的事,然而不幸地是Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。 手动实现 `Stream` 特征实际上是相当麻烦的事不幸地是Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。
作为替代方案,[`async-stream`](https://docs.rs/async-stream/latest/async_stream/) 包提供了一个 `stream!` 宏,它可以将一个输入转换成 `stream`,使用这个包,上面的代码可以这样实现: 作为替代方案,[`async-stream`](https://docs.rs/async-stream/latest/async_stream/) 包提供了一个 `stream!` 宏,它可以将一个输入转换成 `stream`,使用这个包,上面的代码可以这样实现:
```rust ```rust

Loading…
Cancel
Save