From f6bacdd13d7c0f685d4a3915e8f651037ed1e40a Mon Sep 17 00:00:00 2001 From: sunface Date: Thu, 3 Feb 2022 19:41:47 +0800 Subject: [PATCH 1/3] =?UTF-8?q?add=20tokio-=E5=90=8C=E6=AD=A5=E6=A1=A5?= =?UTF-8?q?=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- book/contents/SUMMARY.md | 2 + book/contents/memory/uninit.md | 2 + book/contents/test/benchmark.md | 2 + book/contents/tokio/bridging-with-sync.md | 289 ++++++++++++++++++++++ book/contents/tokio/graceful-shutdown.md | 94 +++++++ book/contents/web/intro.md | 4 + book/writing-material/posts/async.md | 2 - book/writing-material/posts/interview.md | 1 + 8 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 book/contents/tokio/bridging-with-sync.md create mode 100644 book/contents/tokio/graceful-shutdown.md delete mode 100644 book/writing-material/posts/async.md create mode 100644 book/writing-material/posts/interview.md diff --git a/book/contents/SUMMARY.md b/book/contents/SUMMARY.md index 6e470e36..e037bcbd 100644 --- a/book/contents/SUMMARY.md +++ b/book/contents/SUMMARY.md @@ -111,6 +111,8 @@ - [深入async](tokio/async.md) - [select](tokio/select.md) - [类似迭代器的Stream](tokio/stream.md)) + - [优雅的关闭](tokio/graceful-shutdown.md) + - [异步跟同步共存](tokio/bridging-with-sync.md) - [Unsafe Rust todo](unsafe/intro.md) - [原生指针 todo](unsafe/raw-pointer.md) diff --git a/book/contents/memory/uninit.md b/book/contents/memory/uninit.md index 73d3ad92..732b777c 100644 --- a/book/contents/memory/uninit.md +++ b/book/contents/memory/uninit.md @@ -1 +1,3 @@ # 未初始化内存 + +https://lucumr.pocoo.org/2022/1/30/unsafe-rust/ \ No newline at end of file diff --git a/book/contents/test/benchmark.md b/book/contents/test/benchmark.md index e8648f79..0d15a378 100644 --- a/book/contents/test/benchmark.md +++ b/book/contents/test/benchmark.md @@ -1,5 +1,7 @@ # 性能测试 +https://www.reddit.com/r/rust/comments/sh8u72/why_my_rust_benchmarks_were_wrong_or_how_to/ + ## benchmark迷一般的性能结果 代码如下 diff --git a/book/contents/tokio/bridging-with-sync.md b/book/contents/tokio/bridging-with-sync.md new file mode 100644 index 00000000..326706d9 --- /dev/null +++ b/book/contents/tokio/bridging-with-sync.md @@ -0,0 +1,289 @@ +# 异步跟同步共存 +一些异步程序例如 tokio指南 章节中的绝大多数例子,它们整个程序都是异步的,包括程序入口 `main` 函数: +```rust +#[tokio::main] +async fn main() { + println!("Hello world"); +} +``` + +在一些场景中,你可能只想在异步程序中运行一小部分同步代码,这种需求可以考虑下 [`spawn_blocking`](https://docs.rs/tokio/1.16.1/tokio/task/fn.spawn_blocking.html)。 + +但是在很多场景中,我们只想让程序的某一个部分成为异步的,也许是因为同步代码更好实现,又或许是同步代码可读性、兼容性都更好。例如一个 `GUI` 应用可能想要让 `UI` 相关的代码在主线程中,然后通过另一个线程使用 `tokio` 的运行时来处理一些异步任务。 + +因此本章节的目标很纯粹:如何在同步代码中使用一小部分异步代码。 + +## `#[tokio::main]` 的展开 +在 Rust 中, `main` 函数不能是异步的,有同学肯定不愿意了,我们在之前章节..不对,就在开头,你还用到了 `async fn main` 的声明方式,怎么就不能异步了呢? + +其实,`#[tokio::main]` 该宏仅仅是提供语法糖,目的是让大家可以更简单、更一致的去写异步代码,它会将你写下的`async fn main` 函数替换为: +```rust +fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + println!("Hello world"); + }) +} +``` + +注意到上面的 `block_on` 方法了嘛?在我们自己的同步代码中,可以使用它开启一个 `async/await` 世界。 + +## mini-redis的同步接口 +在下面,我们将一起构建一个同步的 `mini-redis` ,为了实现这一点,需要将 `Runtime` 对象存储起来,然后利用上面提到的 `block_on` 方法。 + + +首先,创建一个文件 `src/blocking_client.rs`,然后使用下面代码将异步的 `Clien` 结构体包裹起来: +```rust +use tokio::net::ToSocketAddrs; +use tokio::runtime::Runtime; + +pub use crate::client::Message; + +/// 建立到 redis 服务端的连接 +pub struct BlockingClient { + /// 之前实现的异步客户端 `Client` + inner: crate::client::Client, + + /// 一个 `current_thread` 模式的 `tokio` 运行时, + /// 使用阻塞的方式来执行异步客户端 `Client` 上的操作 + rt: Runtime, +} + +pub fn connect(addr: T) -> crate::Result { + // 构建一个 tokio 运行时: Runtime + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + // 使用运行时来调用异步的连接方法 + let inner = rt.block_on(crate::client::connect(addr))?; + + Ok(BlockingClient { inner, rt }) +} +``` + +在这里,我们使用了一个构造器函数用于在同步代码中执行异步的方法:使用 `Runtime` 上的 `block_on` 方法来执行一个异步方法并返回结果。 + +有一个很重要的点,就是我们还使用了 [`current_thread`](https://docs.rs/tokio/1.16.1/tokio/runtime/struct.Builder.html#method.new_current_thread) 模式的运行时。这个可不常见,原因是异步程序往往要利用多线程的威力来实现更高的吞吐性能,相对应的模式就是 [`multi_thread`](https://docs.rs/tokio/1.16.1/tokio/runtime/struct.Builder.html#method.new_multi_thread),该模式会生成多个运行在后台的线程,它们可以高效的实现多个任务的同时并行处理。 + +但是对于我们的使用场景来说,在同一时间点只需要做一件事,无需并行处理,多个线程并不能帮助到任何事情,因此 `current_thread` 此时成为了最佳的选择。 + +在构建 `Runtime` 的过程中还有一个 [`enable_all`](https://docs.rs/tokio/1.16.1/tokio/runtime/struct.Builder.html#method.enable_all) 方法调用,它可以开启 `Tokio` 运行时提供的 IO 和定时器服务。 + + +> 由于 `current_thread` 运行时并不生成新的线程,只是运行在已有的主线程上,因此只有当 `block_on` 被调用后,该运行时才能执行相应的操作。一旦 `block_on` 返回,那运行时上所有生成的任务将再次冻结,直到 `block_on` 的再次调用。 +> +> 如果这种模式不符合使用场景的需求,那大家还是需要用 `multi_thread` 运行时来代替。事实上,在 tokio 之前的章节中,我们默认使用的就是 `multi_thread` 模式。 + +```rust +use bytes::Bytes; +use std::time::Duration; + +impl BlockingClient { + pub fn get(&mut self, key: &str) -> crate::Result> { + self.rt.block_on(self.inner.get(key)) + } + + pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { + self.rt.block_on(self.inner.set(key, value)) + } + + pub fn set_expires( + &mut self, + key: &str, + value: Bytes, + expiration: Duration, + ) -> crate::Result<()> { + self.rt.block_on(self.inner.set_expires(key, value, expiration)) + } + + pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result { + self.rt.block_on(self.inner.publish(channel, message)) + } +} +``` + +这代码看上去挺长,实际上很简单,通过 `block_on` 将异步形式的 `Client` 的法变成同步调用的形式。例如 `BlockingClient` 的 `get` 方法实际上是对内部的异步 `get` 方法的同步调用。 + +与上面的平平无奇相比,下面的代码将更有趣,因为它将 `Client` 转变成一个 `Subscriber` 对象: +```rust +/// 下面的客户端可以进入 pub/sub (发布/订阅) 模式 +/// +/// 一旦客户端订阅了某个消息通道,那就只能执行 pub/sub 相关的命令。 +/// 将`BlockingClient` 类型转换成 `BlockingSubscriber` 是为了防止非 `pub/sub` 方法被调用 +pub struct BlockingSubscriber { + /// 异步版本的 `Subscriber` + inner: crate::client::Subscriber, + + /// 一个 `current_thread` 模式的 `tokio` 运行时, + /// 使用阻塞的方式来执行异步客户端 `Client` 上的操作 + rt: Runtime, +} + +impl BlockingClient { + pub fn subscribe(self, channels: Vec) -> crate::Result { + let subscriber = self.rt.block_on(self.inner.subscribe(channels))?; + Ok(BlockingSubscriber { + inner: subscriber, + rt: self.rt, + }) + } +} + +impl BlockingSubscriber { + pub fn get_subscribed(&self) -> &[String] { + self.inner.get_subscribed() + } + + pub fn next_message(&mut self) -> crate::Result> { + self.rt.block_on(self.inner.next_message()) + } + + pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> { + self.rt.block_on(self.inner.subscribe(channels)) + } + + pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> { + self.rt.block_on(self.inner.unsubscribe(channels)) + } +} +``` + +由上可知,`subscribe` 方法会使用运行时将一个异步的 `Client` 转变成一个异步的 `Subscriber`,此外,`Subscriber` 结构体有一个非异步的方法 `get_subscribed`,对于这种方法,只需直接调用即可,而无需使用运行时。 + +## 其它方法 +上面介绍的是最简单的方法,但是,如果只有这一种, tokio 也不会成为今天这个大名鼎鼎的自己。 + +#### runtime.spawn +可以通过 `Runtime` 的 `spawn` 方法来创建一个基于该运行时的后台任务: +```rust +use tokio::runtime::Builder; +use tokio::time::{sleep, Duration}; + +fn main() { + let runtime = Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + + let mut handles = Vec::with_capacity(10); + for i in 0..10 { + handles.push(runtime.spawn(my_bg_task(i))); + } + + // 在后台任务运行的同时做一些耗费时间的事情 + std::thread::sleep(Duration::from_millis(750)); + println!("Finished time-consuming task."); + + // 等待这些后台任务的完成 + for handle in handles { + // `spawn` 方法返回一个 `JoinHandle`,它是一个 `Future`,因此可以通过 `block_on` 来等待它完成 + runtime.block_on(handle).unwrap(); + } +} + +async fn my_bg_task(i: u64) { + let millis = 1000 - 50 * i; + println!("Task {} sleeping for {} ms.", i, millis); + + sleep(Duration::from_millis(millis)).await; + + println!("Task {} stopping.", i); +} +``` + +运行该程序,输出如下: +```console +Task 0 sleeping for 1000 ms. +Task 1 sleeping for 950 ms. +Task 2 sleeping for 900 ms. +Task 3 sleeping for 850 ms. +Task 4 sleeping for 800 ms. +Task 5 sleeping for 750 ms. +Task 6 sleeping for 700 ms. +Task 7 sleeping for 650 ms. +Task 8 sleeping for 600 ms. +Task 9 sleeping for 550 ms. +Task 9 stopping. +Task 8 stopping. +Task 7 stopping. +Task 6 stopping. +Finished time-consuming task. +Task 5 stopping. +Task 4 stopping. +Task 3 stopping. +Task 2 stopping. +Task 1 stopping. +Task 0 stopping. +``` + +在此例中,我们生成了10个后台任务在运行时中运行,然后等待它们的完成。作为一个例子,想象一下在图形渲染应用( GUI )中,有时候需要通过网络访问远程服务来获取一些数据,那上面的这种模式就非常适合,因为这些网络访问比较耗时,而且不会影响图形的主体渲染,因此可以在主线程中渲染图形,然后使用其它线程来运行 Tokio 的运行时,并通过该运行时使用异步的方式完成网络访问,最后将这些网络访问的结果发送到 GUI 进行数据渲染,例如一个进度条。 + +还有一点很重要,在本例子中只能使用 `multi_thread` 运行时。如果我们使用了 `current_thread`,你会发现主线程的耗时任务会在后台任务开始之前就完成了。因为在 `multi_thread` 模式下,生成的任务只会在 `block_on` 期间才执行。 + +在 `multi_thread` 模式下,我们并不需要通过 `block_on` 来触发任务的运行,这里是仅仅是用来阻塞并等待最终的结果。而除了通过 `block_on` 等待结果外,你还可以: + +- 使用消息传递的方式,例如 `tokio::sync::mpsc`,让异步任务将结果发送到主线程,然后主线程通过 `.recv`方法等待这些结果 +- 通过共享变量的方式,例如 `Mutex`,这种方式非常适合实现 GUI 的进度条: GUI 在每个渲染帧读取该变量即可。 + +#### 发送消息 +在同步代码中使用异步的另一个方法就是生成一个运行时,然后使用消息传递的方式跟它进行交互。这个方法虽然更啰嗦一些,但是相对于之前的两种方法更加灵活: +```rust +use tokio::runtime::Builder; +use tokio::sync::mpsc; + +pub struct Task { + name: String, + // 一些信息用于描述该任务 +} + +async fn handle_task(task: Task) { + println!("Got task {}", task.name); +} + +#[derive(Clone)] +pub struct TaskSpawner { + spawn: mpsc::Sender, +} + +impl TaskSpawner { + pub fn new() -> TaskSpawner { + // 创建一个消息通道用于通信 + let (send, mut recv) = mpsc::channel(16); + + let rt = Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + std::thread::spawn(move || { + rt.block_on(async move { + while let Some(task) = recv.recv().await { + tokio::spawn(handle_task(task)); + } + + // 一旦所有的发送端超出作用域被 drop 后,`.recv()` 方法会返回 None,同时 while 循环会退出,然后线程结束 + }); + }); + + TaskSpawner { + spawn: send, + } + } + + pub fn spawn_task(&self, task: Task) { + match self.spawn.blocking_send(task) { + Ok(()) => {}, + Err(_) => panic!("The shared runtime has shut down."), + } + } +} +``` + +为何说这种方法比较灵活呢?以上面代码为例,它可以在很多方面进行配置。例如,可以使用信号量 [`Semaphore`](https://docs.rs/tokio/1.16.1/tokio/sync/struct.Semaphore.html)来限制当前正在进行的任务数,或者你还可以使用一个消息通道将消息反向发送回任务生成器 `spawner`。 + +抛开细节,抽象来看,这是不是很像一个 Actor ? diff --git a/book/contents/tokio/graceful-shutdown.md b/book/contents/tokio/graceful-shutdown.md new file mode 100644 index 00000000..39475903 --- /dev/null +++ b/book/contents/tokio/graceful-shutdown.md @@ -0,0 +1,94 @@ +# 优雅的关闭 +如果你的服务是一个小说阅读网站,那大概率用不到优雅关闭的,简单粗暴的关闭服务器,然后用户再次请求时获取一个错误就是了。但如果是一个web服务或数据库服务呢?当前的连接很可能在做着重要的事情,一旦关闭会导致数据的丢失甚至错误,此时,我们就需要优雅的关闭(graceful shutdown)了。 + +要让一个异步应用优雅的关闭往往需要做到3点: + +- 找出合适的关闭时机 +- 通知程序的每一个子部分开始关闭 +- 在主线程等待各个部分的关闭结果 + +在本文的下面部分,我们一起来看看该如何做到这三点。如果想要进一步了解在真实项目中该如何使用,大家可以看看 mini-redis 的完整代码实现,特别是 [`src/server.rs`](https://lucumr.pocoo.org/2022/1/30/unsafe-rust/) 和 [`src/shutdown.rs`](https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs)。 + + +## 找出合适的关闭时机 +一般来说,何时关闭是取决于应用自身的,但是一个常用的关闭准则就是当应用收到来自于操作系统的关闭信号时。例如通过 `ctrl + c` 来关闭正在运行的命令行程序。 + +为了检测来自操作系统的关闭信号,`Tokio` 提供了一个 `tokio::signal::ctrl_c` 函数,它将一直睡眠直到收到对应的信号: +```rust +use tokio::signal; + +#[tokio::main] +async fn main() { + // ... spawn application as separate task ... + // 在一个单独的任务中处理应用逻辑 + + match signal::ctrl_c().await { + Ok(()) => {}, + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + }, + } + + // 发送关闭信号给应用所在的任务,然后等待 +} +``` + +## 通知程序的每一个部分开始关闭 +大家看到这个标题,不知道会想到用什么技术来解决问题,反正我首先想到的是,真的很像广播哎。。 + +事实上也是如此,最常见的通知程序各个部分关闭的方式就是使用一个广播消息通道。关于如何实现,其实也不复杂:应用中的每个任务都持有一个广播消息通道的接收端,当消息被广播到该通道时,每个任务都可以收到该消息,并关闭自己: +```rust +let next_frame = tokio::select! { + res = self.connection.read_frame() => res?, + _ = self.shutdown.recv() => { + // 当收到关闭信号后,直接从 `select!` 返回,此时 `select!` 中的另一个分支会自动释放,其中的任务也会结束 + return Ok(()); + } +}; +``` + + +在 `mini-redis` 中,当收到关闭消息时,任务会立即结束,但在实际项目中,这种方式可能会过于理想,例如当我们向文件或数据库写入数据时,立刻终止任务可能会导致一些无法预料的错误,因此,在结束前做一些收尾工作会是非常好的选择。 + +除此之外,还有两点值得注意: + +- 将广播消息通道作为结构体的一个字段是相当不错的选择, 例如[这个例子](https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs) +- 还可以使用 [`watch channel`](https://docs.rs/tokio/1.16.1/tokio/sync/watch/index.html) 实现同样的效果,与之前的方式相比,这两种方法并没有太大的区别 + +## 等待各个部分的结束 +在之前章节,我们讲到过一个 [`mpsc`](https://docs.rs/tokio/1/tokio/sync/mpsc/index.html) 消息通道有一个重要特性:当所有发送端都 `drop` 时,消息通道会自动关闭,此时继续接收消息就会报错。 + +大家发现没?这个特性特别适合优雅关闭的场景:主线程持有消息通道的接收端,然后每个代码部分拿走一个发送端,当该部分结束时,就 `drop` 掉发送端,因此所有发送端被 `drop` 也就意味着所有的部分都已关闭,此时主线程的接收端就会收到错误,进而结束。 + +```rust +use tokio::sync::mpsc::{channel, Sender}; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() { + let (send, mut recv) = channel(1); + + for i in 0..10 { + tokio::spawn(some_operation(i, send.clone())); + } + + // 等待各个任务的完成 + // + // 我们需要 drop 自己的发送端,因为等下的 `recv()` 调用会阻塞, 如果不 `drop` ,那发送端就无法被全部关闭 + // `recv` 也将永远无法结束,这将陷入一个类似死锁的困境 + drop(send); + + // 当所有发送端都超出作用域被 `drop` 时 (当前的发送端并不是因为超出作用域被 `drop` 而是手动 `drop` 的) + // `recv` 调用会返回一个错误 + let _ = recv.recv().await; +} + +async fn some_operation(i: u64, _sender: Sender<()>) { + sleep(Duration::from_millis(100 * i)).await; + println!("Task {} shutting down.", i); + + // 发送端超出作用域,然后被 `drop` +} +``` + +关于忘记 `drop` 本身持有的发送端进而导致 bug 的问题,大家可以看看[这篇文章](https://course.rs/pitfalls/main-with-channel-blocked.html)。 diff --git a/book/contents/web/intro.md b/book/contents/web/intro.md index 9ba6d1df..efe777ac 100644 --- a/book/contents/web/intro.md +++ b/book/contents/web/intro.md @@ -1 +1,5 @@ # Web应用开发 todo + +https://www.reddit.com/r/rust/comments/sgfvn0/is_rust_any_good_for_building_restful_apis/ + +https://www.reddit.com/r/rust/comments/si7civ/do_you_think_rust_is_a_good_fit_for_a_simple_crud/ \ No newline at end of file diff --git a/book/writing-material/posts/async.md b/book/writing-material/posts/async.md deleted file mode 100644 index 4f3bd404..00000000 --- a/book/writing-material/posts/async.md +++ /dev/null @@ -1,2 +0,0 @@ -## Pin、UnPin -1. https://blog.cloudflare.com/pin-and-unpin-in-rust/ diff --git a/book/writing-material/posts/interview.md b/book/writing-material/posts/interview.md new file mode 100644 index 00000000..19d185c9 --- /dev/null +++ b/book/writing-material/posts/interview.md @@ -0,0 +1 @@ +https://www.reddit.com/r/rust/comments/si0j6v/im_preparing_for_a_rust_interview/ \ No newline at end of file From f55952fbf99d3d0a337325b56b3d1a63fc39e3c2 Mon Sep 17 00:00:00 2001 From: sunface Date: Thu, 3 Feb 2022 19:42:15 +0800 Subject: [PATCH 2/3] update readme.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7ad83f5f..bd5943e9 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ - 国内镜像: https://book.rust.team - 知乎: [支持章节内目录跳转,很好用!](https://www.zhihu.com/column/c_1452781034895446017) -- 最近修订: 新增章节 [Tokio使用指南 - 类似迭代器的Stream](https://zhuanlan.zhihu.com/p/463061975) +- 最近修订: 新增章节 [Tokio使用指南 - 同步桥接](https://zhuanlan.zhihu.com/p/463813904) - Rust版本: Rust edition 2021 - QQ交流群:1009730433 From 576743af252ce6d3cd677aa9336d3f6f83899473 Mon Sep 17 00:00:00 2001 From: sunface Date: Thu, 3 Feb 2022 19:42:55 +0800 Subject: [PATCH 3/3] update toc --- book/contents/SUMMARY.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/book/contents/SUMMARY.md b/book/contents/SUMMARY.md index e037bcbd..fd5565a5 100644 --- a/book/contents/SUMMARY.md +++ b/book/contents/SUMMARY.md @@ -83,7 +83,7 @@ - [基于Send和Sync的线程安全](advance/concurrency-with-threads/send-sync.md) - [实践应用:多线程Web服务器 todo](advance/concurrency-with-threads/web-server.md) - [全局变量](advance/global-variable.md) - - [错误处理 todo](advance/errors/intro.md) + - [错误处理 doing](advance/errors/intro.md) - [简化错误处理 todo](advance/errors/simplify.md) - [自定义错误 todo](advance/errors/user-define.md) - [让错误输出更优雅 todo](advance/errors/pretty-format.md) @@ -100,7 +100,7 @@ - [一些疑难问题的解决办法](async/pain-points-and-workarounds.md) - [实践应用:Async Web服务器](async/web-server.md) -- [tokio使用指南 doing](tokio/intro.md) +- [tokio使用指南](tokio/intro.md) - [tokio概览](tokio/overview.md) - [使用初印象](tokio/getting-startted.md) - [创建异步任务](tokio/spawning.md)