|
|
|
@ -2,7 +2,7 @@
|
|
|
|
|
|
|
|
|
|
> [ch17-04-streams.md](https://github.com/rust-lang/book/blob/main/src/ch17-04-streams.md)
|
|
|
|
|
> <br>
|
|
|
|
|
> commit c7edf19e58701f894b4d906a6f7bd738ad4de801
|
|
|
|
|
> commit 56ec353290429e6547109e88afea4de027b0f1a9
|
|
|
|
|
|
|
|
|
|
到本章的目前为止,我们大部分时间都专注于单个的 future 上。一个重要的例外就是我们用过的异步信道。回忆一下在本章之前的 [“消息传递”][17-02-messages] 中我们如何使用异步信道接收端的。异步 `recv` 方法随着时间的推移产生一个序列的项。这是一个更通用的模式的实例,通常被称为 *流*(*stream*)。
|
|
|
|
|
|
|
|
|
@ -201,11 +201,11 @@ Message: 'j'
|
|
|
|
|
|
|
|
|
|
超时最终并不会阻止消息到达。我们仍然能够得到所有原始的消息,因为我们的信道是 **无限的**(**unbounded**):它可以存储内存所允许的所有消息。如果消息在超时之前没有到达,流处理器会做出相应处理,不过当再次轮询流时,消息现在可能已经到达了。
|
|
|
|
|
|
|
|
|
|
如果需要的话通过使用不同的信道或者其他更通用的流来实现不同行为。让我们看一个实际的通过结合一个时间间隔的流和这个消息流的例子。
|
|
|
|
|
如果需要的话可以通过使用不同的信道或者其他更通用的流来实现不同行为。让我们看一个实际的将一个表示时间间隔的流和这个消息流合并的例子。
|
|
|
|
|
|
|
|
|
|
### 合并流
|
|
|
|
|
|
|
|
|
|
首先,让我们创建另一个流,如果直接运行它的话它会每毫秒发送一个项。为了简单考虑,我们可以使用 `sleep` 函数来延迟发送一个消息并采用与 `get_messages` 函数中从信道创建一个流一样的方式来合并它们。区别是这一次,我们要发送返回其经过的时间间隔,所以返回值类型将会是 `impl Stream<Item = u32>`,同时我们可以调用 `get_intervals` 函数(如示例 17-36 所示)。
|
|
|
|
|
首先,让我们创建另一个流,如果直接运行它的话它会每毫秒发送一个项。为了简单起见,我们可以使用 `sleep` 函数来延迟发送一个消息并采用与 `get_messages` 函数中从信道创建流时相同的方式来合并它们。区别是这一次,我们将发送已经过去的间隔次数,所以返回值类型将会是 `impl Stream<Item = u32>`,函数可以命名为 `get_intervals`(如示例 17-36 所示)。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
@ -219,7 +219,7 @@ Message: 'j'
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
我们以在任务中定义一个 `count` 作为开始。(我们也可以在任务外面定义它,不过限定任何变量的作用域会更明确。)接着我们创建一个无限循环。循环的每一次迭代会异步休眠一毫秒,递增计数器,并接着通过信道发送。因为这些全都封装在 `spawn_task` 创建的任务中,它们全部,包括无限循环,会随着运行时一并被清理。
|
|
|
|
|
我们以在任务中定义一个 `count` 作为开始。(我们也可以在任务外面定义它,不过限定任何变量的作用域会更明确。)接着我们创建一个无限循环。循环的每一次迭代会异步休眠一毫秒,递增计数器,并接着通过信道发送该值。因为这些全都封装在 `spawn_task` 创建的任务中,因此它们(包括无限循环)都会随着运行时的销毁而被清理。
|
|
|
|
|
|
|
|
|
|
这类在运行时被回收时才会结束的无限循环,在异步 Rust 中相当常见:很多程序需要无限地运行下去。通过异步编程,这不会阻塞任何其它内容,只要循环的每次迭代中有至少一个 await point。
|
|
|
|
|
|
|
|
|
@ -237,5 +237,53 @@ Message: 'j'
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
我们以调用 `get_intervals` 作为开始。接着通过 `merge` 方法合并 `messages` 和 `intervals` 流,它将多个流合并为一个从任何一个来源流的项可用时返回项的流,并且不会保持任何特定顺序。最后循环遍历合并后的流而不是 `messages`。
|
|
|
|
|
|
|
|
|
|
此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout` 是一个在 `timeout` 调用上实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个转换适配另一个。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
|
<span class="file-name">文件名:src/main.rs</span>
|
|
|
|
|
|
|
|
|
|
```rust,ignore
|
|
|
|
|
{{#rustdoc_include ../listings/ch17-async-await/listing-17-38/src/main.rs:main}}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
<figcaption>示例 17-38:将 `intervals` 流的类型与 `messages` 流对齐</figcaption>
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
首先,我们可以使用 `map` 帮助函数将 `intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,我们可以直接创建一个超过其它超时的间隔。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会又两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息的中间:
|
|
|
|
|
|
|
|
|
|
<!-- Not extracting output because changes to this output aren't significant;
|
|
|
|
|
the changes are likely to be due to the tasks running differently rather than
|
|
|
|
|
changes in the compiler -->
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
--snip--
|
|
|
|
|
Interval: 38
|
|
|
|
|
Interval: 39
|
|
|
|
|
Interval: 40
|
|
|
|
|
Message: 'a'
|
|
|
|
|
Interval: 41
|
|
|
|
|
Interval: 42
|
|
|
|
|
Interval: 43
|
|
|
|
|
--snip--
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
示例 17-39 展示了一种解决最后两个问题的方法。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
|
<span class="file-name">文件名:src/main.rs</span>
|
|
|
|
|
|
|
|
|
|
```rust
|
|
|
|
|
{{#rustdoc_include ../listings/ch17-async-await/listing-17-39/src/main.rs:throttle}}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
<figcaption>示例 17-39:使用 `throttle` and `take` 来处理合并后的流</figcaption>
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
[17-02-messages]: ch17-02-concurrency-with-async.html#消息传递
|
|
|
|
|
[iterator-trait]: ch13-02-iterators.html#iterator-trait-和-next-方法
|
|
|
|
|