diff --git a/src/ch17-04-streams.md b/src/ch17-04-streams.md index 98d070f..6a4efcc 100644 --- a/src/ch17-04-streams.md +++ b/src/ch17-04-streams.md @@ -239,7 +239,7 @@ Message: 'j' 我们以调用 `get_intervals` 作为开始。接着通过 `merge` 方法合并 `messages` 和 `intervals` 流,它将多个流合并为一个从任何一个来源流的项可用时返回项的流,并且不会保持任何特定顺序。最后循环遍历合并后的流而不是 `messages`。 -此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout` 是一个在 `timeout` 调用上实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个转换适配另一个。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。 +此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout` 是在调用 `timeout` 时实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个流转换以适配另一个流。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。 <figure class="listing"> @@ -253,7 +253,7 @@ Message: 'j' </figure> -首先,我们可以使用 `map` 帮助函数将 `intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,我们可以直接创建一个超过其它超时的间隔。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会又两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息的中间: +首先,我们可以使用 `map` 辅助方法将 `intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,因此可以直接创建一个比其他超时时长更长的超时。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会有两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息之中: <!-- Not extracting output because changes to this output aren't significant; the changes are likely to be due to the tasks running differently rather than @@ -285,5 +285,56 @@ Interval: 43 </figure> +首先,我们在 `intervals` 流上使用 `throttle` 方法以便其不会淹没 `messages`。**限流**(**Throttling**)是一种限制函数被调用速率的方式,或者在本例中是限制流被轮询的频率。每 100 毫秒一次较为合适。因为这大概是消息到达的间隔。 + +为了限制我们从流接收的项的数量,可以在 `merged` 流上调用 `take` 方法,因为我们希望限制最终输出,而不仅仅是两个流中的某一个。 + +现在当我们运行程序时,它在从流中轮询 20 个项后停止,同时间隔不会淹没消息。我们也不会看到 `Interval: 100` 或 `Interval: 200` 等信息,而是 `Interval: 1`、`Interval: 2` 等等,即便来源流**可以**每毫秒产生一个事件。这是因为 `throttle` 调用产生了一个封装了原始流的新流,这样原始流只会在限制速率下而不是其 “原生” 速率下轮询。我们不会有大量未处理的间隔消息来选择性地丢弃,我们最开始就从未产生这些间隔消息!这又是 Rust 的 future 所固有的 “惰性” 在起作用,它允许我们自主选择程序的性能特点。 + +<!-- Not extracting output because changes to this output aren't significant; +the changes are likely to be due to the threads running differently rather than +changes in the compiler --> + +```text +Interval: 1 +Message: 'a' +Interval: 2 +Interval: 3 +Problem: Elapsed(()) +Interval: 4 +Message: 'b' +Interval: 5 +Message: 'c' +Interval: 6 +Interval: 7 +Problem: Elapsed(()) +Interval: 8 +Message: 'd' +Interval: 9 +Message: 'e' +Interval: 10 +Interval: 11 +Problem: Elapsed(()) +Interval: 12 +``` + +还有最后一个需要处理的问题:错误!有了这两个基于信道的流,当信道的另一端关闭时 `send` 方法可能会失败,这取决于运行时如何执行组成流的 future。直到现在为止,我们通过 `unwrap` 调用忽略了这种可能性。但在一个行为良好的应用程序中,我们应明确地处理该错误,至少应终止循环,以避免继续尝试发送消息。示例 17-40 展示了一个简单的错误处理策略:打印问题并从循环 `break` 出来。 + +<figure class="listing"> + +<span class="file-name">文件名:src/main.rs</span> + +```rust +{{#rustdoc_include ../listings/ch17-async-await/listing-17-40/src/main.rs:errors}} +``` + +<figcaption>示例 17-40:处理错误并关闭循环</figcaption> + +</figure> + +同往常一样,正确处理消息发送失败的方式会有所不同:只要确保你有一个策略即可。 + +现在我们已经看过了很多异步实践,让我们稍作回顾,更深入地探讨一下 Rust 中用于实现异步的 `Future`、`Stream` 和其它关键 trait 的一些细节。 + [17-02-messages]: ch17-02-concurrency-with-async.html#消息传递 [iterator-trait]: ch13-02-iterators.html#iterator-trait-和-next-方法