|
|
|
@ -5,9 +5,9 @@
|
|
|
|
|
|
|
|
|
|
到本章的目前为止,我们大部分时间都专注于单个的 future 上。一个重要的例外就是我们用过的异步信道。回忆一下在本章之前的 [“消息传递”][17-02-messages] 中我们如何使用异步信道接收端的。异步 `recv` 方法随着时间的推移产生一个序列的项。这是一个更通用的模式的实例,通常被称为 *流*(*stream*)。
|
|
|
|
|
|
|
|
|
|
我们之前在第十三章的 [Iterator trait 和 `next` 方法][iterator-trait] 部分已经见过项的序列,不过迭代器和异步信道接收端有两个区别。第一个区别是时间维度:迭代器是同步的,而信道接收端是异步的。第二个区别是 API。当直接处理 `Iterator` 时,我们会调用其同步 `next` 方法。对于这个特定的 `trpl::Receiver` 流,我们调用一个异步的 `recv` 方法。除此之外,这两种 API 在使用上感觉十分相似,这种相似性并非巧合。流类似于一种异步形式的迭代器。不过鉴于 `trpl::Receiver` 专门等待接收消息,多用途的流 API 则更为通用:它像 `Iterator` 一样提供了下一个项,但采用异步的方式。
|
|
|
|
|
我们之前在第十三章的 [Iterator trait 和 `next` 方法][iterator-trait] 部分已经见过项的序列,不过迭代器和异步信道接收端有两个区别。第一个区别是时间维度:迭代器是同步的,而信道接收端是异步的。第二个区别是 API。当直接处理 `Iterator` 时,我们会调用其同步 `next` 方法。对于这个特定的 `trpl::Receiver` 流,我们调用一个异步的 `recv` 方法。除此之外,这两种 API 在使用上感觉十分相似,这种相似性并非巧合。流类似于一种异步形式的迭代器。然而, `trpl::Receiver` 专门等待接收消息,而通用的流 API 适用范围要广泛得多:它以与 `Iterator` 相同的方式一样提供下一个元素,但采用异步的方式。
|
|
|
|
|
|
|
|
|
|
Rust 中迭代器和流的相似性意味着我们实际上可以从任何迭代器上创建流。就迭代器而言,可以通过调用其 `next` 方法并 await 输出来使用流,如示例 17-30 所示。
|
|
|
|
|
Rust 中迭代器和流的相似性意味着我们实际上可以从任何迭代器上创建流。与迭代器类似,我们可以通过调用流的 `next` 方法并 await 其输出来使用它,如示例 17-30 所示。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
@ -57,13 +57,11 @@ help: there is a method `try_next` with a similar name
|
|
|
|
|
| ~~~~~~~~
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
正如输出中所建议的,编译器错误的原因是我们需要在作用域中有正确的 trait 以便能够使用 `next` 方法。鉴于目前为止的讨论,你可能会合理地推测是 `Stream`,但实际上需要的是 `StreamExt`。这里的 `Ext` 是 “extension”:在 Rust 社区中这是用另一个 trait 扩展 trait 的常见模式。
|
|
|
|
|
正如输出中所建议的,编译器错误的原因是我们需要在作用域中有正确的 trait 以便能够使用 `next` 方法。鉴于目前为止的讨论,你可能会合理地推测这个 trait 是 `Stream`,但实际上需要的是 `StreamExt`。这里的 `Ext` 是 “extension”:在 Rust 社区中这是用另一个 trait 扩展 trait 的常见模式。
|
|
|
|
|
|
|
|
|
|
我们稍后会在本章末尾更详细地介绍 `Stream` 和 `StreamExt` trait,目前你只需知道 `Stream` trait 定义了一个底层接口用于有效地组合 `Iterator` 和 `Future` trait。
|
|
|
|
|
我们稍后会在本章末尾更详细地介绍 `Stream` 和 `StreamExt` trait,目前你只需知道 `Stream` trait 定义了一个底层接口用于有效地组合 `Iterator` 和 `Future` trait。`StreamExt` trait 在 `Stream` 之上提供了一组高层 API,其中包括了 `next` 和其它类似于 `Iterator` trait 提供的工具方法。`Stream` 和 `StreamExt` 目前尚未被纳入 Rust 的标准库,但生态系统中的大多数 crate 都使用相同的定义。
|
|
|
|
|
|
|
|
|
|
为什么我们需要 `StreamExt` 而不是 `Stream`,而 `Stream` trait 本身又是做什么的呢?简单来说,答案是贯穿整个 Rust 生态系统,`Stream` trait 定义了一个底层接口用于有效地结合 `Iterator` 与 `Future` trait。`StreamExt` trait 在 `Stream` 之上提供了一组高层 API,其中包括了 `next` 和其它类似于 `Iterator` trait 提供的工具方法。`Stream` 和 `StreamExt` 目前尚未被纳入 Rust 的标准库,但大多数生态系统 crate 都使用相同的定义。
|
|
|
|
|
|
|
|
|
|
对编译器错误的修复是增加一个 `trpl::StreamExt` 的 `use` 语句,如示例 17-31 所示。
|
|
|
|
|
我们需要增加一个 `trpl::StreamExt` 的 `use` 语句来修复编译错误,如示例 17-31 所示。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
@ -77,7 +75,7 @@ help: there is a method `try_next` with a similar name
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
将所有这些代码片段拼凑在一起,这段代码如我们预期般运行!更重要的是,现在我们将 `StreamExt` 引入了作用域,就可以像使用迭代器一样使用它的所有工具方法。例如在示例 17-32 中,我们使用 `filter` 方法来过滤出仅为 3 或 5 的倍数的项。
|
|
|
|
|
将所有这些片段拼凑在一起后,这段代码如我们预期般运行!更重要的是,现在我们引入了 `StreamExt` ,就可以像使用迭代器一样使用它的所有工具方法。例如在示例 17-32 中,我们使用 `filter` 方法来过滤出仅为 3 或 5 的倍数的项。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
@ -236,9 +234,9 @@ Message: 'j'
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
我们以调用 `get_intervals` 作为开始。接着通过 `merge` 方法合并 `messages` 和 `intervals` 流,它将多个流合并为一个从任何一个来源流的项可用时返回项的流,并且不会保持任何特定顺序。最后循环遍历合并后的流而不是 `messages`。
|
|
|
|
|
我们以调用 `get_intervals` 作为开始。接着通过 `merge` 方法合并 `messages` 和 `intervals` 流,该方法将多个流合并为一个流,合并后的流会在源流中的任何一个流中出现新项时立即输出该项,且不会施加任何特定的排序顺序。最后循环遍历合并后的流而不是 `messages`。
|
|
|
|
|
|
|
|
|
|
此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也不行,稍后我们会回到这里。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout` 是在调用 `timeout` 时实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个流转换以适配另一个流。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。
|
|
|
|
|
此时,`messages` 和 `intervals` 都不需要被 pin 住或是可变的,因为它们都会被合并进一个单一的 `merged` 流。然而,这个 `merge` 调用并不能编译!(`while let` 循环中的 `next` 调用也无法编译,这个问题我们稍后再处理。)这是因为两个流有着不同的类型。`messages` 流有着 `Timeout<impl Stream<Item = String>>` 类型,其中 `Timeout` 是在调用 `timeout` 时实现了 `Stream` 的类型。`intervals` 有着 `impl Stream<Item = u32>` 类型。为了合并这两个类型,我们需要将其中一个流转换为与另一个流匹配的类型。我们将重构 `intervals` 流,因为 `messages` 流已经有了我们期望的基本形态而且我们必须处理超时错误(如示例 17-38 所示)。
|
|
|
|
|
|
|
|
|
|
<figure class="listing">
|
|
|
|
|
|
|
|
|
@ -252,7 +250,7 @@ Message: 'j'
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
首先,我们可以使用 `map` 辅助方法将 `intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,因此可以直接创建一个比其他超时时长更长的超时。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这 **几乎** 到了我们需要的地方。每一个类型都检查正确了。但是,如果你运行它,这会有两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息之中:
|
|
|
|
|
首先,我们可以使用 `map` 辅助方法将 `intervals` 转换为字符串。再次,我们需要匹配 `messages` 中的 `Timeout`。但是因为我们不 **希望** `intervals` 有超时,因此可以直接创建一个比其他超时时长更长的超时。这里通过 `Duration::from_secs(10)` 创建了一个十秒的超时。最后我们需要将 `stream` 变为可变,这样 `while let` 循环的 `next` 调用可以遍历流,并且需要 pin 住它才能安全地执行。这样我们 **几乎** 就达到了目标。所有类型检查都通过了。但是,如果你运行它,这会有两个问题。第一,它永远也不会停止!你需要使用 <span class="keystroke">ctrl-c</span> 来停止它。第二,来自英文字母表的消息会淹没在所有的间隔计数消息之中:
|
|
|
|
|
|
|
|
|
|
<!-- Not extracting output because changes to this output aren't significant;
|
|
|
|
|
the changes are likely to be due to the tasks running differently rather than
|
|
|
|
@ -284,11 +282,11 @@ Interval: 43
|
|
|
|
|
|
|
|
|
|
</figure>
|
|
|
|
|
|
|
|
|
|
首先,我们在 `intervals` 流上使用 `throttle` 方法以便其不会淹没 `messages`。**限流**(**Throttling**)是一种限制函数被调用速率的方式,或者在本例中是限制流被轮询的频率。每 100 毫秒一次较为合适。因为这大概是消息到达的间隔。
|
|
|
|
|
首先,我们在 `intervals` 流上使用 `throttle` 方法以防止其淹没 `messages`。**限流**(**Throttling**)是一种限制函数被调用频率的方式——在本例中,即限制流被轮询的频率。每 100 毫秒一次较为合适,因为这大致与消息到达的频率相当。
|
|
|
|
|
|
|
|
|
|
为了限制我们从流接收的项的数量,可以在 `merged` 流上调用 `take` 方法,因为我们希望限制最终输出,而不仅仅是两个流中的某一个。
|
|
|
|
|
|
|
|
|
|
现在当我们运行程序时,它在从流中轮询 20 个项后停止,同时间隔不会淹没消息。我们也不会看到 `Interval: 100` 或 `Interval: 200` 等信息,而是 `Interval: 1`、`Interval: 2` 等等,即便来源流**可以**每毫秒产生一个事件。这是因为 `throttle` 调用产生了一个封装了原始流的新流,这样原始流只会在限制速率下而不是其 “原生” 速率下轮询。我们不会有大量未处理的间隔消息来选择性地丢弃,我们最开始就从未产生这些间隔消息!这又是 Rust 的 future 所固有的 “惰性” 在起作用,它允许我们自主选择程序的性能特点。
|
|
|
|
|
现在当我们运行程序时,它在从流中轮询 20 个项后停止,同时间隔计数的消息不会淹没来自字母表的消息。我们也不会看到 `Interval: 100` 或 `Interval: 200` 等信息,而是 `Interval: 1`、`Interval: 2` 等等,即便来源流**可以**每毫秒产生一个事件。这是因为 `throttle` 调用产生了一个封装了原始流的新流,这样原始流只会在节流速率下而不是其 “原生” 速率下轮询。我们不会有大量未处理的间隔消息来选择性地丢弃,我们最开始就从未产生这些间隔消息!这又是 Rust 的 future 所固有的 “惰性” 在起作用,它允许我们自主选择程序的性能特点。
|
|
|
|
|
|
|
|
|
|
<!-- Not extracting output because changes to this output aren't significant;
|
|
|
|
|
the changes are likely to be due to the threads running differently rather than
|
|
|
|
|