add async/await

pull/278/head
sunface 3 years ago
parent 5411103994
commit 04d4b70fa2

@ -45,7 +45,7 @@
- [动态数组Vector](basic/collections/vector.md) - [动态数组Vector](basic/collections/vector.md)
- [KV存储HashMap](basic/collections/hashmap.md) - [KV存储HashMap](basic/collections/hashmap.md)
- [类型转换](basic/converse.md) - [类型转换](basic/converse.md)
- [返回和错误处理](basic/result-error/intro.md) - [返回和错误](basic/result-error/intro.md)
- [panic深入剖析!](basic/result-error/panic.md) - [panic深入剖析!](basic/result-error/panic.md)
- [返回值Result和?](basic/result-error/result.md) - [返回值Result和?](basic/result-error/result.md)
@ -79,14 +79,23 @@
- [线程同步锁、Condvar和信号量](advance/concurrency-with-threads/sync1.md) - [线程同步锁、Condvar和信号量](advance/concurrency-with-threads/sync1.md)
- [线程同步Atomic原子操作与内存顺序](advance/concurrency-with-threads/sync2.md) - [线程同步Atomic原子操作与内存顺序](advance/concurrency-with-threads/sync2.md)
- [基于Send和Sync的线程安全](advance/concurrency-with-threads/send-sync.md) - [基于Send和Sync的线程安全](advance/concurrency-with-threads/send-sync.md)
- [Unsafe Rust todo](advance/unsafe/intro.md)
- [原生指针 todo](advance/unsafe/raw-pointer.md)
- [FFI外部语言用 todo](advance/unsafe/ffi.md)
- [全局变量](advance/global-variable.md) - [全局变量](advance/global-variable.md)
<!-- - [高阶特征约束(HRTB) todo](advance/hrtb.md) --> - [错误处理 todo](advance/errors/intro.md)
- [简化错误处理 todo](advance/errors/simplify.md)
- [自定义错误 todo](advance/errors/user-define.md)
- [让错误输出更优雅 todo](advance/errors/pretty-format.md)
- [会导致panic的代码 todo](advance/errors/panic-codes.md)
- [高阶特征约束(HRTB) todo](advance/hrtb.md)
## 专题内容,每个专题都配套一个小型项目进行实践 ## 专题内容,每个专题都配套一个小型项目进行实践
- [async/await异步编程 doing](async/intro.md) - [async/await异步编程 doing](async/intro.md)
- [async编程入门](async/getting-started.md) - [async编程入门](async/getting-started.md)
- [底层探秘: Future执行与任务调度](async/future-excuting.md) - [底层探秘: Future执行与任务调度](async/future-excuting.md)
- [定海神针Pin和Unpin](async/pin-unpin.md) - [定海神针Pin和Unpin](async/pin-unpin.md)
- [async/await 和 Stream todo](async/async-await.md) - [async、await和Stream流处理](async/async-await.md)
- [一些棘手问题的解决 todo](async/hassles.md) - [一些棘手问题的解决 todo](async/hassles.md)
- [HTTP Client/Server todo](async/http.md) - [HTTP Client/Server todo](async/http.md)
- [tokio todo](async/tokio/intro.md) - [tokio todo](async/tokio/intro.md)
@ -135,15 +144,12 @@
- [用Rust增强Javascript todo](rustjs/intro.md) - [用Rust增强Javascript todo](rustjs/intro.md)
- [deno todo](rustjs/deno.md) - [deno todo](rustjs/deno.md)
- [wasm tod](rustjs/wasm/intro.md) - [wasm tod](rustjs/wasm/intro.md)
- [复杂错误索引 todo](errorindex/intro.md) - [复杂错误索引 todo](errorindex/intro.md)
- [所有权和借用 todo](errorindex/borrowing/intro.md) - [所有权和借用 todo](errorindex/borrowing/intro.md)
- [生命周期 todo](errorindex/lifetime/intro.md) - [生命周期 todo](errorindex/lifetime/intro.md)
- [错误处理 todo](errors/intro.md)
- [简化错误处理 todo](errors/simplify.md)
- [自定义错误 todo](errors/user-define.md)
- [让错误输出更优雅 todo](errors/pretty-format.md)
- [会导致panic的代码 todo](errors/panic-codes.md)
- [Cargo详解 todo](cargo/intro.md) - [Cargo详解 todo](cargo/intro.md)
- [常用命令 todo](cargo/commands.md) - [常用命令 todo](cargo/commands.md)
@ -187,11 +193,6 @@
- [为何OO todo](object-oriented/characteristics.md) - [为何OO todo](object-oriented/characteristics.md)
- [设计模式 todo](object-oriented/design-pattern.md) - [设计模式 todo](object-oriented/design-pattern.md)
- [不安全Rust todo](unsafe/intro.md)
- [原生指针 todo](unsafe/raw-pointer.md)
- [修改全局变量 todo](unsafe/modify-global-var.md)
- [FFI外部语言用 todo](unsafe/ffi.md)
- [那些会导致UB的代码 todo](unsafe/ub.md)
- [宏编程 todo](macro/intro.md) - [宏编程 todo](macro/intro.md)
- [过程宏(todo)](macro/procedure-macro.md) - [过程宏(todo)](macro/procedure-macro.md)

@ -1 +1,202 @@
# async/await语法 # async/await 和 Stream流处理
在入门章节中,我们简单学习了该如何使用 `async/.await` 同时在后面也了解了一些底层原理,现在是时候继续继续深入了。
`async/.await`是 Rust 语法的一部分,它在遇到阻塞操作时( 例如IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
有两种方式可以使用`async` `async fn`用于声明函数,`async { ... }`用于声明语句块,它们会返回一个实现 `Future` 特征的值:
```rust
// `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().wait`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// 下面的`async`语句块返回`Future<Output = u8>`
async {
let x: u8 = foo().await;
x + 5
}
}
```
`async` 是懒惰的,直到被执行器 `poll` 或者 `.await` 后才会开始运行,其中后者是最常用的运行 `Future` 的方法。 当 `.await` 被调用时,它会尝试运行 `Future` 直到完成,但是若该 `Future` 进入阻塞,那就会让出当前线程的控制权。当 `Future` 后面准备再一次被运行时(例如从 `socket` 中读取到了数据),执行器会得到通知,并再次运行该 `Future` ,如此循环,直到完成。
以上过程只是一个简述,详细内容在[底层探秘](./future-excuting.md)中已经被深入讲解过,因此这里不再赘述。
## `async` 的生命周期
`async fn` 函数如果拥有引用类型的参数,那它返回的 `Future` 的生命周期就会被这些参数的生命周期所限制:
```rust
async fn foo(x: &u8) -> u8 { *x }
// 上面的函数跟下面的函数是等价的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
```
意味着 `async fn` 函数返回的 `Future` 必须满足以下条件: 当 `x` 依然有效时, 该 `Future` 就必须继续等待( `.await` ), 也就是说`x` 必须比 `Future`活得更久。
在一般情况下,在函数调用后就立即 `.await` 不会存在任何问题,例如`foo(&x).await`。但是,若 `Future` 被先存起来或发送到另一个任务或者线程,就可能存在问题了:
```rust
use std::future::Future;
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
async fn borrow_x(x: &u8) -> u8 { *x }
```
以上代码会报错,因为 `x` 的生命周期只到 `bad` 函数的结尾。 但是 `Future` 显然会活得更久:
```shell
error[E0597]: `x` does not live long enough
--> src/main.rs:4:14
|
4 | borrow_x(&x) // ERROR: `x` does not live long enough
| ---------^^-
| | |
| | borrowed value does not live long enough
| argument requires that `x` is borrowed for `'static`
5 | }
| - `x` dropped here while still borrowed
```
其中一个常用的解决方法就是将具有引用参数的 `async fn` 函数转变成一个具有 `'static` 生命周期的 `Future` 。 以上解决方法可以通过将参数和对 `async fn` 的调用放在同一个 `async` 语句块来实现:
```rust
use std::future::Future;
async fn borrow_x(x: &u8) -> u8 { *x }
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
```
如上所示,通过将参数移动到 `async` 语句块内, 我们将它的生命周期扩展到 `'static` 并跟返回的 `Future` 保持了一致。
## async move
`async` 允许我们使用 `move` 关键字来将环境中变量的所有权转移到语句块内,就像闭包那样,好处是你不再发愁该如何解决借用生命周期的问题,坏处就是无法跟其它代码实现对变量的共享:
```rust
// 多个不同的 `async` 语句块可以访问同一个本地变量,只要它们在该变量的作用域内执行
async fn blocks() {
let my_string = "foo".to_string();
let future_one = async {
// ...
println!("{my_string}");
};
let future_two = async {
// ...
println!("{my_string}");
};
// 运行两个 Future 直到完成
let ((), ()) = futures::join!(future_one, future_two);
}
// 由于`async move`会捕获环境中的变量,因此只有一个`async move`语句块可以访问该变量,
// 但是它也有非常明显的好处: 变量可以转移到返回的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
// ...
println!("{my_string}");
}
}
```
## 当.await遇见多线程执行器
需要注意的是,当使用多线程 `Future` 执行器( `executor` )时, `Future` 可能会在线程间被移动,因此 `async` 语句块中的变量必须要能在线程间传递。 至于 `Future` 会在线程间移动的原因是:它内部的任何`.await`都可能导致它被切换到一个新线程上去执行。
由于需要在多线程环境使用,意味着 `Rc``RefCell` 、没有实现 `Send` 的所有权类型、没有实现 `Sync` 的引用类型,它们都是不安全的,因此无法被使用
> 需要注意!实际上它们还是有可能被使用的,只要在 `.await` 调用期间,它们没有在作用域范围内。
类似的原因,在 `.await` 时使用普通的锁也不安全,例如 `Mutex` 。原因是,它可能会导致线程池被锁:当一个任务获取锁 `A` 后,若它将线程的控制权还给执行器,然后执行器又调度运行另一个任务,该任务也去尝试获取了锁 `A` ,结果当前线程会直接卡死,最终陷入死锁中。
因此,为了避免这种情况的发生,我们需要使用 `futures` 包下的锁 `futures::lock` 来替代 `Mutex` 完成任务。
## Stream流处理
`Stream` 特征类似于 `Future` 特征,但是前者可以在完成前可以生成多个值,这种行为跟标准库中的 `Iterator` 特征倒是颇为相似。
```rust
trait Stream {
// Stream生成的值的类型
type Item;
// 尝试去解析Stream中的下一个值,
// 若无数据,返回`Poll::Pending`, 若有数据,返回 `Poll::Ready(Some(x))`, `Stream`完成则返回 `Poll::Ready(None)`
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
```
关于 `Stream` 的一个常见例子是消息通道( `futures` 包中的)的消费者 `Receiver`。每次有消息从 `Send` 端发送后,它都可以接收到一个 `Some(val)` 值, 一旦 `Send` 端关闭(drop),且消息通道中没有消息后,它会接收到一个 `None` 值。
```rust
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` 类似于 `Iterator::next`, 但是前者返回的不是值,而是一个 `Future<Output = Option<T>>`
// 因此还需要使用`.await`来获取具体的值
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
```
#### 迭代和并发
跟迭代器类似,我们也可以迭代一个 `Stream`。 例如使用`map``filter``fold`方法,以及它们的*遇到错误提前返回*的版本: `try_map``try_filter``try_fold`。
但是跟迭代器又有所不同,`for` 循环无法在这里使用,但是命令式风格的循环`while let`是可以用的,同时还可以使用`next` 和 `try_next` 方法:
```rust
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // 引入 next
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // 引入 try_next
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
```
上面代码是一次处理一个值的模式,但是需要注意的是:**如果你选择一次处理一个值的模式,可能会造成无法并发,这就失去了异步编程的意义**。 因此,如果可以的话我们还是要选择从一个 `Stream` 并发处理多个值的方式,通过 `for_each_concurrent``try_for_each_concurrent` 方法来实现:
```rust
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // 引入 `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
```

Loading…
Cancel
Save