update ch17-04

pull/851/head
KaiserY 2 weeks ago
parent 99d5cd1f2b
commit c6e53f739c

@ -239,7 +239,7 @@ Message: 'j'
我们以调用 `get_intervals` 作为开始。接着通过 `merge` 方法合并 `messages``intervals` 流,它将多个流合并为一个从任何一个来源流的项可用时返回项的流,并且不会保持任何特定顺序。最后循环遍历合并后的流而不是 `messages`
此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout`一个在 `timeout` 调用上实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个转换适配另一个。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。
此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout`在调用 `timeout`实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个转换适配另一个。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。
<figure class="listing">
@ -253,7 +253,7 @@ Message: 'j'
</figure>
首先,我们可以使用 `map` 帮助函数`intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,我们可以直接创建一个超过其它超时的间隔。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息的中间
首先,我们可以使用 `map` 辅助方法`intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,因此可以直接创建一个比其他超时时长更长的超时。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息之中
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the tasks running differently rather than
@ -285,5 +285,56 @@ Interval: 43
</figure>
首先,我们在 `intervals` 流上使用 `throttle` 方法以便其不会淹没 `messages`。**限流****Throttling**)是一种限制函数被调用速率的方式,或者在本例中是限制流被轮询的频率。每 100 毫秒一次较为合适。因为这大概是消息到达的间隔。
为了限制我们从流接收的项的数量,可以在 `merged` 流上调用 `take` 方法,因为我们希望限制最终输出,而不仅仅是两个流中的某一个。
现在当我们运行程序时,它在从流中轮询 20 个项后停止,同时间隔不会淹没消息。我们也不会看到 `Interval: 100``Interval: 200` 等信息,而是 `Interval: 1`、`Interval: 2` 等等,即便来源流**可以**每毫秒产生一个事件。这是因为 `throttle` 调用产生了一个封装了原始流的新流,这样原始流只会在限制速率下而不是其 “原生” 速率下轮询。我们不会有大量未处理的间隔消息来选择性地丢弃,我们最开始就从未产生这些间隔消息!这又是 Rust 的 future 所固有的 “惰性” 在起作用,它允许我们自主选择程序的性能特点。
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
changes in the compiler -->
```text
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
还有最后一个需要处理的问题:错误!有了这两个基于信道的流,当信道的另一端关闭时 `send` 方法可能会失败,这取决于运行时如何执行组成流的 future。直到现在为止我们通过 `unwrap` 调用忽略了这种可能性。但在一个行为良好的应用程序中,我们应明确地处理该错误,至少应终止循环,以避免继续尝试发送消息。示例 17-40 展示了一个简单的错误处理策略:打印问题并从循环 `break` 出来。
<figure class="listing">
<span class="file-name">文件名src/main.rs</span>
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-40/src/main.rs:errors}}
```
<figcaption>示例 17-40处理错误并关闭循环</figcaption>
</figure>
同往常一样,正确处理消息发送失败的方式会有所不同:只要确保你有一个策略即可。
现在我们已经看过了很多异步实践,让我们稍作回顾,更深入地探讨一下 Rust 中用于实现异步的 `Future`、`Stream` 和其它关键 trait 的一些细节。
[17-02-messages]: ch17-02-concurrency-with-async.html#消息传递
[iterator-trait]: ch13-02-iterators.html#iterator-trait-和-next-方法

Loading…
Cancel
Save