diff --git a/book/contents/async/async-await.md b/book/contents/async/async-await.md index 545c3e73..03e6bcbf 100644 --- a/book/contents/async/async-await.md +++ b/book/contents/async/async-await.md @@ -1,12 +1,12 @@ # async/await 和 Stream流处理 -在入门章节中,我们简单学习了该如何使用 `async/.await`, 同时在后面也了解了一些底层原理,现在是时候继续继续深入了。 +在入门章节中,我们简单学习了该如何使用 `async/.await`, 同时在后面也了解了一些底层原理,现在是时候继续深入了。 `async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。 有两种方式可以使用`async`: `async fn`用于声明函数,`async { ... }`用于声明语句块,它们会返回一个实现 `Future` 特征的值: ```rust // `foo()`返回一个`Future`, -// 当调用`foo().wait`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值 +// 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值 async fn foo() -> u8 { 5 } fn bar() -> impl Future { @@ -124,7 +124,7 @@ fn move_block() -> impl Future { ## Stream流处理 -`Stream` 特征类似于 `Future` 特征,但是前者可以在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。 +`Stream` 特征类似于 `Future` 特征,但是前者在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。 ```rust trait Stream { // Stream生成的值的类型 diff --git a/book/contents/async/multi-futures-simultaneous.md b/book/contents/async/multi-futures-simultaneous.md index f488fd42..f04c1af8 100644 --- a/book/contents/async/multi-futures-simultaneous.md +++ b/book/contents/async/multi-futures-simultaneous.md @@ -25,9 +25,9 @@ async fn enjoy_book_and_music() -> (Book, Music) { } ``` -看上去像模像样,嗯,在某些语言中也许可以,但是 Rust 不行。因为在某些语言中,`Future`一旦创建就开始运行,等到返回的时候,基本就可以同时结束并返回了。 但是 Rust 中的 `Future` 是惰性的,直到调用 `.await` 时,才会开始运行。而那两个 `await` 由于在代码中有先后顺序,因此它们是连续运行的。 +看上去像模像样,嗯,在某些语言中也许可以,但是 Rust 不行。因为在某些语言中,`Future`一旦创建就开始运行,等到返回的时候,基本就可以同时结束并返回了。 但是 Rust 中的 `Future` 是惰性的,直到调用 `.await` 时,才会开始运行。而那两个 `await` 由于在代码中有先后顺序,因此它们是顺序运行的。 -为了正确的并发两个 `Future` , 我们来试试 `futures::join!` 宏: +为了正确的并发运行两个 `Future` , 我们来试试 `futures::join!` 宏: ```rust use futures::join; @@ -107,7 +107,7 @@ async fn race_tasks() { #### default 和 complete `select!`还支持 `default` 和 `complete` 分支: - `complete` 分支当所有的 `Future` 和 `Stream` 完成后才会被执行,它往往配合`loop`使用,`loop`用于循环完成所有的 `Future` -- `default`分支,若没有任何 `Future` 或 `Stream` 处于 `Read` 状态, 则该分支会被立即执行 +- `default`分支,若没有任何 `Future` 或 `Stream` 处于 `Ready` 状态, 则该分支会被立即执行 ```rust use futures::future; @@ -147,7 +147,7 @@ pin_mut!(t1, t2); 首先,`.fuse()`方法可以让 `Future` 实现 `FusedFuture` 特征, 而 `pin_mut!` 宏会为 `Future` 实现 `Unpin`特征,这两个特征恰恰是使用 `select` 所必须的: - `Unpin`,由于 `select` 不会通过拿走所有权的方式使用`Future`,而是通过可变引用的方式去使用,这样当 `select` 结束后,该 `Future` 若没有被完成,它的所有权还可以继续被其它代码使用。 -- `FusedFuture`的原因跟上面类似,当 `Future` 一旦完成后,那 `select` 就不能再对其进行轮询使用。`Fuse`意味着融化的意思,相当于 `Future` 一旦完成,就融化了,无法再被使用。 +- `FusedFuture`的原因跟上面类似,当 `Future` 一旦完成后,那 `select` 就不能再对其进行轮询使用。`Fuse`意味着熔断,相当于 `Future` 一旦完成,再次调用`poll`会直接返回`Poll::Pending`。 只有实现了`FusedFuture`,`select` 才能配合 `loop` 一起使用。假如没有实现,就算一个 `Future` 已经完成了,它依然会被 `select` 不停的轮询执行。 diff --git a/book/contents/async/pin-unpin.md b/book/contents/async/pin-unpin.md index 0e4b94dc..512be531 100644 --- a/book/contents/async/pin-unpin.md +++ b/book/contents/async/pin-unpin.md @@ -119,10 +119,10 @@ pub struct Pin

{ 因此,一个类型如果不能被移动,它必须实现 `!Unpin` 特征。如果大家对 `Pin` 、 `Unpin` 还是模模糊糊,建议再重复看一遍之前的内容,理解它们对于我们后面要讲到的内容非常重要! -如果将 `Unpin` 与之前章节学过的 [`Send/Sync`](https://www.zhihu.com/question/303273488/answer/2309266713) 进行下对比,会发现它们都很像: +如果将 `Unpin` 与之前章节学过的 [`Send/Sync`](https://course.rs/advance/concurrency-with-threads/send-sync.html) 进行下对比,会发现它们都很像: - 都是标记特征( marker trait ),该特征未定义任何行为,非常适用于标记 -- 都可以通过!语法去除实现 +- 都可以通过`!`语法去除实现 - 绝大多数情况都是自动实现, 无需我们的操心 @@ -310,22 +310,57 @@ error[E0277]: `PhantomPinned` cannot be unpinned > 需要注意的是固定在栈上非常依赖于你写出的 `unsafe` 代码的正确性。我们知道 `&'a mut T` 可以固定的生命周期是 `'a` ,但是我们却不知道当生命周期 `'a` 结束后,该指针指向的数据是否会被移走。如果你的 `unsafe` 代码里这么实现了,那么就会违背 `Pin` 应该具有的作用! > > 一个常见的错误就是忘记去遮蔽(shadow )初始的变量,因为你可以 `drop` 掉 `Pin` ,然后在 `&'a mut T` 结束后去移动数据: - -```rust -pub fn main() { - let mut test1 = Test::new("test1"); - let mut test1 = unsafe { Pin::new_unchecked(&mut test1) }; - Test::init(test1.as_mut()); - - let mut test2 = Test::new("test2"); - let mut test2 = unsafe { Pin::new_unchecked(&mut test2) }; - Test::init(test2.as_mut()); - - println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref())); - std::mem::swap(test1.get_mut(), test2.get_mut()); - println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref())); -} -``` +> ```rust +> fn main() { +> let mut test1 = Test::new("test1"); +> let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) }; +> Test::init(test1_pin.as_mut()); +> +> drop(test1_pin); +> println!(r#"test1.b points to "test1": {:?}..."#, test1.b); +> +> let mut test2 = Test::new("test2"); +> mem::swap(&mut test1, &mut test2); +> println!("... and now it points nowhere: {:?}", test1.b); +> } +> # use std::pin::Pin; +> # use std::marker::PhantomPinned; +> # use std::mem; +> # +> # #[derive(Debug)] +> # struct Test { +> # a: String, +> # b: *const String, +> # _marker: PhantomPinned, +> # } +> # +> # +> # impl Test { +> # fn new(txt: &str) -> Self { +> # Test { +> # a: String::from(txt), +> # b: std::ptr::null(), +> # // This makes our type `!Unpin` +> # _marker: PhantomPinned, +> # } +> # } +> # +> # fn init<'a>(self: Pin<&'a mut Self>) { +> # let self_ptr: *const String = &self.a; +> # let this = unsafe { self.get_unchecked_mut() }; +> # this.b = self_ptr; +> # } +> # +> # fn a<'a>(self: Pin<&'a Self>) -> &'a str { +> # &self.get_ref().a +> # } +> # +> # fn b<'a>(self: Pin<&'a Self>) -> &'a String { +> # assert!(!self.b.is_null(), "Test::b called without Test::init being called first"); +> # unsafe { &*(self.b) } +> # } +> # } +> ``` #### 固定到堆上 将一个 `!Unpin` 类型的值固定到堆上,会给予该值一个稳定的内存地址,它指向的堆中的值在 `Pin` 后是无法被移动的。而且与固定在栈上不同,我们知道堆上的值在整个生命周期内都会被稳稳地固定住。 @@ -412,7 +447,10 @@ execute_unpin_future(fut); // OK - 若 `T: Unpin` ( Rust 类型的默认实现),那么 `Pin<'a, T>` 跟 `&'a mut T` 完全相同,也就是 `Pin` 将没有任何效果, 该移动还是照常移动 - 绝大多数标准库类型都实现了 `Unpin` ,事实上,对于 Rust 中你能遇到的绝大多数类型,该结论依然成立 ,其中一个例外就是:`async/await` 生成的 `Future` 没有实现 `Unpin` -- 你可以通过以下方法为自己的类型添加 `!Unpin` 约束:1. 使用文中提到的 `std::marker::PhantomPinned` 2. 使用`nightly` 版本下的 `feature flag` +- 你可以通过以下方法为自己的类型添加 `!Unpin` 约束: + - 使用文中提到的 `std::marker::PhantomPinned` + - 使用`nightly` 版本下的 `feature flag` - 可以将值固定到栈上,也可以固定到堆上 - 将 `!Unpin` 值固定到栈上需要使用 `unsafe` - - 将 `!Unpin` 值固定到堆上无需 `unsafe` ,可以通过 `Box::pin` 来简单的实现 \ No newline at end of file + - 将 `!Unpin` 值固定到堆上无需 `unsafe` ,可以通过 `Box::pin` 来简单的实现 +- 当固定类型`T: !Unpin`时,你需要保证数据从被固定到被drop这段时期内,其内存不会变得非法或者被重用 diff --git a/book/contents/async/web-server.md b/book/contents/async/web-server.md index dfc24faa..9de1b1c0 100644 --- a/book/contents/async/web-server.md +++ b/book/contents/async/web-server.md @@ -79,10 +79,10 @@ fn handle_connection(mut stream: TcpStream) { 运行以上代码,并从浏览器访问 `127.0.0.1:7878` 你将看到一条来自 `Ferris` 的问候。 -在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `ascyn` 的异步Web服务器。 +在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `async` 的异步Web服务器。 ## 运行异步代码 -一个 Web 服务器必须要能并发的处理大量来自用户的请求,也就是我们不能在处理上一个用户的请求后,再处理下一个用户的请求。上面的单线程版本可以修改为多线程甚至于线程池来实现并发处理,但是线程还是太重了,使用 `async` 实现 `Web` 服务器才是最适合的。 +一个 Web 服务器必须要能并发的处理大量来自用户的请求,也就是我们不能在处理完上一个用户的请求后,再处理下一个用户的请求。上面的单线程版本可以修改为多线程甚至于线程池来实现并发处理,但是线程还是太重了,使用 `async` 实现 `Web` 服务器才是最适合的。 首先将 `handle_connection` 修改为 `async` 实现: ```rust @@ -148,7 +148,7 @@ async fn handle_connection(mut stream: TcpStream) { } ``` -上面是全新实现的 `handle_connection` ,它会在内部睡眠5秒,模拟一次用户慢请求,需要注意的是,我们并没有使用 `std::thread::sleep` 进行睡眠, 原因是该函数并不适用于并发代码,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。 +上面是全新实现的 `handle_connection` ,它会在内部睡眠5秒,模拟一次用户慢请求,需要注意的是,我们并没有使用 `std::thread::sleep` 进行睡眠,原因是该函数是阻塞的,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。 因此,光把函数变成 `async` 往往是不够的,还需要将它内部的代码也都变成异步兼容的,阻塞线程绝对是不可行的。 @@ -222,7 +222,7 @@ async fn main() { ## 测试 handle_connection 函数 对于测试 Web 服务器,使用集成测试往往是最简单的,但是在本例子中,将使用单元测试来测试连接处理函数的正确性。 -为了保证单元测试的隔离性和确定性,我们使用 `Mock` 来替代 `TcpStream` 。首先,修改 `handle_connection` 的函数签名让测试更简单,之所以可以修改签名,原因在于 `async_std::net::TcpStream` 实际上并不是必须的,只要任何结构体实现了 `async_std::io::Read`, `async_std::io::Write` 和 `marker::Unpin` 就可以替代它: +为了保证单元测试的隔离性和确定性,我们使用 `MockTcpStream` 来替代 `TcpStream` 。首先,修改 `handle_connection` 的函数签名让测试更简单,之所以可以修改签名,原因在于 `async_std::net::TcpStream` 实际上并不是必须的,只要任何结构体实现了 `async_std::io::Read`, `async_std::io::Write` 和 `marker::Unpin` 就可以替代它: ```rust use std::marker::Unpin; use async_std::io::{Read, Write}; diff --git a/book/contents/tokio/async.md b/book/contents/tokio/async.md index 347345e1..9f0e28f2 100644 --- a/book/contents/tokio/async.md +++ b/book/contents/tokio/async.md @@ -1,7 +1,7 @@ # 深入 Tokio 背后的异步原理 在经过多个章节的深入学习后,Tokio 对我们来说不再是一座隐于云雾中的高山,它其实蛮简单好用的,甚至还有一丝丝的可爱!? -但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 async 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。 +但从现在开始,如果想要进一步的深入 Tokio ,首先需要深入理解 `async` 的原理,其实我们在[之前的章节](https://course.rs/async/intro.html)已经深入学习过,这里结合 Tokio 再来回顾下。 ## Future 先来回顾一下 `async fn` 异步函数 : @@ -118,7 +118,7 @@ use std::time::{Duration, Instant}; enum MainFuture { // 初始化,但永远不会被 poll State0, - // 等待 `Delay` 运行,例如 `future.awai` 代码行 + // 等待 `Delay` 运行,例如 `future.await` 代码行 State1(Delay), // Future 执行完成 Terminated, @@ -236,12 +236,12 @@ impl MiniTokio { 鉴于此,我们的 mini-tokio 只应该在 `Future` 准备好可以进一步运行后,才去 `poll` 它,例如该 `Future` 之前阻塞等待的**资源**已经准备好并可以被使用了,就可以对其进行 `poll`。再比如,如果一个 `Future` 任务在阻塞等待从 TCP socket 中读取数据,那我们只想在 `socket` 中有数据可以读取后才去 `poll` 它,而不是没事就 `poll` 着玩。 -回到在上面的代码中,mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要通知 -> 运行机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`。 +回到在上面的代码中,mini-tokio 只应该当任务的延迟时间到了后,才去 `poll` 它。 为了实现这个功能,我们需要 `通知 -> 运行` 机制:当任务可以进一步被推进运行时,它会主动通知执行器,然后执行器再来 `poll`。 ## Waker 一切的答案都在 `Waker` 中,资源可以用它来通知正在等待的任务:该资源已经准备好,可以继续运行了。 -再来看下 `Future::pol` 的定义: +再来看下 `Future::poll` 的定义: ```rust fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll; @@ -327,7 +327,7 @@ impl Future for Delay { 在返回 `Poll::Pending` 之前,先调用了 `cx.waker().wake_by_ref()` ,由于此时我们还没有模拟计时资源,因此这里直接调用了 `wake` 进行通知,这样做会导致当前的 `Future` 被立即再次调度执行。 -由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU,因为这种执行 -> 立即通知再调度 -> 执行的方式会造成一个非常繁忙的循环。 +由此可见,这种通知的控制权是在你手里的,甚至可以像上面代码这样,还没准备好资源,就直接进行 `wake` 通知,但是总归意义不大,而且浪费了 CPU,因为这种 `执行 -> 立即通知再调度 -> 执行` 的方式会造成一个非常繁忙的循环。 #### 处理 wake 通知 下面,让我们更新 mint-tokio 服务,让它能接受 wake 通知:当 `waker.wake()` 被调用后,相关联的任务会被放入执行器的队列中,然后等待执行器的调用执行。 @@ -423,7 +423,7 @@ impl MiniTokio { impl Task { fn poll(self: Arc) { - // 基于 Task 实例创建一个 waker, 它使用了之前的 `AarcWake` + // 基于 Task 实例创建一个 waker, 它使用了之前的 `ArcWake` let waker = task::waker(self.clone()); let mut cx = Context::from_waker(&waker); @@ -436,7 +436,7 @@ impl Task { // 使用给定的 future 来生成新的任务 // - // 新的任务会被推到 `sender` 中,接着该消息通道的接收段就可以获取该任务,然后执行 + // 新的任务会被推到 `sender` 中,接着该消息通道的接收端就可以获取该任务,然后执行 fn spawn(future: F, sender: &channel::Sender>) where F: Future + Send + 'static, @@ -492,14 +492,14 @@ async fn main() { 首先,`poll_fn` 函数使用闭包创建了一个 `Future`,其次,上面代码还创建一个 `Delay` 实例,然后在闭包中,对其进行了一次 `poll` ,接着再将该 `Delay` 实例发送到一个新的任务,在此任务中使用 `.await` 进行了执行。 -在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方。 +在例子中,`Delay:poll` 被调用了不止一次,且使用了不同的 `Waker` 实例,在这种场景下,你必须确保调用最近一次 `poll` 函数中的 `Waker` 参数中的`wake`方法。也就是调用最内层 `poll` 函数参数( `Waker` )上的 `wake` 方法。 当实现一个 `Future` 时,很关键的一点就是要假设每次 `poll` 调用都会应用到一个不同的 `Waker` 实例上。因此 `poll` 函数必须要使用一个新的 `waker` 去更新替代之前的 `waker`。 我们之前的 `Delay` 实现中,会在每一次 `poll` 调用时都生成一个新的线程。这么做问题不大,但是当 `poll` 调用较多时会出现明显的性能问题!一个解决方法就是记录你是否已经生成了一个线程,然后只有在没有生成时才去创建一个新的线程。但是一旦这么做,就必须确保线程的 `Waker` 在后续 `poll` 调用中被正确更新,否则你无法唤醒最近的 `Waker` ! -这一段大家可能会看得云里雾里的,没办法,原文就饶老绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑: +这一段大家可能会看得云里雾里的,没办法,原文就饶来绕去,好在终于可以看代码了。。我们可以通过代码来解决疑惑: ```rust use std::future::Future; use std::pin::Pin; @@ -525,7 +525,8 @@ impl Future for Delay { let mut waker = waker.lock().unwrap(); // 检查之前存储的 `waker` 是否跟当前任务的 `waker` 相匹配. - // 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后 // 存储中的 waker 被该任务进行了更新。 + // 这是必要的,原因是 `Delay Future` 的实例可能会在两次 `poll` 之间被转移到另一个任务中,然后 + // 存储的 waker 被该任务进行了更新。 // 这种情况一旦发生,`Context` 包含的 `waker` 将不同于存储的 `waker`。 // 因此我们必须对存储的 `waker` 进行更新 if !waker.will_wake(cx.waker()) { @@ -550,7 +551,7 @@ impl Future for Delay { }); } - // 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已尽完成 + // 一旦 waker 被存储且计时器线程已经开始,我们就需要检查 `delay` 是否已经完成 // 若计时已完成,则当前 Future 就可以完成并返回 `Poll::Ready` if Instant::now() >= self.when { Poll::Ready(()) @@ -603,7 +604,7 @@ async fn delay(dur: Duration) { ## 总结 在看完这么长的文章后,我们来总结下,否则大家可能还会遗忘: -- 在 Rust 中,`ascyn` 是惰性的,直到执行器 `poll` 它们时,才会开始执行 +- 在 Rust 中,`async` 是惰性的,直到执行器 `poll` 它们时,才会开始执行 - `Waker` 是 `Future` 被执行的关键,它可以链接起 `Future` 任务和执行器 - 当资源没有准备时,会返回一个 `Poll::Pending` - 当资源准备好时,会通过 `waker.wake` 发出通知 diff --git a/book/contents/tokio/channels.md b/book/contents/tokio/channels.md index 4ce55574..f246ddff 100644 --- a/book/contents/tokio/channels.md +++ b/book/contents/tokio/channels.md @@ -49,10 +49,10 @@ async fn main() { 在上一节中,我们介绍了几个解决方法,但是它们大部分都不太适用于此时的情况,例如: -- `std::sync::Mutex` 无法被使用,这个问题在之前章节有详解介绍过,同步锁无法在 `.await` 调用过程中使用 +- `std::sync::Mutex` 无法被使用,这个问题在之前章节有详解介绍过,同步锁无法跨越 `.await` 调用时使用 - 那么你可能会想,是不是可以使用 `tokio::sync:Mutex` ,答案是可以用,但是同时就只能运行一个请求。若客户端实现了 redis 的 [pipelining](https://redis.io/topics/pipelining), 那这个异步锁就会导致连接利用率不足 -这个不行,那个也不行,是不是没有办法解决了?还记得我们上一章节提到过几次的消息传递,但是一直没有看到它的庐山真面吗?现在可以来看看了。 +这个不行,那个也不行,是不是没有办法解决了?还记得我们上一章节提到过几次的消息传递,但是一直没有看到它的庐山真面目吗?现在可以来看看了。 ## 消息传递 之前章节我们提到可以创建一个专门的任务 `C1` (消费者 Consumer) 和通过消息传递来管理共享的资源,这里的共享资源就是 `client` 。若任务 `P1` (生产者 Producer) 想要发出 Redis 请求,首先需要发送信息给 `C1`,然后 `C1` 会发出请求给服务器,在获取到结果后,再将结果返回给 `P1`。 @@ -108,7 +108,7 @@ async fn main() { 一个任务可以通过此通道将命令发送给管理 redis 连接的任务,同时由于通道支持多个生产者,因此多个任务可以同时发送命令。创建该通道会返回一个发送和接收句柄,这两个句柄可以分别被使用,例如它们可以被移动到不同的任务中。 -通道的缓冲队列长度是 32,意味着如果消息发送的比接收的快,这些消息将被存储在缓冲队列中,一旦存满了 32 条消息,使用`send(...).await`的发送者会**进入睡眠**,直到缓冲队列可以放入信息的消息(被接收者消费了)。 +通道的缓冲队列长度是 32,意味着如果消息发送的比接收的快,这些消息将被存储在缓冲队列中,一旦存满了 32 条消息,使用`send(...).await`的发送者会**进入睡眠**,直到缓冲队列可以放入新的消息(被接收者消费了)。 ```rust use tokio::sync::mpsc; diff --git a/book/contents/tokio/frame.md b/book/contents/tokio/frame.md index 0486954d..2cf3c099 100644 --- a/book/contents/tokio/frame.md +++ b/book/contents/tokio/frame.md @@ -64,7 +64,7 @@ impl Connection { 关于Redis协议的说明,可以看看[官方文档](https://redis.io/topics/protocol),`Connection` 代码的完整实现见[这里](https://github.com/tokio-rs/mini-redis/blob/tutorial/src/connection.rs). -## 缓冲读取(Buffered Read) +## 缓冲读取(Buffered Reads) `read_frame` 方法会等到一个完整的帧都读取完毕后才返回,与之相比,它底层调用的`TcpStream::read` 只会返回任意多的数据(填满传入的缓冲区 buffer ),它可能返回帧的一部分、一个帧、多个帧,总之这种读取行为是不确定的。 当 `read_frame` 的底层调用 `TcpStream::read` 读取到部分帧时,会将数据先缓冲起来,接着继续等待并读取数据。如果读到多个帧,那第一个帧会被返回,然后剩下的数据依然被缓冲起来,等待下一次 `read_frame` 被调用。 @@ -194,7 +194,7 @@ pub async fn read_frame(&mut self) 一旦缓冲区满了,还需要增加缓冲区的长度,这样才能继续写入数据。还有一点值得注意,在 `parse_frame` 方法的内部实现中,也需要通过游标来解析数据: `self.buffer[..self.cursor]`,通过这种方式,我们可以准确获取到目前已经读取的全部数据。 -在网络编程中,通过字节数组和游标的方式读取数据是非常普遍的,因此 `bytes` 包提供了一个 `Buf` 特征,如果一个类型可以被读取数据,那么该类型需要实现 `Buf` 特征。与之对应,当一个类型可以被写入数据时,它需要实现 `ButMut` 。 +在网络编程中,通过字节数组和游标的方式读取数据是非常普遍的,因此 `bytes` 包提供了一个 `Buf` 特征,如果一个类型可以被读取数据,那么该类型需要实现 `Buf` 特征。与之对应,当一个类型可以被写入数据时,它需要实现 `BufMut` 。 当 `T: BufMut` ( 特征约束,说明类型 `T` 实现了 `BufMut` 特征 ) 被传给 `read_buf()` 方法时,缓冲区 `T` 的内部游标会自动进行更新。正因为如此,在使用了 `BufMut` 版本的 `read_frame` 中,我们并不需要管理自己的游标。 @@ -255,7 +255,7 @@ fn parse_frame(&mut self) 为了降低系统调用的次数,我们需要使用一个写入缓冲区,当写入一个帧时,首先会写入该缓冲区,然后等缓冲区数据足够多时,再集中将其中的数据写入到 socket 中,这样就将多次系统调用优化减少到一次。 -还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个bulk,通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。因此我们不能将其帧体数据写入到缓冲区中,因此数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。 +还有,缓冲区也不总是能提升性能。 例如,考虑一个 `bulk` 帧(多个帧放在一起组成一个bulk,通过批量发送提升效率),该帧的特点就是:由于由多个帧组合而成,因此帧体数据可能会很大。所以我们不能将其帧体数据写入到缓冲区中,因为数据较大时,先写入缓冲区再写入 socket 会有较大的性能开销(实际上缓冲区就是为了批量写入,既然 bulk 已经是批量了,因此不使用缓冲区也很正常)。 为了实现缓冲写,我们将使用 [`BufWriter`](https://docs.rs/tokio/1/tokio/io/struct.BufWriter.html) 结构体。该结构体实现了 `AsyncWrite` 特征,当 `write` 方法被调用时,不会直接写入到 socket 中,而是先写入到缓冲区中。当缓冲区被填满时,其中的内容会自动刷到(写入到)内部的 socket 中,然后再将缓冲区清空。当然,其中还存在某些优化,通过这些优化可以绕过缓冲区直接访问 socket。 diff --git a/book/contents/tokio/getting-startted.md b/book/contents/tokio/getting-startted.md index ebe71fb5..5a998d73 100644 --- a/book/contents/tokio/getting-startted.md +++ b/book/contents/tokio/getting-startted.md @@ -95,7 +95,7 @@ Perfect, 代码成功运行,是时候来解释下其中蕴藏的至高奥秘 let mut client = client::connect("127.0.0.1:6379").await?; ``` -[`client::connect`](https://docs.rs/mini-redis/0.4.1/mini_redis/client/fn.connect.html) 函数由`mini-redis` 包提供,它使用异步的方式跟指定的远程 `IP` 地址建立 TCP 长连接,一旦连接建立成功,那 `clien` 的赋值初始化也将完成。 +[`client::connect`](https://docs.rs/mini-redis/0.4.1/mini_redis/client/fn.connect.html) 函数由`mini-redis` 包提供,它使用异步的方式跟指定的远程 `IP` 地址建立 TCP 长连接,一旦连接建立成功,那 `client` 的赋值初始化也将完成。 特别值得注意的是:虽然该连接是异步建立的,但是从代码本身来看,完全是**同步的代码编写方式**,唯一能说明异步的点就是 `.await`。 @@ -133,7 +133,7 @@ async fn say_to_world() -> String { #[tokio::main] async fn main() { - // 此处的函数调用是惰性的,并不会执行 `say_world()` 函数体中的代码 + // 此处的函数调用是惰性的,并不会执行 `say_to_world()` 函数体中的代码 let op = say_to_world(); // 首先打印出 "hello" @@ -184,7 +184,7 @@ fn main() { tokio = { version = "1", features = ["full"] } ``` -里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 `[Cargo详解专题]` 有介绍,这里就简单进行说明, +里面有个 `features = ["full"]` 可能大家会比较迷惑,当然,关于它的具体解释在本书的 [Cargo详解专题](https://course.rs/cargo/intro.html) 有介绍,这里就简单进行说明, `Tokio` 有很多功能和特性,例如 `TCP`,`UDP`,`Unix sockets`,同步工具,多调度类型等等,不是每个应用都需要所有的这些特性。为了优化编译时间和最终生成可执行文件大小、内存占用大小,应用可以对这些特性进行可选引入。 diff --git a/book/contents/tokio/io.md b/book/contents/tokio/io.md index c800dbf9..58cff8dd 100644 --- a/book/contents/tokio/io.md +++ b/book/contents/tokio/io.md @@ -7,10 +7,10 @@ Tokio 中的 I/O 操作和 `std` 在使用方式上几无区别,最大的区 - 还有数据结构也实现了它们:`Vec`、`&[u8]`,这样就可以直接使用这些数据结构作为读写器( reader / writer) ## AsyncRead 和 AsyncWrite -这两个特征为字节流的异步读写提供了便利的使用方法,这些方法都使用 `async` 声明,且需要通过 `.await` 进行调用, +这两个特征为字节流的异步读写提供了便利,通常我们会使用 `AsyncReadExt` 和 `AsyncWriteExt` 提供的工具方法,这些方法都使用 `async` 声明,且需要通过 `.await` 进行调用, #### async fn read - `read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。 + `AsyncReadExt::read` 是一个异步方法可以将数据读入缓冲区( `buffer` )中,然后返回读取的字节数。 ```rust use tokio::fs::File; use tokio::io::{self, AsyncReadExt}; @@ -31,7 +31,7 @@ async fn main() -> io::Result<()> { 需要注意的是:当 `read` 返回 `Ok(0)` 时,意味着字节流( stream )已经关闭,在这之后继续调用 `read` 会立刻完成,依然获取到返回值 `Ok(0)`。 例如,字节流如果是 `TcpStream` 类型,那 `Ok(0)` 说明该**连接的读取端已经被关闭**(写入端关闭,会报其它的错误)。 #### async fn read_to_end -该方法会从字节流中读取所有的字节,直到遇到 `EOF` : +`AsyncReadExt::read_to_end` 方法会从字节流中读取所有的字节,直到遇到 `EOF` : ```rust use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; @@ -48,7 +48,7 @@ async fn main() -> io::Result<()> { ``` #### async fn write -`write` 异步方法会尝试将缓冲区的内容写入到写入器( `writer` )中,同时返回写入的字节数: +`AsyncWriteExt::write` 异步方法会尝试将缓冲区的内容写入到写入器( `writer` )中,同时返回写入的字节数: ```rust use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; @@ -67,7 +67,7 @@ async fn main() -> io::Result<()> { 上面代码很清晰,但是大家可能会疑惑 `b"some bytes"` 是什么意思。这种写法可以将一个 `&str` 字符串转变成一个字节数组:`&[u8;10]`,然后 `write` 方法又会将这个 `&[u8;10]` 的数组类型隐式强转为数组切片: `&[u8]`。 #### async fn write_all -将缓冲区的内容全部写入到写入器中: +`AsyncWriteExt::write_all` 将缓冲区的内容全部写入到写入器中: ```rust use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; @@ -81,12 +81,12 @@ async fn main() -> io::Result<()> { } ``` -以上只是部分方法,实际上还有一些实用的方法由于篇幅有限无法列出,大家可以通过 API 文档查看完整的列表。 +以上只是部分方法,实际上还有一些实用的方法由于篇幅有限无法列出,大家可以通过 [API 文档](https://docs.rs/tokio/latest/tokio/io/index.html) 查看完整的列表。 ## 实用函数 另外,和标准库一样, `tokio::io` 模块包含了多个实用的函数或API,可以用于处理标准输入/输出/错误等。 -例如,`tokio::io::copy` 异步的将读取器( `reader` )中的内容拷贝到写入器中。 +例如,`tokio::io::copy` 异步的将读取器( `reader` )中的内容拷贝到写入器( `writer` )中。 ```rust use tokio::fs::File; use tokio::io; @@ -288,6 +288,10 @@ struct Task { 但是再怎么优化,任务的结构体至少也会跟其中的栈数组一样大,因此通常情况下,使用堆上的缓冲区会高效实用的多。 +> 当任务因为调度在线程间移动时,存储在栈上的数据需要进行保存和恢复,过大的栈上变量会带来不小的数据拷贝开销 +> +> 因此,存储大量数据的变量最好放到堆上 + ##### 处理EOF 当 TCP 连接的读取端关闭后,再调用 `read` 方法会返回 `Ok(0)`。此时,再继续下去已经没有意义,因此我们需要退出循环。忘记在 EOF 时退出读取循环,是网络编程中一个常见的 bug : ```rust diff --git a/book/contents/tokio/overview.md b/book/contents/tokio/overview.md index 3be6bd95..cafba591 100644 --- a/book/contents/tokio/overview.md +++ b/book/contents/tokio/overview.md @@ -30,7 +30,7 @@ Rust 语言本身只提供了异步编程所需的基本特性,例如 `async/. 如果你只用 `tokio` ,那兼容性自然不是问题,至于难以上手,Rust 这么难,我们都学到现在了,何况区区一个异步运行时,在本书的帮忙下,这些都不再是个问题:) ## tokio简介 -Tokio是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱歉,走错片场了。`Tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成: +tokio是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱歉,走错片场了。`tokio` 是 Rust 最优秀的异步运行时框架,它提供了写异步网络服务所需的几乎所有功能,不仅仅适用于大型服务器,还适用于小型嵌入式设备,它主要由以下组件构成: - 多线程版本的异步运行时,可以运行使用 `async/.await` 编写的代码 - 标准库中阻塞API的异步版本,例如`thread::sleep`会阻塞当前线程,`tokio`中就提供了相应的异步实现版本 @@ -41,7 +41,7 @@ Tokio是一个纸醉金迷之地,只要有钱就可以为所欲为,哦,抱 **高性能** -因为快所以快,前者是 Rust 快,后者是 `tokio` 快。 `tokio` 在编写时充分利用了 Rust 提供的各种零抽象和高性能特性,而且贯彻了 Rust 的牛逼思想:如果你选择手写代码,那么最好的结果就是跟 `tokio` 一样快! +因为快所以快,前者是 Rust 快,后者是 `tokio` 快。 `tokio` 在编写时充分利用了 Rust 提供的各种零成本抽象和高性能特性,而且贯彻了 Rust 的牛逼思想:如果你选择手写代码,那么最好的结果就是跟 `tokio` 一样快! 以下是一张官方提供的性能参考图,大致能体现出 `tokio` 的性能之恐怖: tokio performance @@ -59,6 +59,7 @@ Rust 语言的安全可靠性顺理成章的影响了 `tokio` 的可靠性,曾 同时 `tokio` 遵循了标准库的命名规则,让熟悉标准库的用户可以很快习惯于 `tokio` 的语法,再借助于 Rust 强大的类型系统,用户可以轻松地编写和交付正确的代码。 **使用灵活性** + `tokio` 支持你灵活的定制自己想要的运行时,例如你可以选择多线程 + 任务盗取模式的复杂运行时,也可以选择单线程的轻量级运行时。总之,几乎你的每一种需求在 `tokio` 中都能寻找到支持(画外音:强大的灵活性需要一定的复杂性来换取,并不是免费的午餐)。 ### 劣势 diff --git a/book/contents/tokio/select.md b/book/contents/tokio/select.md index 1dc28a8f..9bbb27ea 100644 --- a/book/contents/tokio/select.md +++ b/book/contents/tokio/select.md @@ -54,7 +54,7 @@ async fn main() { let (tx2, rx2) = oneshot::channel(); tokio::spawn(async { - // 等待 `some_peration` 的完成 + // 等待 `some_operation` 的完成 // 或者处理 `oneshot` 的关闭通知 tokio::select! { val = some_operation() => { @@ -62,12 +62,10 @@ async fn main() { } _ = tx1.closed() => { // 收到了发送端发来的关闭信号 - // - // `select` 即将结束,此时,正在进行的 `some_operation()` 任务会被取消,任务自动完成, // tx1 被释放 + // `select` 即将结束,此时,正在进行的 `some_operation()` 任务会被取消,任务自动完成, + // tx1 被释放 } } - - }); tokio::spawn(async { @@ -85,10 +83,10 @@ async fn main() { } ``` -上面代码的重点就在于 `tx1.closed` 所在的分支,一旦发送端被关闭,那该分支就会被执行,然后 `select` 会退出,并清理掉还没执行的第一个分支 `val = some_operation()` ,这其中 `some_peration` 返回的 `Future` 也会被清理,根据之前的内容,`Future` 被清理那相应的任务会立即取消,因此 `some_oeration` 会被取消,不再执行。 +上面代码的重点就在于 `tx1.closed` 所在的分支,一旦发送端被关闭,那该分支就会被执行,然后 `select` 会退出,并清理掉还没执行的第一个分支 `val = some_operation()` ,这其中 `some_operation` 返回的 `Future` 也会被清理,根据之前的内容,`Future` 被清理那相应的任务会立即取消,因此 `some_operation` 会被取消,不再执行。 #### Future 的实现 -为了更好的理解 `select` 的工作原理,我们来看看如果使用 `Future`。当然,这里是一个简化版本,在实际中,`select!` 会包含一些额外的功能,例如一开始会随机选择一个分支进行 `poll`。 +为了更好的理解 `select` 的工作原理,我们来看看如果使用 `Future` 该如何实现。当然,这里是一个简化版本,在实际中,`select!` 会包含一些额外的功能,例如一开始会随机选择一个分支进行 `poll`。 ```rust use tokio::sync::oneshot; @@ -150,7 +148,7 @@ async fn main() { 当 `select` 宏开始执行后,所有的分支会开始并发的执行。当任何一个**表达式**完成时,会将结果跟**模式**进行匹配。若匹配成功,则剩下的表达式会被释放。 -最常用的**模式**就是用变量名去匹配表达式返回的值,然乎该变量就可以在**结果处理**环节使用。 +最常用的**模式**就是用变量名去匹配表达式返回的值,然后该变量就可以在**结果处理**环节使用。 如果当前的模式不能匹配,剩余的 `async` 表达式将继续并发的执行,直到下一个完成。 @@ -245,7 +243,7 @@ async fn main() { ## 错误传播 在 Rust 中使用 `?` 可以对错误进行传播,但是在 `select!` 中,`?` 如何工作取决于它是在分支中的 `async` 表达式使用还是在结果处理的代码中使用: -- 在分支中 `ascyn` 表达式使用会将该表达式的结果变成一个 `Result` +- 在分支中 `async` 表达式使用会将该表达式的结果变成一个 `Result` - 在结果处理中使用,会将错误直接传播到 `select!` 之外 ```rust @@ -321,7 +319,7 @@ async fn main() { 上面代码中,`rx` 通道关闭后,`recv()` 方法会返回一个 `None`,可以看到没有任何模式能够匹配这个 `None`,那为何不会报错?秘密就在于 `else` 上:当使用模式去匹配分支时,若之前的所有分支都无法被匹配,那 `else` 分支将被执行。 ## 借用 -当在 Tokio 中生成( spawan )任务时,其 async 语句块必须拥有其中数据的所有权。而 `select!` 并没有这个限制,它的每个分支表达式可以直接借用数据,然后进行并发操作。只要遵循 Rust 的借用规则,多个分支表达式就可以不可变的借用同一个数据,或者在一个表达式可变的借用某个数据。 +当在 Tokio 中生成( spawn )任务时,其 async 语句块必须拥有其中数据的所有权。而 `select!` 并没有这个限制,它的每个分支表达式可以直接借用数据,然后进行并发操作。只要遵循 Rust 的借用规则,多个分支表达式可以不可变的借用同一个数据,或者在一个表达式可变的借用某个数据。 来看个例子,在这里我们同时向两个 TCP 目标发送同样的数据: ```rust @@ -357,7 +355,7 @@ async fn race( 如果你把连接过程放在了结果处理中,那连接失败会直接从 `race` 函数中返回,而不是继续执行另一个分支中的连接! -还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用,这当然ok,但是若是两次可变借用,那编译器会立即进行报错。但是转折来了:当在结果处理中进行两次可变借用时,却不会报错,大家可以思考下为什么,提示下:思考下分支在执行成后会发生什么。 +还有一个非常重要的点,**借用规则在分支表达式和结果处理中存在很大的不同**。例如上面代码中,我们在两个分支表达式中分别对 `data` 做了不可变借用,这当然ok,但是若是两次可变借用,那编译器会立即进行报错。但是转折来了:当在结果处理中进行两次可变借用时,却不会报错,大家可以思考下为什么,提示下:思考下分支在执行完成后会发生什么? ```rust use tokio::sync::oneshot; @@ -540,7 +538,7 @@ async fn main() { 当第一次循环开始时, 第一个分支会立即完成,因为 `operation` 的参数是 `None`。当第一个分支执行完成时,`done` 会变成 `true`,此时第一个分支的条件将无法被满足,开始执行第二个分支。 -当第二个分支收到一个偶数时,`done` 会被修改为 `false`,且 `operation` 被设置了值。 此后再一次循环时,第一个分支会被执行,且 `oertation` 返回一个 `Some(2)`,因此会触发 `return` ,最终结束循环并返回。 +当第二个分支收到一个偶数时,`done` 会被修改为 `false`,且 `operation` 被设置了值。 此后再一次循环时,第一个分支会被执行,且 `operation` 返回一个 `Some(2)`,因此会触发 `return` ,最终结束循环并返回。 这段代码引入了一个新的语法: `if !done`,在解释之前,先看看去掉后会如何: ```console @@ -556,10 +554,10 @@ note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace 那大家肯定有疑问了,既然 `operation` 不能再被调用了,我们该如何在有偶数值时,再回到第一个分支对其进行调用呢?答案就是 `operation.set(action(Some(v)));`,该操作会重新使用新的参数设置 `operation`。 -## spawan 和 select! 的一些不同 +## spawn 和 select! 的一些不同 学到现在,相信大家对于 `tokio::spawn` 和 `select!` 已经非常熟悉,它们的共同点就是都可以并发的运行异步操作。 然而它们使用的策略大相径庭。 -`tokio::spawan` 函数会启动新的任务来运行一个异步操作,每个任务都是一个独立的对象可以单独被 Tokio 调度运行,因此两个不同的任务的调度都是独立进行的,甚至于它们可能会运行在两个不同的操作系统线程上。鉴于此,生成的任务和生成的线程有一个相同的限制:不允许对外部环境中的值进行借用。 +`tokio::spawn` 函数会启动新的任务来运行一个异步操作,每个任务都是一个独立的对象可以单独被 Tokio 调度运行,因此两个不同的任务的调度都是独立进行的,甚至于它们可能会运行在两个不同的操作系统线程上。鉴于此,生成的任务和生成的线程有一个相同的限制:不允许对外部环境中的值进行借用。 -而 `select!` 宏就不一样了,它在同一个任务中并发运行所有的分支,正是因为这样在同一个任务中,这些分支无法被同时运行。 `select!` 宏在单个任务中实现了多路复用的功能。 +而 `select!` 宏就不一样了,它在同一个任务中并发运行所有的分支。正是因为这样,在同一个任务中,这些分支无法被同时运行。 `select!` 宏在单个任务中实现了多路复用的功能。 diff --git a/book/contents/tokio/shared-state.md b/book/contents/tokio/shared-state.md index 37ba3f95..34bed3d5 100644 --- a/book/contents/tokio/shared-state.md +++ b/book/contents/tokio/shared-state.md @@ -1,5 +1,5 @@ # 共享状态 -上一章节中,咱们搭建了一个异步的 redis 服务器,并成功的提供了服务,但是其隐藏了了一个巨大的问题:状态(数据)无法在多个连接之间共享,下面一起来看看该如何解决。 +上一章节中,咱们搭建了一个异步的 redis 服务器,并成功的提供了服务,但是其隐藏了一个巨大的问题:状态(数据)无法在多个连接之间共享,下面一起来看看该如何解决。 ## 解决方法 好在 Tokio 十分强大,上面问题对应的解决方法也不止一种: @@ -24,7 +24,7 @@ bytes = "1" ## 初始化 HashMap 由于 `HashMap` 会在多个任务甚至多个线程间共享,再结合之前的选择,最终我们决定使用 `>>` 的方式对其进行包裹。 -但是,大家先来畅想一下使用它进行包括后的类型长什么样? 大概,可能,长这样:`Arc>>`,天哪噜,一不小心,你就遇到了 Rust 的阴暗面:类型大串烧。可以想象,如果要在代码中到处使用这样的类型,可读性会极速下降,因此我们需要一个[类型别名](https://course.rs/advance/custom-type.html#类型别名type-alias)( type alias )来简化下: +但是,大家先来畅想一下使用它进行包裹后的类型长什么样? 大概,可能,长这样:`Arc>>`,天哪噜,一不小心,你就遇到了 Rust 的阴暗面:类型大串烧。可以想象,如果要在代码中到处使用这样的类型,可读性会极速下降,因此我们需要一个[类型别名](https://course.rs/advance/custom-type.html#类型别名type-alias)( type alias )来简化下: ```rust use bytes::Bytes; use std::collections::HashMap; @@ -35,7 +35,7 @@ type Db = Arc>>; 此时,`Db` 就是一个类型别名,使用它就可以替代那一大串的东东,等下你就能看到功效。 -接着,我们需要在 `main` 函数中对 `HashMap` 进行初始化,然后使用 `Arc` 克隆一份它的所有权并将其传入到生成的异步任务中。事实上在 Tokio 中,这里的 `Arc` 被称之为 **handle**,或者更宽泛的说,`handle` 在 Tokio 中可以用来访问某个共享状态。 +接着,我们需要在 `main` 函数中对 `HashMap` 进行初始化,然后使用 `Arc` 克隆一份它的所有权并将其传入到生成的异步任务中。事实上在 Tokio 中,这里的 `Arc` 被称为 **handle**,或者更宽泛的说,`handle` 在 Tokio 中可以用来访问某个共享状态。 ```rust use tokio::net::TcpListener; @@ -66,11 +66,11 @@ async fn main() { #### 为何使用 `std::sync::Mutex` 上面代码还有一点非常重要,那就是我们使用了 `std::sync::Mutex` 来保护 `HashMap`,而不是使用 `tokio::sync::Mutex`。 -在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 `tokio::sync::Mutex` ,而真相是:Tokio 提供的异步锁只应该使用在 `.await`调用过程中,而且 `Tokio` 的 `Mutex` 实际上内部使用的也是 `std::sync::Mutex`。 +在使用 Tokio 编写异步代码时,一个常见的错误无条件地使用 `tokio::sync::Mutex` ,而真相是:Tokio 提供的异步锁只应该在跨多个 `.await`调用时使用,而且 Tokio 的 `Mutex` 实际上内部使用的也是 `std::sync::Mutex`。 多补充几句,在异步代码中,关于锁的使用有以下经验之谈: -- 锁如果在 `.await` 的过程中持有,应该使用 `Tokio` 提供的锁,原因是 `.await`的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为 `.await` 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁 +- 锁如果在多个 `.await` 过程中持有,应该使用 Tokio 提供的锁,原因是 `.await`的过程中锁可能在线程间转移,若使用标准库的同步锁存在死锁的可能性,例如某个任务刚获取完锁,还没使用完就因为 `.await` 让出了当前线程的所有权,结果下个任务又去获取了锁,造成死锁 - 锁竞争不多的情况下,使用 `std::sync::Mutex` - 锁竞争多,可以考虑使用三方库提供的性能更高的锁,例如 [`parking_lot::Mutex`](https://docs.rs/parking_lot/0.10.2/parking_lot/type.Mutex.html) @@ -112,11 +112,11 @@ async fn process(socket: TcpStream, db: Db) { ## 任务、线程和锁竞争 当竞争不多的时候,使用阻塞性的锁去保护共享数据是一个正确的选择。当一个锁竞争触发后,当前正在执行任务(请求锁)的线程会被阻塞,并等待锁被前一个使用者释放。这里的关键就是:**锁竞争不仅仅会导致当前的任务被阻塞,还会导致执行任务的线程被阻塞,因此该线程准备执行的其它任务也会因此被阻塞!** -默认情况下,`Tokio` 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来。当然,你也可以使用 [**current_thread**](https://docs.rs/tokio/1.15.0/tokio/runtime/index.html#current-thread-scheduler) 运行时设置,在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争。 +默认情况下,Tokio 调度器使用了多线程模式,此时如果有大量的任务都需要访问同一个锁,那么锁竞争将变得激烈起来。当然,你也可以使用 [**current_thread**](https://docs.rs/tokio/1.15.0/tokio/runtime/index.html#current-thread-scheduler) 运行时设置,在该设置下会使用一个单线程的调度器(执行器),所有的任务都会创建并执行在当前线程上,因此不再会有锁竞争。 > current_thread 是一个轻量级、单线程的运行时,当任务数不多或连接数不多时是一个很好的选择。例如你想在一个异步客户端库的基础上提供给用户同步的API访问时,该模式就很适用 -当同步锁的竞争变成一个问题时,使用 `Tokio` 提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项: +当同步锁的竞争变成一个问题时,使用 Tokio 提供的异步锁几乎并不能帮你解决问题,此时可以考虑如下选项: - 创建专门的任务并使用消息传递的方式来管理状态 - 将锁进行分片 @@ -158,7 +158,7 @@ async fn increment_and_do_stuff(mutex: &Mutex) { } // 锁在这里超出作用域 ``` -如果你要 `spawn` 一个任务来执行上面的函数话,会报错: +如果你要 `spawn` 一个任务来执行上面的函数的话,会报错: ```console error: future cannot be sent between threads safely --> src/lib.rs:13:5 diff --git a/book/contents/tokio/spawning.md b/book/contents/tokio/spawning.md index 148a1de5..7b84d434 100644 --- a/book/contents/tokio/spawning.md +++ b/book/contents/tokio/spawning.md @@ -109,7 +109,7 @@ async fn main() { 任务在 Tokio 中远比看上去要更轻量,例如创建一个任务仅仅需要一次64字节大小的内存分配。因此应用程序在生成任务上,完全不应该有任何心理负担,除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。 #### `'static` 约束 -当使用 Tokio 创建一个任务时,该任务类型的生命周期必须时 `'static`。意味着,在任务中不能使用外部数据的引用: +当使用 Tokio 创建一个任务时,该任务类型的生命周期必须是 `'static`。意味着,在任务中不能使用外部数据的引用: ```rust use tokio::task; @@ -176,7 +176,7 @@ PS: 这一段内容,Tokio 的原文有些逻辑混乱,我已经尽可能的 > 一个关于 `'static` 生命周期的常见误解就是它将永远存活(跟整个程序活得一样久),实际上并不是这样的。同样的,一个变量是 `'static` 并不意味着它存在内存泄漏的可能性。 #### Send 约束 -`tokio::spawn` 生成的任务必须实现 `Send` 特征,因为 Tokio 调度器会将任务在线程间进行移动,当这些任务在 `.await` 执行过程中发生阻塞时。 +`tokio::spawn` 生成的任务必须实现 `Send` 特征,因为当这些任务在 `.await` 执行过程中发生阻塞时,Tokio 调度器会将任务在线程间移动。 **一个任务要实现 `Send` 特征,那它在 `.await` 调用的过程中所持有的全部数据都必须实现 `Send` 特征**。当 `.await` 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将`.await`之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 `Send` 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。 diff --git a/book/contents/tokio/stream.md b/book/contents/tokio/stream.md index 12e9c072..9343e003 100644 --- a/book/contents/tokio/stream.md +++ b/book/contents/tokio/stream.md @@ -111,7 +111,7 @@ got = Ok(Message { channel: "numbers", content: b"6" }) 与迭代器类似,`stream` 也有适配器,例如一个 `stream` 适配器可以将一个 `stream` 转变成另一个 `stream` ,例如 `map`、`take` 和 `filter`。 -在之前的客户端中,`subscribe` 订阅一直持续下去,知道程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。 +在之前的客户端中,`subscribe` 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。 ```rust let messages = subscriber .into_stream() @@ -228,7 +228,7 @@ impl Stream for Interval { ``` #### async-stream -手动实现 `Stream` 特征实际上是相当麻烦的事,然而不幸地是,Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。 +手动实现 `Stream` 特征实际上是相当麻烦的事,不幸地是,Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。 作为替代方案,[`async-stream`](https://docs.rs/async-stream/latest/async_stream/) 包提供了一个 `stream!` 宏,它可以将一个输入转换成 `stream`,使用这个包,上面的代码可以这样实现: ```rust