Update(async): unified format 4

pull/509/head
Allan Downey 3 years ago
parent fa45f219a7
commit d939b0460d

@ -1,9 +1,11 @@
# async/await 和 Stream流处理
# async/await 和 Stream 流处理
在入门章节中,我们简单学习了该如何使用 `async/.await` 同时在后面也了解了一些底层原理,现在是时候继续深入了。
`async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
`async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如 IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
有两种方式可以使用`async` `async fn`用于声明函数,`async { ... }`用于声明语句块,它们会返回一个实现 `Future` 特征的值:
```rust
// `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
@ -25,6 +27,7 @@ fn bar() -> impl Future<Output = u8> {
## `async` 的生命周期
`async fn` 函数如果拥有引用类型的参数,那它返回的 `Future` 的生命周期就会被这些参数的生命周期所限制:
```rust
async fn foo(x: &u8) -> u8 { *x }
@ -37,6 +40,7 @@ fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
意味着 `async fn` 函数返回的 `Future` 必须满足以下条件: 当 `x` 依然有效时, 该 `Future` 就必须继续等待( `.await` ), 也就是说`x` 必须比 `Future`活得更久。
在一般情况下,在函数调用后就立即 `.await` 不会存在任何问题,例如`foo(&x).await`。但是,若 `Future` 被先存起来或发送到另一个任务或者线程,就可能存在问题了:
```rust
use std::future::Future;
fn bad() -> impl Future<Output = u8> {
@ -48,6 +52,7 @@ async fn borrow_x(x: &u8) -> u8 { *x }
```
以上代码会报错,因为 `x` 的生命周期只到 `bad` 函数的结尾。 但是 `Future` 显然会活得更久:
```shell
error[E0597]: `x` does not live long enough
--> src/main.rs:4:14
@ -62,6 +67,7 @@ error[E0597]: `x` does not live long enough
```
其中一个常用的解决方法就是将具有引用参数的 `async fn` 函数转变成一个具有 `'static` 生命周期的 `Future` 。 以上解决方法可以通过将参数和对 `async fn` 的调用放在同一个 `async` 语句块来实现:
```rust
use std::future::Future;
@ -78,7 +84,9 @@ fn good() -> impl Future<Output = u8> {
如上所示,通过将参数移动到 `async` 语句块内, 我们将它的生命周期扩展到 `'static` 并跟返回的 `Future` 保持了一致。
## async move
`async` 允许我们使用 `move` 关键字来将环境中变量的所有权转移到语句块内,就像闭包那样,好处是你不再发愁该如何解决借用生命周期的问题,坏处就是无法跟其它代码实现对变量的共享:
```rust
// 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async fn blocks() {
@ -111,7 +119,8 @@ fn move_block() -> impl Future<Output = ()> {
}
```
## 当.await遇见多线程执行器
## 当.await 遇见多线程执行器
需要注意的是,当使用多线程 `Future` 执行器( `executor` )时, `Future` 可能会在线程间被移动,因此 `async` 语句块中的变量必须要能在线程间传递。 至于 `Future` 会在线程间移动的原因是:它内部的任何`.await`都可能导致它被切换到一个新线程上去执行。
由于需要在多线程环境使用,意味着 `Rc``RefCell` 、没有实现 `Send` 的所有权类型、没有实现 `Sync` 的引用类型,它们都是不安全的,因此无法被使用
@ -122,9 +131,10 @@ fn move_block() -> impl Future<Output = ()> {
因此,为了避免这种情况的发生,我们需要使用 `futures` 包下的锁 `futures::lock` 来替代 `Mutex` 完成任务。
## Stream 流处理
## Stream流处理
`Stream` 特征类似于 `Future` 特征,但是前者在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。
```rust
trait Stream {
// Stream生成的值的类型
@ -138,6 +148,7 @@ trait Stream {
```
关于 `Stream` 的一个常见例子是消息通道( `futures` 包中的)的消费者 `Receiver`。每次有消息从 `Send` 端发送后,它都可以接收到一个 `Some(val)` 值, 一旦 `Send` 端关闭(drop),且消息通道中没有消息后,它会接收到一个 `None` 值。
```rust
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
@ -155,11 +166,12 @@ async fn send_recv() {
}
```
#### 迭代和并发
跟迭代器类似,我们也可以迭代一个 `Stream`。 例如使用`map``filter``fold`方法,以及它们的*遇到错误提前返回*的版本: `try_map``try_filter``try_fold`。
但是跟迭代器又有所不同,`for` 循环无法在这里使用,但是命令式风格的循环`while let`是可以用的,同时还可以使用`next` 和 `try_next` 方法:
```rust
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // 引入 next
@ -182,7 +194,8 @@ async fn sum_with_try_next(
}
```
上面代码是一次处理一个值的模式,但是需要注意的是:**如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义**。 因此,如果可以的话我们还是要选择从一个 `Stream` 并发处理多个值的方式,通过 `for_each_concurrent``try_for_each_concurrent` 方法来实现:
上面代码是一次处理一个值的模式,但是需要注意的是:**如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义**。 因此,如果可以的话我们还是要选择从一个 `Stream` 并发处理多个值的方式,通过 `for_each_concurrent``try_for_each_concurrent` 方法来实现:
```rust
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
@ -199,4 +212,3 @@ async fn jump_around(
Ok(())
}
```

@ -1,12 +1,15 @@
# 底层探秘: Future执行器与任务调度
异步编程背后到底藏有什么秘密究竟是哪只幕后之手在操纵这一切如果你对这些感兴趣就继续看下去否则可以直接跳过因为本章节的内容对于一个API工程师并没有太多帮助。
# 底层探秘: Future 执行器与任务调度
异步编程背后到底藏有什么秘密?究竟是哪只幕后之手在操纵这一切?如果你对这些感兴趣,就继续看下去,否则可以直接跳过,因为本章节的内容对于一个 API 工程师并没有太多帮助。
但是如果你希望能深入理解 `Rust``async/.await` 代码是如何工作、理解运行时和性能,甚至未来想要构建自己的 `async` 运行时或相关工具,那么本章节终究不会辜负于你。
## Future 特征
`Future` 特征是 Rust 异步编程的核心,毕竟异步函数是异步编程的核心,而 `Future` 恰恰是异步函数的返回值和被执行的关键。
首先,来给出 `Future` 的定义:它是一个能产出值的异步计算(虽然该值可能为空,例如 `()` )。光看这个定义,可能会觉得很空洞,我们来看看一个简化版的 `Future` 特征:
```rust
trait SimpleFuture {
type Output;
@ -27,7 +30,8 @@ enum Poll<T> {
也许大家还是迷迷糊糊的,没事,我们用一个例子来说明下。考虑一个需要从 `socket` 读取数据的场景:如果有数据,可以直接读取数据并返回 `Poll::Ready(data)` 但如果没有数据,`Future` 会被阻塞且不会再继续执行,此时它会注册一个 `wake` 函数,当 `socket` 数据准备好时,该函数将被调用以通知执行器:我们的 `Future` 已经准备好了,可以继续执行。
下面的 `SocketRead` 结构体就是一个 `Future`:
下面的 `SocketRead` 结构体就是一个 `Future`:
```rust
pub struct SocketRead<'a> {
socket: &'a Socket,
@ -42,7 +46,7 @@ impl SimpleFuture for SocketRead<'_> {
Poll::Ready(self.socket.read_buf())
} else {
// socket中还没数据
//
//
// 注册一个`wake`函数,当数据可用时,该函数会被调用,
// 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据
self.socket.set_readable_callback(wake);
@ -50,9 +54,10 @@ impl SimpleFuture for SocketRead<'_> {
}
}
}
```
```
这种 `Future` 模型允许将多个异步操作组合在一起,同时还无需任何内存分配。不仅仅如此,如果你需要同时运行多个 `Future`或链式调用多个 `Future` ,也可以通过无内存分配的状态机实现,例如:
```rust
trait SimpleFuture {
type Output;
@ -108,6 +113,7 @@ where
```
上面代码展示了如何同时运行多个 `Future` 且在此过程中没有任何内存分配,让并发编程更加高效。 类似的,多个`Future`也可以一个接一个的连续运行:
```rust
/// 一个SimpleFuture, 它使用顺序的方式一个接一个地运行两个Future
//
@ -141,6 +147,7 @@ where
```
这些例子展示了在不需要内存对象分配以及深层嵌套回调的情况下,该如何使用 `Future` 特征去表达异步控制流。 在了解了基础的控制流后,我们再来看看真实的 `Future` 特征有何不同之处。
```rust
trait Future {
type Output;
@ -155,20 +162,22 @@ trait Future {
首先这里多了一个 `Pin` ,关于它我们会在后面章节详细介绍,现在你只需要知道使用它可以创建一个无法被移动的 `Future` ,因为无法被移动,因此它将具有固定的内存地址,意味着我们可以存储它的指针(如果内存地址可能会变动,那存储指针地址将毫无意义!),也意味着可以实现一个自引用数据结构: `struct MyFut { a: i32, ptr_to_a: *const i32 }`。 而对于 `async/await` 来说,`Pin` 是不可或缺的关键特性。
其次,从 `wake: fn()` 变成了 `&mut Context<'_>` 。意味着 `wake` 函数可以携带数据了为何要携带数据考虑一个真实世界的场景一个复杂应用例如web服务器可能有数千连接同时在线那么同时就有数千 `Future` 在被同时管理着,如果不能携带数据,当一个 `Future` 调用 `wake` 后,执行器该如何知道是哪个 `Future` 调用了 `wake` ,然后进一步去 `poll` 对应的 `Future` ?没有办法!那之前的例子为啥就可以使用没有携带数据的 `wake` 因为足够简单,不存在歧义性。
其次,从 `wake: fn()` 变成了 `&mut Context<'_>` 。意味着 `wake` 函数可以携带数据了,为何要携带数据?考虑一个真实世界的场景,一个复杂应用例如 web 服务器可能有数千连接同时在线,那么同时就有数千 `Future` 在被同时管理着,如果不能携带数据,当一个 `Future` 调用 `wake` 后,执行器该如何知道是哪个 `Future` 调用了 `wake` ,然后进一步去 `poll` 对应的 `Future` ?没有办法!那之前的例子为啥就可以使用没有携带数据的 `wake` 因为足够简单,不存在歧义性。
总之,在正式场景要进行 `wake` ,就必须携带上数据。 而 `Context` 类型通过提供一个 `Waker` 类型的值,就可以用来唤醒特定的的任务。
## 使用 Waker 来唤醒任务
对于 `Future` 来说,第一次被 `poll` 时无法完成任务是很正常的。但它需要确保在未来一旦准备好时,可以通知执行器再次对其进行 `poll` 进而继续往下执行,该通知就是通过 `Waker` 类型完成的。
`Waker` 提供了一个 `wake()` 方法可以用于告诉执行器:相关的任务可以被唤醒了,此时执行器就可以对相应的 `Future` 再次进行 `poll` 操作。
#### 构建一个定时器
下面一起来实现一个简单的定时器 `Future` 。为了让例子尽量简单,当计时器创建时,我们会启动一个线程接着让该线程进入睡眠,等睡眠结束后再通知给 `Future`
注意本例子还会在后面继续使用,因此我们重新创建一个工程来演示:使用 `cargo new --lib timer_future` 来创建一个新工程,在 `lib` 包的根路径 `src/lib.rs` 中添加以下内容:
```rust
use std::{
future::Future,
@ -180,7 +189,7 @@ use std::{
};
```
继续来实现 `Future` 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 `Future` ,由于是多线程环境,我们需要使用 `Arc<Mutex<T>>` 来作为一个共享状态,用于在新线程和 `Future` 定时器间共享。
继续来实现 `Future` 定时器,之前提到: 新建线程在睡眠结束后会需要将状态同步给定时器 `Future` ,由于是多线程环境,我们需要使用 `Arc<Mutex<T>>` 来作为一个共享状态,用于在新线程和 `Future` 定时器间共享。
```rust
pub struct TimerFuture {
@ -198,6 +207,7 @@ struct SharedState {
```
下面给出 `Future` 的具体实现:
```rust
impl Future for TimerFuture {
type Output = ();
@ -207,8 +217,8 @@ impl Future for TimerFuture {
if shared_state.completed {
Poll::Ready(())
} else {
// 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
//
// 设置`waker`,这样新线程在睡眠(计时)结束后可以唤醒当前的任务,接着再次对`Future`进行`poll`操作,
//
// 下面的`clone`每次被`poll`时都会发生一次,实际上,应该是只`clone`一次更加合理。
// 选择每次都`clone`的原因是: `TimerFuture`可以在执行器的不同任务间移动,如果只克隆一次,
// 那么获取到的`waker`可能已经被篡改并指向了其它任务,最终导致执行器运行了错误的任务
@ -221,7 +231,8 @@ impl Future for TimerFuture {
代码很简单,只要新线程设置了 `shared_state.completed = true` ,那任务就能顺利结束。如果没有设置,会为当前的任务克隆一份 `Waker` ,这样新线程就可以使用它来唤醒当前的任务。
最后,再来创建一个 API 用于构建定时器和启动计时线程:
最后,再来创建一个 API 用于构建定时器和启动计时线程:
```rust
impl TimerFuture {
/// 创建一个新的`TimerFuture`,在指定的时间结束后,该`Future`可以完成
@ -251,19 +262,23 @@ impl TimerFuture {
至此,一个简单的定时器 `Future` 就已创建成功,那么该如何使用它呢?相信部分爱动脑筋的读者已经猜到了:我们需要创建一个执行器,才能让程序动起来。
## 执行器Executor
Rust的 `Future` 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 `async` 函数中使用 `.await` 来调用另一个 `async` 函数,但是这个只能解决 `async` 内部的问题,那么这些最外层的 `async` 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 `executor`
## 执行器 Executor
Rust 的 `Future` 是惰性的:只有屁股上拍一拍,它才会努力动一动。其中一个推动它的方式就是在 `async` 函数中使用 `.await` 来调用另一个 `async` 函数,但是这个只能解决 `async` 内部的问题,那么这些最外层的 `async` 函数,谁来推动它们运行呢?答案就是我们之前多次提到的执行器 `executor`
执行器会管理一批 `Future` (最外层的 `ascyn` 函数),然后通过不停地 `poll` 推动它们直到完成。 最开始,执行器会先 `poll` 一次 `Future` ,后面就不会主动去 `poll` 了,而是等待 `Future` 通过调用 `wake` 函数来通知它可以继续,它才会继续去 `poll` 。这种**wake 通知然后 poll**的方式会不断重复,直到 `Future` 完成。
#### 构建执行器
下面我们将实现一个简单的执行器,它可以同时并发运行多个 `Future` 。例子中,需要用到 `futures` 包的 `ArcWake` 特征,它可以提供一个方便的途径去构建一个 `Waker` 。编辑 `Cargo.toml` ,添加下面依赖:
```rust
[dependencies]
futures = "0.3"
```
在之前的内容中,我们在 `src/lib.rs` 中创建了定时器 `Future` ,现在在 `src/main.rs` 中来创建程序的主体内容,开始之前,先引入所需的包:
```rust
use {
futures::{
@ -299,7 +314,7 @@ struct Spawner {
/// 一个Future它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
/// 进行中的Future在未来的某个时间点会被完成
///
///
/// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
/// Rust并不聪明它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
/// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
@ -321,6 +336,7 @@ fn new_executor_and_spawner() -> (Executor, Spawner) {
```
下面再来添加一个方法用于生成 `Future` , 然后将它放入任务通道中:
```rust
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
@ -335,6 +351,7 @@ impl Spawner {
```
在执行器 `poll` 一个 `Future` 之前,首先需要调用 `wake` 方法进行唤醒,然后再由 `Waker` 负责调度该任务并将其放入任务通道中。创建 `Waker` 的最简单的方式就是实现 `ArcWake` 特征,先来为我们的任务实现 `ArcWake` 特征,这样它们就能被转变成 `Waker` 然后被唤醒:
```rust
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
@ -349,6 +366,7 @@ impl ArcWake for Task {
```
当任务实现了 `ArcWake` 特征后,它就变成了 `Waker` ,在调用 `wake()` 对其唤醒后会将任务复制一份所有权( `Arc` ),然后将其发送到任务通道中。最后我们的执行器将从通道中获取任务,然后进行 `poll` 执行:
```rust
impl Executor {
fn run(&self) {
@ -372,6 +390,7 @@ impl Executor {
```
恭喜!我们终于拥有了自己的执行器,下面再来写一段代码使用该执行器去运行之前的定时器 `Future`
```rust
fn main() {
let (executor, spawner) = new_executor_and_spawner();
@ -393,8 +412,10 @@ fn main() {
}
```
## 执行器和系统IO
## 执行器和系统 IO
前面我们一起看过一个使用 `Future``Socket` 中异步读取数据的例子:
```rust
pub struct SocketRead<'a> {
socket: &'a Socket,
@ -409,7 +430,7 @@ impl SimpleFuture for SocketRead<'_> {
Poll::Ready(self.socket.read_buf())
} else {
// socket中还没数据
//
//
// 注册一个`wake`函数,当数据可用时,该函数会被调用,
// 然后当前Future的执行器会再次调用`poll`方法,此时就可以读取到数据
self.socket.set_readable_callback(wake);
@ -424,10 +445,11 @@ impl SimpleFuture for SocketRead<'_> {
关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:
- `set_readable_callback` 方法到底是怎么工作的?怎么才能知道 `socket` 中的数据已经可以被读取了?
关于第二点,其中一个简单粗暴的方法就是使用一个新线程不停的检查 `socket` 中是否有了数据,当有了后,就调用 `wake()` 函数。该方法确实可以满足需求,但是性能着实太低了,需要为每个阻塞的 `Future` 都创建一个单独的线程!
在现实世界中,该问题往往是通过操作系统提供的 `IO` 多路复用机制来完成,例如 `Linux` 中的 **`epoll`**`FreeBSD` 和 `macOS` 中的 **`kqueue`** `Windows` 中的 **`IOCP`**, `Fuchisa`中的 **`ports`** 等(可以通过 Rust 的跨平台包 `mio` 来使用它们)。借助IO多路复用机制可以实现一个线程同时阻塞地去等待多个异步IO事件一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码
在现实世界中,该问题往往是通过操作系统提供的 `IO` 多路复用机制来完成,例如 `Linux` 中的 **`epoll`**`FreeBSD` 和 `macOS` 中的 **`kqueue`** `Windows` 中的 **`IOCP`**, `Fuchisa`中的 **`ports`** 等(可以通过 Rust 的跨平台包 `mio` 来使用它们)。借助 IO 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 IO 事件,一旦某个事件完成就立即退出阻塞并返回数据。相关实现类似于以下代码:
```rust
struct IoBlocker {
/* ... */
@ -470,8 +492,8 @@ io_blocker.add_io_event_interest(
);
let event = io_blocker.block();
// 当socket的数据可以读取时打印 "Socket 1 is now READABLE"
// 当socket的数据可以读取时打印 "Socket 1 is now READABLE"
println!("Socket {:?} is now {:?}", event.id, event.signals);
```
这样我们只需要一个执行器线程它会接收IO事件并将其分发到对应的 `Waker` 中,接着后者会唤醒相关的任务,最终通过执行器 `poll` 后,任务可以顺利的继续执行, 这种IO读取流程可以不停的循环直到 `socket` 关闭。
这样,我们只需要一个执行器线程,它会接收 IO 事件并将其分发到对应的 `Waker` 中,接着后者会唤醒相关的任务,最终通过执行器 `poll` 后,任务可以顺利的继续执行, 这种 IO 读取流程可以不停的循环,直到 `socket` 关闭。

@ -1,47 +1,50 @@
# Async 编程简介
众所周知Rust 可以让我们写出性能高且安全的软件,那么异步编程这块儿呢?是否依然在高性能的同时保证了安全?
我们先通过一张web框架性能对比图来感受下 Rust 异步编程的性能:
我们先通过一张 web 框架性能对比图来感受下 Rust 异步编程的性能:
<img alt="actix-vs-gin screenshot" width="100%" src="https://pic1.zhimg.com/v2-3cebd732fb56f43713f106fdcfa44a3c_b.png" class="center" />
上图并不能说 Rust 写的 `actix` 框架比 Go 的 `gin` 更好、更优秀,但是确实可以一定程度上说明 Rust 的异步性能非常的高!
简单来说,异步编程是一个[并发编程模型](https://course.rs/advance/concurrency-with-threads/concurrency-parallelism.html)目前主流语言基本都支持了当然支持的方式有所不同。异步编程允许我们同时并发运行大量的任务却仅仅需要几个甚至一个OS线程或CPU核心现代化的异步编程在使用体验上跟同步编程也几无区别例如 Go 语言的 `go` 关键字,也包括我们后面将介绍的 `async/await` 语法,该语法是 `JavaScript``Rust` 的核心特性之一。
简单来说,异步编程是一个[并发编程模型](https://course.rs/advance/concurrency-with-threads/concurrency-parallelism.html),目前主流语言基本都支持了,当然,支持的方式有所不同。异步编程允许我们同时并发运行大量的任务,却仅仅需要几个甚至一个 OS 线程或 CPU 核心,现代化的异步编程在使用体验上跟同步编程也几无区别,例如 Go 语言的 `go` 关键字,也包括我们后面将介绍的 `async/await` 语法,该语法是 `JavaScript``Rust` 的核心特性之一。
## async 简介
## async简介
`async` 是 Rust 选择的异步编程模型,下面我们来介绍下它的优缺点,以及何时适合使用。
#### async vs 其它并发模型
#### async vs 其它并发模型
由于并发编程在现代社会非常重要因此每个主流语言都对自己的并发模型进行过权衡取舍和精心设计Rust 语言也不例外。下面的列表可以帮助大家理解不同并发模型的取舍:
- **OS线程**, 它最简单,也无需改变任何编程模型(业务/代码逻辑),因此非常适合作为语言的原生并发模型,我们在[多线程章节](../advnce/../advance/concurrency-with-threads/concurrency-parallelism.md)也提到过Rust 就选择了原生支持线程级的并发编程。但是,这种模型也有缺点,例如线程间的同步将变得更加困难,线程间的上下文切换损耗较大。使用线程池在一定程度上可以提升性能,但是对于 IO 密集的场景来说,线程池还是不够看。
- **OS 线程**, 它最简单,也无需改变任何编程模型(业务/代码逻辑),因此非常适合作为语言的原生并发模型,我们在[多线程章节](../advnce/../advance/concurrency-with-threads/concurrency-parallelism.md)也提到过Rust 就选择了原生支持线程级的并发编程。但是,这种模型也有缺点,例如线程间的同步将变得更加困难,线程间的上下文切换损耗较大。使用线程池在一定程度上可以提升性能,但是对于 IO 密集的场景来说,线程池还是不够看。
- **事件驱动(Event driven)**, 这个名词你可能比较陌生,如果说事件驱动常常跟回调( Callback )一起使用,相信大家就恍然大悟了。这种模型性能相当的好,但最大的问题就是存在回调地狱的风险:非线性的控制流和结果处理导致了数据流向和错误传播变得难以掌控,还会导致代码可维护性和可读性的大幅降低,大名鼎鼎的 JS 曾经就存在回调地狱。
- **协程(Coroutines)** 可能是目前最火的并发模型,`Go` 语言的协程设计就非常优秀,这也是 `Go` 语言能够迅速火遍全球的杀手锏之一。协程跟线程类似,无需改变编程模型,同时,它也跟 `async` 类似,可以支持大量的任务并发运行。但协程抽象层次过高,导致用户无法接触到底层的细节,这对于系统编程语言和自定义异步运行时是难以接受的
- **actor模型**是erlang的杀手锏之一它将所有并发计算分割成一个一个单元这些单元被称为 `actor` , 单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。由于 `actor` 模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用
- **actor 模型**是 erlang 的杀手锏之一,它将所有并发计算分割成一个一个单元,这些单元被称为 `actor` , 单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。由于 `actor` 模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用
- **async/await** 该模型性能高,还能支持底层编程,同时又像线程和协程那样无需过多的改变编程模型,但有得必有失,`async` 模型的问题就是内部实现机制过于复杂,对于用户来说,理解和使用起来也没有线程和协程简单,好在前者的复杂性开发者们已经帮我们封装好,而理解和使用起来不够简单,正是本章试图解决的问题。
总之Rust 经过权衡取舍后,最终选择了同时提供多线程编程和 async 编程:
- 前者通过标准库实现,当你无需那么高的并发时,例如需要并行计算时,可以选择它,优点是线程内的代码执行效率更高、实现更直观更简单,这块内容已经在多线程章节进行过深入讲解,不再赘述
- 后者通过语言特性 + 标准库 + 三方库的方式实现,在你需要高并发、异步 `I/O` 时,选择它就对了
#### async: Rust vs 其它语言
目前已经有诸多语言都通过 `async` 的方式提供了异步编程,例如 `JavaScript` ,但 `Rust` 在实现上有所区别:
- **Future 在 Rust 中是惰性的**,只有在被轮询(`poll`)时才会运行, 因此丢弃一个 `future` 会阻止它未来再被运行, 你可以将`Future`理解为一个在未来某个时间点被调度执行的任务。
- **Async 在 Rust 中使用开销是零** 意味着只有你能看到的代码(自己的代码)才有性能损耗,你看不到的(`async` 内部实现)都没有性能损耗,例如,你可以无需分配任何堆内存、也无需任何动态分发来使用 `async` 这对于热点路径的性能有非常大的好处正是得益于此Rust 的异步编程性能才会这么高。
- **Rust 没有内置异步调用所必须的运行时**但是无需担心Rust社区生态中已经提供了非常优异的运行时实现例如大明星 [`tokio`](https://tokio.rs)
- **Rust 没有内置异步调用所必须的运行时**但是无需担心Rust 社区生态中已经提供了非常优异的运行时实现,例如大明星 [`tokio`](https://tokio.rs)
- **运行时同时支持单线程和多线程**,这两者拥有各自的优缺点, 稍后会讲
#### Rust: async vs 多线程
虽然 `async` 和多线程都可以实现并发编程,后者甚至还能通过线程池来增强并发能力,但是这两个方式并不互通,从一个方式切换成另一个需要大量的代码重构工作,因此提前为自己的项目选择适合的并发模型就变得至关重要。
`OS` 线程非常适合少量任务并发,因为线程的创建和上下文切换是非常昂贵的,甚至于空闲的线程都会消耗系统资源。虽说线程池可以有效的降低性能损耗,但是也无法彻底解决问题。当然,线程模型也有其优点,例如它不会破坏你的代码逻辑和编程模型,你之前的顺序代码,经过少量修改适配后依然可以在新线程中直接运行,同时在某些操作系统中,你还可以改变线程的优先级,这对于实现驱动程序或延迟敏感的应用(例如硬实时系统)很有帮助。
对于长时间运行的CPU密集型任务例如并行计算使用线程将更有优势。 这种密集任务往往会让所在的线程持续运行任何不必要的线程切换都会带来性能损耗因此高并发反而在此时成为了一种多余。同时你所创建的线程数应该等于CPU核心数充分利用CPU的并行能力甚至还可以将线程绑定到CPU核心上进一步减少线程上下文切换。
对于长时间运行的 CPU 密集型任务,例如并行计算,使用线程将更有优势。 这种密集任务往往会让所在的线程持续运行,任何不必要的线程切换都会带来性能损耗,因此高并发反而在此时成为了一种多余。同时你所创建的线程数应该等于 CPU 核心数,充分利用 CPU 的并行能力,甚至还可以将线程绑定到 CPU 核心上,进一步减少线程上下文切换。
而高并发更适合 `IO` 密集型任务,例如 web 服务器、数据库连接等等网络服务,因为这些任务绝大部分时间都处于等待状态,如果使用多线程,那线程大量时间会处于无所事事的状态,再加上线程上下文切换的高昂代价,让多线程做 `IO` 密集任务变成了一件非常奢侈的事。而使用`async`,既可以有效的降低 `CPU` 和内存的负担,又可以让大量的任务并发的运行,一个任务一旦处于`IO`或者其他等待(阻塞)状态,就会被立刻切走并执行另一个任务,而这里的任务切换的性能开销要远远低于使用多线程时的线程上下文切换。
@ -56,17 +59,19 @@
- 有大量 `CPU` 密集任务需要并行运行时,例如并行计算,选多线程模型,且让线程数等于或者稍大于 `CPU` 核心数
- 无所谓时,统一选多线程
#### async和多线程的性能对比
#### async 和多线程的性能对比
| 操作 | async | 线程 |
| ---- | ----- | ---- |
| 创建 | 0.3微秒 | 17微秒 |
| 线程切换 | 0.2微秒 | 1.7微秒 |
| 操作 | async | 线程 |
| -------- | -------- | -------- |
| 创建 | 0.3 微秒 | 17 微秒 |
| 线程切换 | 0.2 微秒 | 1.7 微秒 |
可以看出,`async` 在线程切换的开销显著低于多线程,对于 IO 密集的场景,这种性能开销累计下来会非常可怕!
#### 一个例子
在大概理解`async`后,我们再来看一个简单的例子。如果想并发的下载文件,你可以使用多线程如下实现:
```rust
fn get_two_sites() {
// 创建两个新线程执行任务
@ -80,6 +85,7 @@ fn get_two_sites() {
```
如果是在一个小项目中简单的去下载文件,这么写没有任何问题,但是一旦下载文件的并发请求多起来,那一个下载任务占用一个线程的模式就太重了,会很容易成为程序的瓶颈。好在,我们可以使用`async`的方式来解决:
```rust
async fn get_two_sites_async() {
// 创建两个不同的`future`,你可以把`future`理解为未来某个时刻会被执行的计划任务
@ -96,7 +102,8 @@ async fn get_two_sites_async() {
事实上,`async` 和多线程并不是二选一,在同一应用中,可以根据情况两者一起使用,当然,我们还可以使用其它的并发模型,例如上面提到事件驱动模型,前提是有三方库提供了相应的实现。
## Async Rust当前的进展
## Async Rust 当前的进展
简而言之Rust 语言的 `async` 目前还没有达到多线程的成熟度,其中一部分内容还在不断进化中,当然,这并不影响我们在生产级项目中使用,因为社区中还有 `tokio` 这种大杀器。
使用 `async` 时,你会遇到好的,也会遇到不好的,例如:
@ -105,64 +112,71 @@ async fn get_two_sites_async() {
- 会经常跟进阶语言特性打交道,例如生命周期等,这些家伙可不好对付
- 一些兼容性问题,例如同步和异步代码、不同的异步运行时( `tokio``async-std` )
- 更昂贵的维护成本,原因是 `async` 和社区开发的运行时依然在不停的进化
总之,`async` 在 Rust 中并不是一个善茬,你会遇到更多的困难或者说坑,也会带来更高的代码阅读成本及维护成本,但是为了性能,一切都值了,不是吗?
不过好在,这些进化早晚会彻底稳定成熟,而且在实际项目中,我们往往会使用成熟的三方库,例如`tokio`,因此可以避免一些类似的问题,但是对于本章的学习来说,`async` 的一些难点还是我们必须要去面对和征服的。
#### 语言和库的支持
`async` 的底层实现非常复杂,且会导致编译后文件体积显著增加,因此 Rust 没有选择像 Go 语言那样内置了完整的特性和运行时,而是选择了通过 Rust 语言提供了必要的特性支持,再通过社区来提供 `async` 运行时的支持。 因此要完整的使用 `async` 异步编程,你需要依赖以下特性和外部库:
- 所必须的特征(例如 `Future` )、类型和函数,由标准库提供实现
- 关键字 `async/await` 由Rust语言提供并进行了编译器层面的支持
- 关键字 `async/await` Rust 语言提供,并进行了编译器层面的支持
- 众多实用的类型、宏和函数由官方开发的 [`futures`](https://github.com/rust-lang/futures-rs) 包提供(不是标准库),它们可以用于任何 `async` 应用中。
- `async` 代码的执行、`IO` 操作、任务创建和调度等等复杂功能由社区的 `async` 运行时提供,例如 [`tokio`](https://github.com/tokio-rs/tokio) 和 [`async-std`](https://github.com/async-rs/async-std)
还有,你在同步( `synchronous` )代码中使用的一些语言特性在 `async` 中可能将无法再使用,而且 Rust 也不允许你在特征中声明 `async` 函数(可以通过三方库实现) 总之,你会遇到一些在同步代码中不会遇到的奇奇怪怪、形形色色的问题,不过不用担心,本章会专门用一个章节罗列这些问题,并给出相应的解决方案。
#### 编译和错误
在大多数情况下,`async` 中的编译错误和运行时错误跟之前没啥区别,但是依然有以下几点值得注意:
- 编译错误,由于 `async` 编程时需要经常使用复杂的语言特性,例如生命周期和`Pin`,因此相关的错误可能会出现的更加频繁
- 运行时错误,编译器会为每一个`async`函数生成状态机,这会导致在栈跟踪时会包含这些状态机的细节,同时还包含了运行时对函数的调用,因此,栈跟踪记录(例如 `panic` 时)将变得更加难以解读
- 一些隐蔽的错误也可能发生,例如在一个 `async` 上下文中去调用一个阻塞的函数,或者没有正确的实现 `Future` 特征都有可能导致这种错误。这种错误可能会悄无声息的通过编译检查甚至有时候会通过单元测试。好在一旦你深入学习并掌握了本章的内容和 `async` 原理,可以有效的降低遇到这些错误的概率
#### 兼容性考虑
异步代码和同步代码并不总能和睦共处。例如,我们无法在一个同步函数中去调用一个 `async` 异步函数,同步和异步代码也往往使用不同的设计模式,这些都会导致两者融合上的困难。
甚至于有时候,异步代码之间也存在类似的问题,如果一个库依赖于特定的 `async` 运行时来运行,那么这个库非常有必要告诉它的用户,它用了这个运行时。否则一旦用户选了不同的或不兼容的运行时,就会导致不可预知的麻烦。
#### 性能特性
`async` 代码的性能主要取决于你使用的 `async` 运行时,好在这些运行时都经过了精心的设计,在你能遇到的绝大多数场景中,它们都能拥有非常棒的性能表现。
但是世事皆有例外。目前主流的 `async` 运行时几乎都使用了多线程实现,相比单线程虽然增加了并发表现,但是对于执行性能会有所损失,因为多线程实现会有同步和切换上的性能开销,若你需要极致的顺序执行性能,那么 `async` 目前并不是一个好的选择。
同样的对于延迟敏感的任务来说任务的执行次序需要能被严格掌控而不是交由运行时去自动调度后者会导致不可预知的延迟例如一个web服务器总是有 `1%` 的请求,它们的延迟会远高于其它请求,因为调度过于繁忙导致了部分任务被延迟调度,最终导致了较高的延时。正因为此,这些延迟敏感的任务非常依赖于运行时或操作系统提供调度次序上的支持。
同样的,对于延迟敏感的任务来说,任务的执行次序需要能被严格掌控,而不是交由运行时去自动调度,后者会导致不可预知的延迟,例如一个 web 服务器总是有 `1%` 的请求,它们的延迟会远高于其它请求,因为调度过于繁忙导致了部分任务被延迟调度,最终导致了较高的延时。正因为此,这些延迟敏感的任务非常依赖于运行时或操作系统提供调度次序上的支持。
以上的两个需求,目前的 `async` 运行时并不能很好的支持,在未来可能会有更好的支持,但在此之前,我们可以尝试用多线程解决。
## async/.await 简单入门
`async/.await` 是 Rust 内置的语言特性,可以让我们用同步的方式去编写异步的代码。
通过 `async` 标记的语法块会被转换成实现了`Future`特征的状态机。 与同步调用阻塞当前线程不同,当`Future`执行并遇到阻塞时,它会让出当前线程的控制权,这样其它的`Future`就可以在该线程中运行,这种方式完全不会导致当前线程的阻塞。
下面我们来通过例子学习 `async/.await` 关键字该如何使用,在开始之前,需要先引入 `futures` 包。编辑 `Cargo.toml` 文件并添加以下内容:
```toml
[dependencies]
futures = "0.3"
```
#### 使用 async
首先,使用 `async fn` 语法来创建一个异步函数:
```rust
async fn do_something() {
async fn do_something() {
println!("go go go !");
}
```
需要注意,**异步函数的返回值是一个 `Future`**,若直接调用该函数,不会输出任何结果,因为 `Future` 还未被执行:
```rust
fn main() {
do_something();
@ -170,6 +184,7 @@ fn main() {
```
运行后,`go go go`并没有打印,同时编译器给予一个提示:`warning: unused implementer of Future that must be used`,告诉我们 `Future` 未被使用,那么到底该如何使用?答案是使用一个执行器( `executor` ):
```rust
// `block_on`会阻塞当前线程直到指定的`Future`执行完成,这种阻塞当前线程以等待任务完成的方式较为简单、粗暴,
// 好在其它运行时的执行器(executor)会提供更加复杂的行为,例如将多个`future`调度到同一个线程上执行。
@ -186,7 +201,9 @@ fn main() {
```
#### 使用.await
在上述代码的`main`函数中,我们使用`block_on`这个执行器等待`Future`的完成,让代码看上去非常像是同步代码,但是如果你要在一个`async fn`函数中去调用另一个`async fn`并等待其完成后再执行后续的代码,该如何做?例如:
```rust
use futures::executor::block_on;
@ -200,18 +217,19 @@ async fn hello_cat() {
}
fn main() {
let future = hello_world();
block_on(future);
block_on(future);
}
```
这里,我们在`hello_world`异步函数中先调用了另一个异步函数`hello_cat`,然后再输出`hello, world!`,看看运行结果:
```console
warning: unused implementer of `futures::Future` that must be used
--> src/main.rs:6:5
|
6 | hello_cat();
| ^^^^^^^^^^^^
= note: futures do nothing unless you `.await` or poll them
= note: futures do nothing unless you `.await` or poll them
...
hello, world!
```
@ -219,6 +237,7 @@ hello, world!
不出所料,`main`函数中的`future`我们通过`block_on`函数进行了运行,但是这里的`hello_cat`返回的`Future`却没有任何人去执行它,不过好在编译器友善的给出了提示:`futures do nothing unless you .await or poll them `,两种解决方法:使用`.await`语法或者对`Future`进行轮询(`poll`)。
后者较为复杂,暂且不表,先来使用`.await`试试:
```rust
use futures::executor::block_on;
@ -232,11 +251,12 @@ async fn hello_cat() {
}
fn main() {
let future = hello_world();
block_on(future);
block_on(future);
}
```
为`hello_cat()`添加上`.await`后,结果立刻大为不同:
```console
hello, kitty!
hello, world!
@ -247,7 +267,9 @@ hello, world!
总之,在`async fn`函数中使用`.await`可以等待另一个异步调用的完成。**但是与`block_on`不同,`.await`并不会阻塞当前的线程**,而是异步的等待`Future A`的完成,在等待的过程中,该线程还可以继续执行其它的`Future B`,最终实现了并发处理的效果。
#### 一个例子
考虑一个载歌载舞的例子,如果不用`.await`,我们可能会有如下实现:
```rust
async fn learn_song() -> Song { /* ... */ }
async fn sing_song(song: Song) { /* ... */ }
@ -261,12 +283,13 @@ fn main() {
```
当然,以上代码运行结果无疑是正确的,但。。。它的性能何在?需要通过连续三次阻塞去等待三个任务的完成,一次只能做一件事,实际上我们完全可以载歌载舞啊:
```rust
async fn sing_song(song: Song) { /* ... */ }
async fn learn_and_sing() {
// 这里使用`.await`来等待学歌的完成,但是并不会阻塞当前线程,该线程在学歌的任务`.await`后,完全可以去执行跳舞的任务
let song = learn_song().await;
// 唱歌必须要在学歌之后
sing_song(song).await;
}
@ -287,8 +310,6 @@ fn main() {
上面代码中,学歌和唱歌具有明显的先后顺序,但是这两者都可以跟跳舞一同存在,也就是你可以在跳舞的时候学歌,也可以在跳舞的时候唱歌。如果上面代码不使用`.await`,而是使用`block_on(learn_song())` 那在学歌时,当前线程就会阻塞,不再可以做其它任何事,包括跳舞。
因此`.await`对于实现异步编程至关重要,它允许我们在同一个线程内并发的运行多个任务,而不是一个一个先后完成。若大家看到这里还是不太明白,强烈建议回头再仔细看一遍,同时亲自上手修改代码试试效果。
至此读者应该对Rust的`async/.await`异步编程有了一个清晰的初步印象,下面让我们一起来看看这背后的原理:`Future`和任务在底层如何被执行。
至此,读者应该对 Rust 的`async/.await`异步编程有了一个清晰的初步印象,下面让我们一起来看看这背后的原理:`Future`和任务在底层如何被执行。

@ -1,14 +1,14 @@
# 异步编程
在艰难的学完Rust入门和进阶所有的55个章节后我们终于来到了这里。假如之前攀登的是珠穆拉玛峰那么现在攀登的就是乔戈里峰本章将学习的内容是关于async异步编程。
如果你想开发Web服务器、数据库驱动、消息服务等需要高并发的服务那么本章的内容将值得认真对待和学习将从以下方面深入讲解Rust的异步编程
在艰难的学完 Rust 入门和进阶所有的 55 个章节后,我们终于来到了这里。假如之前攀登的是珠穆拉玛峰,那么现在攀登的就是乔戈里峰,本章将学习的内容是关于 async 异步编程。
- Rust异步编程的通用概念介绍
- Future以及异步任务调度
- async/await和Pin/Unpin
如果你想开发 Web 服务器、数据库驱动、消息服务等需要高并发的服务,那么本章的内容将值得认真对待和学习,将从以下方面深入讲解 Rust 的异步编程:
- Rust 异步编程的通用概念介绍
- Future 以及异步任务调度
- async/await 和 Pin/Unpin
- 异步编程常用的三方库
- tokio库
- tokio
- 一些示例
> 本章在内容上大量借鉴和翻译了原版英文书籍[Asynchronous Programming In Rust](https://rust-lang.github.io/async-book/01_getting_started/01_chapter.html), 特此感谢

@ -1,10 +1,13 @@
# 使用`join!`和`select!`同时运行多个Future
# 使用`join!`和`select!`同时运行多个 Future
招数单一,杀伤力惊人,说的就是 `.await` ,但是光用它,还真做不到一招鲜吃遍天。比如我们该如何同时运行多个任务,而不是使用`.await`慢悠悠地排队完成。
## join!
`futures` 包中提供了很多实用的工具,其中一个就是 `join!`宏, 它允许我们同时等待多个不同 `Future` 的完成,且可以并发地运行这些 `Future`
先来看一个不是很给力的、使用`.await`的版本:
```rust
async fn enjoy_book_and_music() -> (Book, Music) {
let book = enjoy_book().await;
@ -13,9 +16,10 @@ async fn enjoy_book_and_music() -> (Book, Music) {
}
```
这段代码可以顺利运行,但是有一个很大的问题,就是必须先看完书后,才能听音乐。咱们以前,谁又不是那个摇头晃脑爱读书(耳朵里偷偷塞着耳机听的正high)的好学生呢?
这段代码可以顺利运行,但是有一个很大的问题,就是必须先看完书后,才能听音乐。咱们以前,谁又不是那个摇头晃脑爱读书(耳朵里偷偷塞着耳机,听的正 high)的好学生呢?
要支持同时看书和听歌,有些人可能会凭空生成下面代码:
```rust
// WRONG -- 别这么做
async fn enjoy_book_and_music() -> (Book, Music) {
@ -28,6 +32,7 @@ async fn enjoy_book_and_music() -> (Book, Music) {
看上去像模像样,嗯,在某些语言中也许可以,但是 Rust 不行。因为在某些语言中,`Future`一旦创建就开始运行,等到返回的时候,基本就可以同时结束并返回了。 但是 Rust 中的 `Future` 是惰性的,直到调用 `.await` 时,才会开始运行。而那两个 `await` 由于在代码中有先后顺序,因此它们是顺序运行的。
为了正确的并发运行两个 `Future` 我们来试试 `futures::join!` 宏:
```rust
use futures::join;
@ -41,6 +46,7 @@ async fn enjoy_book_and_music() -> (Book, Music) {
`Duang`,目标顺利达成。同时`join!`会返回一个元组,里面的值是对应的`Future`执行结束后输出的值。
## try_join!
由于`join!`必须等待它管理的所有 `Future` 完成后才能完成,如果你希望在某一个 `Future` 报错后就立即停止所有 `Future` 的执行,可以使用 `try_join!`,特别是当 `Future` 返回 `Result` 时:
```rust
@ -57,6 +63,7 @@ async fn get_book_and_music() -> Result<(Book, Music), String> {
```
有一点需要注意,传给 `try_join!` 的所有 `Future` 都必须拥有相同的错误类型。如果错误类型不同,可以考虑使用来自 `futures::future::TryFutureExt` 模块的 `map_err`和`err_info`方法将错误进行转换:
```rust
use futures::{
future::TryFutureExt,
@ -73,10 +80,12 @@ async fn get_book_and_music() -> Result<(Book, Music), String> {
}
```
`join!`很好很强大但是人无完人J无完J, 它有一个很大的问题。
`join!`很好很强大但是人无完人J 无完 J, 它有一个很大的问题。
## select!
`join!`只有等所有 `Future` 结束后,才能集中处理结果,如果你想同时等待多个 `Future` ,且任何一个 `Future` 结束后,都可以立即被处理,可以考虑使用 `futures::select!`:
```rust
use futures::{
future::FutureExt, // for `.fuse()`
@ -102,10 +111,12 @@ async fn race_tasks() {
上面的代码会同时并发地运行 `t1``t2` 无论两者哪个先完成,都会调用对应的 `println!` 打印相应的输出,然后函数结束且不会等待另一个任务的完成。
但是,在实际项目中,我们往往需要等待多个任务都完成后,再结束,像上面这种其中一个任务结束就立刻结束的场景着实不多。
但是,在实际项目中,我们往往需要等待多个任务都完成后,再结束,像上面这种其中一个任务结束就立刻结束的场景着实不多。
#### default 和 complete
#### default 和 complete
`select!`还支持 `default``complete` 分支:
- `complete` 分支当所有的 `Future``Stream` 完成后才会被执行,它往往配合`loop`使用,`loop`用于循环完成所有的 `Future`
- `default`分支,若没有任何 `Future``Stream` 处于 `Ready` 状态, 则该分支会被立即执行
@ -116,7 +127,7 @@ pub fn main() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
@ -134,6 +145,7 @@ pub fn main() {
如果你希望 `default` 也有机会漏下脸,可以将 `complete``break` 修改为其它的,例如`println!("completed!")`,然后再观察下运行结果。
再回到 `select` 的第一个例子中,里面有一段代码长这样:
```rust
let t1 = task_one().fuse();
let t2 = task_two().fuse();
@ -144,6 +156,7 @@ pin_mut!(t1, t2);
当时没有展开讲,相信大家也有疑惑,下面我们来一起看看。
#### 跟 `Unpin``FusedFuture` 进行交互
首先,`.fuse()`方法可以让 `Future` 实现 `FusedFuture` 特征, 而 `pin_mut!` 宏会为 `Future` 实现 `Unpin`特征,这两个特征恰恰是使用 `select` 所必须的:
- `Unpin`,由于 `select` 不会通过拿走所有权的方式使用`Future`,而是通过可变引用的方式去使用,这样当 `select` 结束后,该 `Future` 若没有被完成,它的所有权还可以继续被其它代码使用。
@ -152,6 +165,7 @@ pin_mut!(t1, t2);
只有实现了`FusedFuture``select` 才能配合 `loop` 一起使用。假如没有实现,就算一个 `Future` 已经完成了,它依然会被 `select` 不停的轮询执行。
`Stream` 稍有不同,它们使用的特征是 `FusedStream`。 通过`.fuse()`(也可以手动实现)实现了该特征的 `Stream`,对其调用`.next()` 或 `.try_next()`方法可以获取实现了`FusedFuture`特征的`Future`:
```rust
use futures::{
stream::{Stream, StreamExt, FusedStream},
@ -179,7 +193,8 @@ async fn add_two_streams(
}
```
## 在select循环中并发
## 在 select 循环中并发
一个很实用但又鲜为人知的函数是 `Fuse::terminated()` ,可以使用它构建一个空的 `Future` ,空自然没啥用,但是如果它能在后面再被填充呢?
考虑以下场景:当你要在`select`循环中运行一个任务,但是该任务却是在`select`循环内部创建时,上面的函数就非常好用了。
@ -224,6 +239,7 @@ async fn run_loop(
}
}
```
当某个 `Future` 有多个拷贝都需要同时运行时,可以使用 `FuturesUnordered` 类型。下面的例子跟上个例子大体相似,但是它会将 `run_on_new_num_fut` 的每一个拷贝都运行到完成,而不是像之前那样一旦创建新的就终止旧的。
```rust
@ -242,7 +258,7 @@ async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }
// 使用从 `get_new_num` 获取的最新数字 来运行 `run_on_new_num`
//
// 每当计时器结束后,`get_new_num` 就会运行一次,它会立即取消当前正在运行的`run_on_new_num` ,
// 并且使用新返回的值来替换
// 并且使用新返回的值来替换
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
@ -273,4 +289,5 @@ async fn run_loop(
}
}
}
```
```

@ -1,8 +1,11 @@
# 一些疑难问题的解决办法
`async` 在 Rust 依然比较新,疑难杂症少不了,而它们往往还处于活跃开发状态,短时间内无法被解决,因此才有了本文。下面一起来看看这些问题以及相应的临时解决方案。
## 在 async 语句块中使用 ?
`async` 语句块和 `async fn` 最大的区别就是前者无法显式的声明返回值,在大多数时候这都不是问题,但是当配合 `?` 一起使用时,问题就有所不同:
```rust
async fn foo() -> Result<u8, String> {
Ok(1)
@ -20,6 +23,7 @@ pub fn main() {
```
以上代码编译后会报错:
```shell
error[E0282]: type annotations needed
--> src/main.rs:14:9
@ -34,6 +38,7 @@ error[E0282]: type annotations needed
原因在于编译器无法推断出 `Result<T, E>`中的 `E` 的类型, 而且编译器的提示`consider giving fut a type`你也别傻乎乎的相信,然后尝试半天,最后无奈放弃:目前还没有办法为 `async` 语句块指定返回类型。
既然编译器无法推断出类型,那咱就给它更多提示,可以使用 `::< ... >` 的方式来增加类型注释:
```rust
let fut = async {
foo().await?;
@ -44,11 +49,12 @@ let fut = async {
给予类型注释后此时编译器就知道`Result<T, E>`中的 `E` 的类型是`String`,进而成功通过编译。
## async 函数和 Send 特征
在多线程章节我们深入讲过 `Send` 特征对于多线程间数据传递的重要性,对于 `async fn` 也是如此,它返回的 `Future` 能否在线程间传递的关键在于 `.await` 运行过程中,作用域中的变量类型是否是 `Send`
学到这里,相信大家已经很清楚`Rc`无法在多线程环境使用,原因就在于它并未实现 `Send` 特征,那咱就用它来做例子:
```rust
use std::rc::Rc;
@ -57,6 +63,7 @@ struct NotSend(Rc<()>);
```
事实上,未实现 `Send` 特征的变量可以出现在 `async fn` 语句块中:
```rust
async fn bar() {}
async fn foo() {
@ -72,6 +79,7 @@ fn main() {
```
即使上面的 `foo` 返回的 `Future``Send` 但是在它内部短暂的使用 `NotSend` 依然是安全的,原因在于它的作用域并没有影响到 `.await`,下面来试试声明一个变量,然后让 `.await`的调用处于变量的作用域中试试:
```rust
async fn foo() {
let x = NotSend::default();
@ -80,6 +88,7 @@ async fn foo() {
```
不出所料,错误如期而至:
```shell
error: future cannot be sent between threads safely
--> src/main.rs:17:18
@ -116,8 +125,10 @@ async fn foo() {
是不是很简单?最终我们还是通过 `Drop` 的方式解决了这个问题,当然,还是期待未来 `std::mem::drop` 也能派上用场。
## 递归使用async fn
## 递归使用 async fn
在内部实现中,`async fn`被编译成一个状态机,这会导致递归使用 `async fn` 变得较为复杂, 因为编译后的状态机还需要包含自身。
```rust
// foo函数:
async fn foo() {
@ -144,6 +155,7 @@ enum Recursive {
```
这是典型的[动态大小类型](../advance/custom-type.md#动态大小类型),它的大小会无限增长,因此编译器会直接报错:
```shell
error[E0733]: recursion in an `async fn` requires boxing
--> src/lib.rs:1:22
@ -157,6 +169,7 @@ error[E0733]: recursion in an `async fn` requires boxing
如果认真学过之前的章节,大家应该知道只要将其使用 `Box` 放到堆上而不是栈上,就可以解决,在这里还是要称赞下 Rust 的编译器,给出的提示总是这么精确`recursion in an async fn requires boxing`。
就算是使用 `Box`,这里也大有讲究。如果我们试图使用 `Box::pin` 这种方式去包裹是不行的,因为编译器自身的限制限制了我们(刚夸过它。。。)。为了解决这种问题,我们只能将 `recursive` 转变成一个正常的函数,该函数返回一个使用 `Box` 包裹的 `async` 语句块:
```rust
use futures::future::{BoxFuture, FutureExt};
@ -169,6 +182,7 @@ fn recursive() -> BoxFuture<'static, ()> {
```
## 在特征中使用 async
在目前版本中,我们还无法在特征中定义 `async fn` 函数,不过大家也不用担心,目前已经有计划在未来移除这个限制了。
```rust
@ -178,6 +192,7 @@ trait Test {
```
运行后报错:
```shell
error[E0706]: functions in traits cannot be declared `async`
--> src/main.rs:5:5
@ -192,6 +207,7 @@ error[E0706]: functions in traits cannot be declared `async`
```
好在编译器给出了提示,让我们使用 [`async-trait`](https://github.com/dtolnay/async-trait) 解决这个问题:
```rust
use async_trait::async_trait;
@ -229,4 +245,5 @@ impl Advertisement for AutoplayingVideo {
}
```
不过使用该包并不是免费的,每一次特征中的`async`函数被调用时,都会产生一次堆内存分配。对于大多数场景,这个性能开销都可以接受,但是当函数一秒调用几十万、几百万次时,就得小心这块儿代码的性能了!
不过使用该包并不是免费的,每一次特征中的`async`函数被调用时,都会产生一次堆内存分配。对于大多数场景,这个性能开销都可以接受,但是当函数一秒调用几十万、几百万次时,就得小心这块儿代码的性能了!

@ -1,13 +1,14 @@
# 定海神针Pin和Unpin
在Rust异步编程中有一个定海神针般的存在它就是 `Pin` ,作用说简单也简单,说复杂也非常复杂,当初刚出来时就连一些 Rust 大佬都一头雾水,何况瑟瑟发抖的我。好在今非昔比,目前网上的资料已经很全,而我就借花献佛,给大家好好讲讲这个`Pin`。
# 定海神针 Pin 和 Unpin
在Rust中所有的类型可以分为两类:
在 Rust 异步编程中,有一个定海神针般的存在,它就是 `Pin` ,作用说简单也简单,说复杂也非常复杂,当初刚出来时就连一些 Rust 大佬都一头雾水,何况瑟瑟发抖的我。好在今非昔比,目前网上的资料已经很全,而我就借花献佛,给大家好好讲讲这个`Pin`。
在 Rust 中,所有的类型可以分为两类:
- **类型的值可以在内存中安全地被移动**,例如数值、字符串、布尔值、结构体、枚举,总之你能想到的几乎所有类型都可以落入到此范畴内
- **自引用类型**,大魔王来了,大家快跑,在之前章节我们已经见识过它的厉害
下面就是一个自引用类型
```rust
struct SelfRef {
value: String,
@ -17,12 +18,14 @@ struct SelfRef {
在上面的结构体中,`pointer_to_value` 是一个原生指针,指向第一个字段 `value` 持有的字符串 `String` 。很简单对吧?现在考虑一个情况, 若`String` 被移动了怎么办?
此时一个致命的问题就出现了:新的字符串的内存地址变了,而 `pointer_to_value` 依然指向之前的地址一个重大bug就出现了
此时一个致命的问题就出现了:新的字符串的内存地址变了,而 `pointer_to_value` 依然指向之前的地址,一个重大 bug 就出现了!
灾难发生,英雄在哪?只见 `Pin` 闪亮登场,它可以防止一个类型在内存中被移动。再来回忆下之前在 `Future` 章节中,我们提到过在 `poll` 方法的签名中有一个 `self: Pin<&mut Self>` ,那么为何要在这里使用 `Pin` 呢?
灾难发生,英雄在哪?只见 `Pin` 闪亮登场,它可以防止一个类型在内存中被移动。再来回忆下之前在 `Future` 章节中,我们提到过在 `poll` 方法的签名中有一个 `self: Pin<&mut Self>` ,那么为何要在这里使用 `Pin` 呢?
## 为何需要 Pin
## 为何需要Pin
其实 `Pin` 还有一个小伙伴 `UnPin` ,与前者相反,后者表示类型可以在内存中安全地移动。在深入之前,我们先来回忆下 `async/.await` 是如何工作的:
```rust
let fut_one = /* ... */; // Future 1
let fut_two = /* ... */; // Future 2
@ -33,6 +36,7 @@ async move {
```
在底层,`async` 会创建一个实现了 `Future` 的匿名类型,并提供了一个 `poll` 方法:
```rust
// `async { ... }`语句块创建的 `Future` 类型
struct AsyncFuture {
@ -72,6 +76,7 @@ impl Future for AsyncFuture {
`poll` 第一次被调用时,它会去查询 `fut_one` 的状态,若 `fut_one` 无法完成,则 `poll` 方法会返回。未来对 `poll` 的调用将从上一次调用结束的地方开始。该过程会一直持续,直到 `Future` 完成为止。
然而,如果我们的 `async` 语句块中使用了引用类型,会发生什么?例如下面例子:
```rust
async {
let mut x = [0; 128];
@ -82,6 +87,7 @@ async {
```
这段代码会编译成下面的形式:
```rust
struct ReadIntoBuf<'a> {
buf: &'a mut [u8], // 指向下面的`x`字段
@ -98,9 +104,11 @@ struct AsyncFuture {
若能将 `Future` 在内存中固定到一个位置,就可以避免这种问题的发生,也就可以安全的创建上面这种引用类型。
## Unpin
事实上,绝大多数类型都不在意是否被移动(开篇提到的第一种类型),因此它们都**自动实现**了 `Unpin` 特征。
从名字推测,大家可能以为 `Pin``Unpin` 都是特征吧?实际上,`Pin` 不按套路出牌,它是一个结构体:
```rust
pub struct Pin<P> {
pointer: P,
@ -125,9 +133,10 @@ pub struct Pin<P> {
- 都可以通过`!`语法去除实现
- 绝大多数情况都是自动实现, 无需我们的操心
## 深入理解 Pin
对于上面的问题,我们可以简单的归结为如何在 Rust 中处理自引用类型(果然,只要是难点,都和自引用脱离不了关系),下面用一个稍微简单点的例子来理解下 `Pin` :
```rust
#[derive(Debug)]
struct Test {
@ -162,6 +171,7 @@ impl Test {
`Test` 提供了方法用于获取字段 `a``b` 的值的引用。这里`b` 是 `a` 的一个引用但是我们并没有使用引用类型而是用了原生指针原因是Rust 的借用规则不允许我们这样用,因为不符合生命周期的要求。 此时的 `Test` 就是一个自引用结构体。
如果不移动任何值,那么上面的例子将没有任何问题,例如:
```rust
fn main() {
let mut test1 = Test::new("test1");
@ -176,12 +186,14 @@ fn main() {
```
输出非常正常:
```console
a: test1, b: test1
a: test2, b: test2
```
明知山有虎,偏向虎山行,这才是我辈年轻人的风华。既然移动数据会导致指针不合法,那我们就移动下数据试试,将 `test``test2` 进行下交换:
```rust
fn main() {
let mut test1 = Test::new("test1");
@ -197,12 +209,14 @@ fn main() {
```
按理来说,这样修改后,输出应该如下:
```rust
a: test1, b: test1
a: test1, b: test1
```
但是实际运行后,却产生了下面的输出:
```rust
a: test1, b: test1
a: test1, b: test2
@ -211,6 +225,7 @@ a: test1, b: test2
原因是 `test2.b` 指针依然指向了旧的地址,而该地址对应的值现在在 `test1` 里,最终会打印出意料之外的值。
如果大家还是将信将疑,那再看看下面的代码:
```rust
fn main() {
let mut test1 = Test::new("test1");
@ -231,10 +246,13 @@ fn main() {
<img alt="" src="https://pica.zhimg.com/80/v2-eaeb33da283dc1063b862d2307821976_1440w.jpg" class="center" />
## Pin 在实践中的运用
在理解了 `Pin` 的作用后,我们再来看看它怎么帮我们解决问题。
#### 将值固定到栈上
回到之前的例子,我们可以用 `Pin` 来解决指针指向的数据被移动的问题:
```rust
use std::pin::Pin;
use std::marker::PhantomPinned;
@ -277,9 +295,10 @@ impl Test {
一旦类型实现了 `!Unpin` ,那将它的值固定到栈( `stack` )上就是不安全的行为,因此在代码中我们使用了 `unsafe` 语句块来进行处理,你也可以使用 [`pin_utils`](https://docs.rs/pin-utils/) 来避免 `unsafe` 的使用。
> BTW, Rust 中的 unsafe 其实没有那么可怕,虽然听上去很不安全,但是实际上 Rust 依然提供了很多机制来帮我们提升了安全性,因此不必像对待 Go 语言的 `unsafe` 那样去畏惧于使用Rust中的 `unsafe` ,大致使用原则总结如下:没必要用时,就不要用,当有必要用时,就大胆用,但是尽量控制好边界,让 `unsafe` 的范围尽可能小
> BTW, Rust 中的 unsafe 其实没有那么可怕,虽然听上去很不安全,但是实际上 Rust 依然提供了很多机制来帮我们提升了安全性,因此不必像对待 Go 语言的 `unsafe` 那样去畏惧于使用 Rust 中的 `unsafe` ,大致使用原则总结如下:没必要用时,就不要用,当有必要用时,就大胆用,但是尽量控制好边界,让 `unsafe` 的范围尽可能小
此时,再去尝试移动被固定的值,就会导致**编译错误**
```rust
pub fn main() {
// 此时的`test1`可以被安全的移动
@ -299,6 +318,7 @@ pub fn main() {
```
注意到之前的粗体字了吗是的Rust 并不是在运行时做这件事,而是在编译期就完成了,因此没有额外的性能开销!来看看报错:
```shell
error[E0277]: `PhantomPinned` cannot be unpinned
--> src/main.rs:47:43
@ -310,15 +330,16 @@ error[E0277]: `PhantomPinned` cannot be unpinned
> 需要注意的是固定在栈上非常依赖于你写出的 `unsafe` 代码的正确性。我们知道 `&'a mut T` 可以固定的生命周期是 `'a` ,但是我们却不知道当生命周期 `'a` 结束后,该指针指向的数据是否会被移走。如果你的 `unsafe` 代码里这么实现了,那么就会违背 `Pin` 应该具有的作用!
>
> 一个常见的错误就是忘记去遮蔽(shadow )初始的变量,因为你可以 `drop``Pin` ,然后在 `&'a mut T` 结束后去移动数据:
>
> ```rust
> fn main() {
> let mut test1 = Test::new("test1");
> let mut test1_pin = unsafe { Pin::new_unchecked(&mut test1) };
> Test::init(test1_pin.as_mut());
>
>
> drop(test1_pin);
> println!(r#"test1.b points to "test1": {:?}..."#, test1.b);
>
>
> let mut test2 = Test::new("test2");
> mem::swap(&mut test1, &mut test2);
> println!("... and now it points nowhere: {:?}", test1.b);
@ -363,6 +384,7 @@ error[E0277]: `PhantomPinned` cannot be unpinned
> ```
#### 固定到堆上
将一个 `!Unpin` 类型的值固定到堆上,会给予该值一个稳定的内存地址,它指向的堆中的值在 `Pin` 后是无法被移动的。而且与固定在栈上不同,我们知道堆上的值在整个生命周期内都会被稳稳地固定住。
```rust
@ -409,13 +431,14 @@ pub fn main() {
```
#### 将固定住的 `Future` 变为 `Unpin`
之前的章节我们有提到 `async` 函数返回的 `Future` 默认就是 `!Unpin` 的。
但是,在实际应用中,一些函数会要求它们处理的 `Future``Unpin` 的,此时,若你使用的 `Future``!Unpin` 的,必须要使用以下的方法先将 `Future` 进行固定:
- `Box::pin` 创建一个 `Pin<Box<T>>`
- `pin_utils::pin_mut!` 创建一个 `Pin<&mut T>`
固定后获得的 `Pin<Box<T>>``Pin<&mut T>` 既可以用于 `Future` **又会自动实现 `Unpin`**。
```rust
@ -426,7 +449,7 @@ fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { /* ... */ }
let fut = async { /* ... */ };
// 下面代码报错: 默认情况下,`fut` 实现的是`!Unpin`,并没有实现`Unpin`
// execute_unpin_future(fut);
// execute_unpin_future(fut);
// 使用`Box`进行固定
let fut = async { /* ... */ };
@ -439,18 +462,17 @@ pin_mut!(fut);
execute_unpin_future(fut); // OK
```
## 总结
相信大家看到这里,脑袋里已经快被 `Pin``Unpin``!Unpin` 整爆炸了,没事,我们再来火上浇油下:)
- 若 `T: Unpin` ( Rust 类型的默认实现),那么 `Pin<'a, T>``&'a mut T` 完全相同,也就是 `Pin` 将没有任何效果, 该移动还是照常移动
- 绝大多数标准库类型都实现了 `Unpin` ,事实上,对于 Rust 中你能遇到的绝大多数类型,该结论依然成立
,其中一个例外就是:`async/await` 生成的 `Future` 没有实现 `Unpin`
,其中一个例外就是:`async/await` 生成的 `Future` 没有实现 `Unpin`
- 你可以通过以下方法为自己的类型添加 `!Unpin` 约束:
- 使用文中提到的 `std::marker::PhantomPinned`
- 使用`nightly` 版本下的 `feature flag`
- 使用`nightly` 版本下的 `feature flag`
- 可以将值固定到栈上,也可以固定到堆上
- 将 `!Unpin` 值固定到栈上需要使用 `unsafe`
- 将 `!Unpin` 值固定到堆上无需 `unsafe` ,可以通过 `Box::pin` 来简单的实现
- 当固定类型`T: !Unpin`时你需要保证数据从被固定到被drop这段时期内其内存不会变得非法或者被重用
- 当固定类型`T: !Unpin`时,你需要保证数据从被固定到被 drop 这段时期内,其内存不会变得非法或者被重用

@ -1,10 +1,13 @@
# 一个实践项目: Web服务器
知识学得再多,不实际应用也是纸上谈兵,不是忘掉就是废掉,对于技术学习尤为如此。在之前章节中,我们已经学习了 `Async Rust` 的方方面面现在来将这些知识融会贯通最终实现一个并发Web服务器。
# 一个实践项目: Web 服务器
知识学得再多,不实际应用也是纸上谈兵,不是忘掉就是废掉,对于技术学习尤为如此。在之前章节中,我们已经学习了 `Async Rust` 的方方面面,现在来将这些知识融会贯通,最终实现一个并发 Web 服务器。
## 多线程版本的 Web 服务器
## 多线程版本的Web服务器
在正式开始前,先来看一个单线程版本的 `Web` 服务器,该例子来源于 [`Rust Book`](https://doc.rust-lang.org/book/ch20-01-single-threaded.html) 一书。
`src/main.rs`:
```rust
use std::fs;
use std::io::prelude::*;
@ -48,6 +51,7 @@ fn handle_connection(mut stream: TcpStream) {
```
`hello.html`:
```rust
<!DOCTYPE html>
<html lang="en">
@ -63,6 +67,7 @@ fn handle_connection(mut stream: TcpStream) {
```
`404.html`:
```rust
<!DOCTYPE html>
<html lang="en">
@ -79,12 +84,14 @@ fn handle_connection(mut stream: TcpStream) {
运行以上代码,并从浏览器访问 `127.0.0.1:7878` 你将看到一条来自 `Ferris` 的问候。
在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `async` 的异步Web服务器。
在回忆了单线程版本该如何实现后,我们也将进入正题,一起来实现一个基于 `async` 的异步 Web 服务器。
## 运行异步代码
一个 Web 服务器必须要能并发的处理大量来自用户的请求,也就是我们不能在处理完上一个用户的请求后,再处理下一个用户的请求。上面的单线程版本可以修改为多线程甚至于线程池来实现并发处理,但是线程还是太重了,使用 `async` 实现 `Web` 服务器才是最适合的。
首先将 `handle_connection` 修改为 `async` 实现:
```rust
async fn handle_connection(mut stream: TcpStream) {
//<-- snip -->
@ -96,9 +103,11 @@ async fn handle_connection(mut stream: TcpStream) {
在之前的代码中,我们使用了自己实现的简单的执行器来进行`.await` 或 `poll` ,实际上这只是为了学习原理,**在实际项目中,需要选择一个三方的 `async` 运行时来实现相关的功能**。 具体的选择我们将在下一章节进行讲解,现在先选择 `async-std` ,该包的最大优点就是跟标准库的 API 类似,相对来说更简单易用。
#### 使用 async-std 作为异步运行时
下面的例子将演示如何使用一个异步运行时`async-std`来让之前的 `async fn` 函数运行起来,该运行时允许使用属性 `#[async_std::main]` 将我们的 `fn main` 函数变成 `async fn main` ,这样就可以在 `main` 函数中直接调用其它 `async` 函数,否则你得用之前章节的 `block_on` 方法来让 `main` 去阻塞等待异步函数的完成,但是这种简单粗暴的阻塞等待方式并不灵活。
下面的例子将演示如何使用一个异步运行时`async-std`来让之前的 `async fn` 函数运行起来,该运行时允许使用属性 `#[async_std::main]` 将我们的 `fn main` 函数变成 `async fn main` ,这样就可以在 `main` 函数中直接调用其它 `async` 函数,否则你得用之前章节的 `block_on` 方法来让 `main` 去阻塞等待异步函数的完成,但是这种简单粗暴的阻塞等待方式并不灵活。
修改 `Cargo.toml` 添加 `async-std` 包并开启相应的属性:
```toml
[dependencies]
futures = "0.3"
@ -108,7 +117,8 @@ version = "1.6"
features = ["attributes"]
```
下面将 `main` 函数修改为异步的,并在其中调用前面修改的异步版本 `handle_connection` :
下面将 `main` 函数修改为异步的,并在其中调用前面修改的异步版本 `handle_connection` :
```rust
#[async_std::main]
async fn main() {
@ -122,6 +132,7 @@ async fn main() {
```
**上面的代码虽然已经是异步的,实际上它还无法并发**,原因我们后面会解释,先来模拟一下慢请求:
```rust
use async_std::task;
@ -148,16 +159,18 @@ async fn handle_connection(mut stream: TcpStream) {
}
```
上面是全新实现的 `handle_connection` 它会在内部睡眠5秒模拟一次用户慢请求需要注意的是我们并没有使用 `std::thread::sleep` 进行睡眠,原因是该函数是阻塞的,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。
上面是全新实现的 `handle_connection` ,它会在内部睡眠 5 秒,模拟一次用户慢请求,需要注意的是,我们并没有使用 `std::thread::sleep` 进行睡眠,原因是该函数是阻塞的,它会让当前线程陷入睡眠中,导致其它任务无法继续运行!因此我们需要一个睡眠函数 `async_std::task::sleep`,它仅会让当前的任务陷入睡眠,然后该任务会让出线程的控制权,这样线程就可以继续运行其它任务。
因此,光把函数变成 `async` 往往是不够的,还需要将它内部的代码也都变成异步兼容的,阻塞线程绝对是不可行的。
现在运行服务器,并访问 `127.0.0.1:7878/sleep` 你会发现只有在完成第一个用户请求(5秒后),才能开始处理第二个用户请求。现在再来看看该如何解决这个问题,让请求并发起来。
现在运行服务器,并访问 `127.0.0.1:7878/sleep` 你会发现只有在完成第一个用户请求(5 秒后),才能开始处理第二个用户请求。现在再来看看该如何解决这个问题,让请求并发起来。
## 并发地处理连接
上面代码最大的问题是 `listener.incoming()` 是阻塞的迭代器。当 `listener` 在等待连接时,执行器是无法执行其它`Future`的,而且只有在我们处理完已有的连接后,才能接收新的连接。
解决方法是将 `listener.incoming()` 从一个阻塞的迭代器变成一个非阻塞的 `Stream` 后者在前面章节有过专门介绍:
```rust
use async_std::net::TcpListener;
use async_std::net::TcpStream;
@ -182,6 +195,7 @@ async fn main() {
- 使用 `for_each_concurrent` 并发地处理从 `Stream` 获取的元素
现在上面的实现的关键在于 `handle_connection` 不能再阻塞:
```rust
use async_std::prelude::*;
@ -198,9 +212,11 @@ async fn handle_connection(mut stream: TcpStream) {
在将数据读写改造成异步后,现在该函数也彻底变成了异步的版本,因此一次慢请求不再会阻止其它请求的运行。
## 使用多线程并行处理请求
聪明的读者不知道有没有发现,之前的例子有一个致命的缺陷:只能使用一个线程并发的处理用户请求。是的,这样也可以实现并发,一秒处理几千次请求问题不大,但是这毕竟没有利用上 CPU 的多核并行能力,无法实现性能最大化。
`async` 并发和多线程其实并不冲突,而 `async-std` 包也允许我们使用多个线程去处理,由于 `handle_connection` 实现了 `Send` 特征且不会阻塞,因此使用 `async_std::task::spawn` 是非常安全的:
```rust
use async_std::task::spawn;
@ -220,9 +236,11 @@ async fn main() {
至此,我们实现了同时使用并行(多线程)和并发( `async` )来同时处理多个请求!
## 测试 handle_connection 函数
对于测试 Web 服务器,使用集成测试往往是最简单的,但是在本例子中,将使用单元测试来测试连接处理函数的正确性。
为了保证单元测试的隔离性和确定性,我们使用 `MockTcpStream` 来替代 `TcpStream` 。首先,修改 `handle_connection` 的函数签名让测试更简单,之所以可以修改签名,原因在于 `async_std::net::TcpStream` 实际上并不是必须的,只要任何结构体实现了 `async_std::io::Read`, `async_std::io::Write``marker::Unpin` 就可以替代它:
```rust
use std::marker::Unpin;
use async_std::io::{Read, Write};
@ -230,7 +248,8 @@ use async_std::io::{Read, Write};
async fn handle_connection(mut stream: impl Read + Write + Unpin) {
```
下面来构建一个mock的 `TcpStream` 并实现了上面这些特征,它包含一些数据,这些数据将被拷贝到 `read` 缓存中, 然后返回 `Poll::Ready` 说明 `read` 已经结束:
下面,来构建一个 mock 的 `TcpStream` 并实现了上面这些特征,它包含一些数据,这些数据将被拷贝到 `read` 缓存中, 然后返回 `Poll::Ready` 说明 `read` 已经结束:
```rust
use super::*;
use futures::io::Error;
@ -257,7 +276,7 @@ impl Read for MockTcpStream {
}
```
`Write`的实现也类似,需要实现三个方法 : `poll_write`, `poll_flush`, 与 `poll_close``poll_write` 会拷贝输入数据到mock的 `TcpStream` 中,当完成后返回 `Poll::Ready`。由于 `TcpStream` 无需 `flush``close`,因此另两个方法直接返回 `Poll::Ready` 即可.
`Write`的实现也类似,需要实现三个方法 : `poll_write`, `poll_flush`, 与 `poll_close``poll_write` 会拷贝输入数据到 mock `TcpStream` 中,当完成后返回 `Poll::Ready`。由于 `TcpStream` 无需 `flush``close`,因此另两个方法直接返回 `Poll::Ready` 即可.
```rust
impl Write for MockTcpStream {
@ -281,7 +300,7 @@ impl Write for MockTcpStream {
}
```
最后我们的mock需要实现 `Unpin` 特征,表示它可以在内存中安全的移动,具体内容在[前面章节](./pin-unpin.md)有讲。
最后,我们的 mock 需要实现 `Unpin` 特征,表示它可以在内存中安全的移动,具体内容在[前面章节](./pin-unpin.md)有讲。
```rust
use std::marker::Unpin;

Loading…
Cancel
Save