pull/851/head
kazeno 3 weeks ago
parent 29c0ae40f9
commit c1007bf7dd

@ -114,12 +114,11 @@ help: there is a method `try_next` with a similar name
</figure>
首先,我们创建了一个返回 `impl Stream<Item = String>``get_messages` 函数。作为其实现,我们创建了一个异步信道,遍历前十个英文字母,并通过信道发送它们。
首先,我们创建了一个返回 `impl Stream<Item = String>``get_messages` 函数。作为其实现,我们创建了一个异步信道,循环遍历英文字母表的前 10 个字母,并通过信道发送它们。
我们还使用了一个新类型:`ReceiverStream`,它将 `trpl::channel``rx` 接收端转换为一个带有带有 `next` 方法的 `Stream`。回到 `main`,我们使用了一个 `while let` 循环来打印来自流中的所有消息。
当运行这段代码时,我们会得到正如我们期望的代码
运行这段代码时,我们将得到与预期完全一致的结果
<!-- Not extracting output because changes to this output aren't significant;
the changes are likely to be due to the threads running differently rather than
@ -152,7 +151,7 @@ Message: 'j'
</figure>
我们通过 `timeout` 方法在流上增加超时来作为开始,它来自 `StreamExt` trait。接着我们更新 `while let` 循环体,因为现在流返回一个 `Result`。`Ok` 变体表明消息及时到达;`Err` 变体表明任何消息到达前就触发超时了。我们 `match` 其结果要么在成功接收时打印消息要么打印一个超时的提示。最后,注意我们在加上超时之后 pin 了这些消息,因为超时助手函数产生了一个需要 pin 住才能拉取的流。
我们通过 `timeout` 方法在流上增加超时来作为开始,它来自 `StreamExt` trait。接着我们更新 `while let` 循环体,因为现在流返回一个 `Result`。`Ok` 变体表明消息及时到达;`Err` 变体表明任何消息到达前就触发超时了。我们 `match` 其结果要么在成功接收时打印消息要么打印一个超时的提示。最后,请注意我们在加上超时之后 pin 住了这些消息,因为超时辅助函数产生了一个需要 pin 住才能轮询的流。
然后,因为消息之间没有延时,超时并不会改变程序的行为。让我们为发送的消息增加一个延时变量,如示例 17-35 所示。
@ -168,13 +167,13 @@ Message: 'j'
</figure>
`get_messages` 中,我们在 `messages` 数组上使用 `enumerate` 迭代器方法以便能够一起获得项本身和其索引。然后我们在偶数索引的项引入 100 毫秒的延时并为奇数索引的项引入 300 毫秒的延时来模拟真实世界的消息流中可能见到的不同的延时。因为我们的延时为 200 毫秒左右,这应该会影响一半的消息。
`get_messages` 中,我们在 `messages` 数组上使用 `enumerate` 迭代器方法以便能够同时获得项本身和其索引。然后我们为偶数索引的项引入 100 毫秒的延时并为奇数索引的项引入 300 毫秒的延时来模拟真实世界的消息流中可能出现的不同的延时。因为我们的延时为 200 毫秒,这应该会影响到其中一半的消息。
为了在 `get_messages` 函数中的消息之前休眠而不阻塞,我们需要使用异步。然而,我们不能将 `get_messages` 函数本身变为异步函数,因为这样它会返回一个 `Future<Output = Stream<Item = String>>` 而不是 `Stream<Item = String>>`。调用者则不得不等待 `get_messages` 本身来获取流。不过请记住:在一个给定的 future 中的一切都是顺序发生的;并发发生在 futures **之间**。等待 `get_messages` 会要求其发送所有的消息,包括消息之间的休眠延时,在返回接收端流之前。其结果是,超时将毫无用处。流本身没有任何的延时;它们甚至全都发生在流可用之前。
为了在 `get_messages` 函数中实现消息间的延迟且不造成阻塞,我们需要使用异步。然而,我们不能将 `get_messages` 函数本身变为异步函数,因为这样它会返回一个 `Future<Output = Stream<Item = String>>` 而不是 `Stream<Item = String>>`。调用者则不得不 await `get_messages` 本身来获取流。不过请记住:在一个给定的 future 中的一切都是线性发生的;并发发生在 futures **之间**。await `get_messages` 会要求其在返回接收端流之前发送所有的消息,包括消息之间的休眠延时。其结果是,超时将毫无用处。流本身没有任何的延时;它们甚至全都发生在流可用之前。
相反,我们保持 `get_messages` 为一个返回流的常规函数,并产生一个任务来处理异步 `sleep` 调用。
相反,我们保持 `get_messages` 为一个返回流的常规函数,并 spawn 一个任务来处理异步 `sleep` 调用。
> 注意:像这样调用 `spawn_task` 可以工作是因为我们已经设置了运行时;如果没有,则会造成 panic。其它的实现则选择了不同的权衡取舍:它们可能会产生一个新的运行时来避免 panic 不过最终会有一些额外开销,或者它们可能简单地在没有运行时的引用的情况下不提供一个独立的方式来产生任务。请务必理解你的运行时所选择的取舍来编写相应的代码!
> 注意:像这样调用 `spawn_task` 可以工作是因为我们已经设置了运行时;如果没有,则会造成 panic。其它的实现则选择了不同的权衡策略:它们可能会产生一个新的运行时来避免 panic 不过最终会有一些额外开销,有的则可能根本不提供一种独立的、脱离运行时引用的方式来 spawn 任务。请务必理解你的运行时所选择的权衡策略来编写相应的代码!
现在我们的代码有了一个更为有趣的结果。每隔一对消息会有一个 `Problem: Elapsed(())` 错误。
@ -200,13 +199,43 @@ Problem: Elapsed(())
Message: 'j'
```
超时最终并不会阻止消息到达。我们仍然能够得到所有原始的消息,因为我们的信道是 **无限的****unbounded**):它可以存储内存所允许的所有消息。如果消息在超时之前没有到达,流处理器会做出反应,不过当再次拉取流时,消息现在可能已经到达了。
超时最终并不会阻止消息到达。我们仍然能够得到所有原始的消息,因为我们的信道是 **无限的****unbounded**):它可以存储内存所允许的所有消息。如果消息在超时之前没有到达,流处理器会做出相应处理,不过当再次轮询流时,消息现在可能已经到达了。
如果需要的话通过使用不同的信道或者其他更通用的流来得到不同行为。让我们看一个实际的通过结合一个时间间隔的流和这个消息流的例子。
如果需要的话通过使用不同的信道或者其他更通用的流来实现不同行为。让我们看一个实际的通过结合一个时间间隔的流和这个消息流的例子。
### 合并流
首先,让我们创建另一个流,如果直接运行它的话它会每毫秒发送一个项。
首先,让我们创建另一个流,如果直接运行它的话它会每毫秒发送一个项。为了简单考虑,我们可以使用 `sleep` 函数来延迟发送一个消息并采用与 `get_messages` 函数中从信道创建一个流一样的方式来合并它们。区别是这一次,我们要发送返回其经过的时间间隔,所以返回值类型将会是 `impl Stream<Item = u32>`,同时我们可以调用 `get_intervals` 函数(如示例 17-36 所示)。
<figure class="listing">
<span class="file-name">文件名src/main.rs</span>
```rust
{{#rustdoc_include ../listings/ch17-async-await/listing-17-36/src/main.rs:intervals}}
```
<figcaption>示例 17-36用一个会每毫秒触发一次的计数器来创建流</figcaption>
</figure>
我们以在任务中定义一个 `count` 作为开始。(我们也可以在任务外面定义它,不过限定任何变量的作用域会更明确。)接着我们创建一个无限循环。循环的每一次迭代会异步休眠一毫秒,递增计数器,并接着通过信道发送。因为这些全都封装在 `spawn_task` 创建的任务中,它们全部,包括无限循环,会随着运行时一并被清理。
这类在运行时被回收时才会结束的无限循环,在异步 Rust 中相当常见:很多程序需要无限地运行下去。通过异步编程,这不会阻塞任何其它内容,只要循环的每次迭代中有至少一个 await point。
现在回到 main 函数的异步代码块,我们可以尝试合并 `messages``intervals` 流,如示例 17-37 所示。
<figure class="listing">
<span class="file-name">文件名src/main.rs</span>
```rust,ignore,does_not_compile
{{#rustdoc_include ../listings/ch17-async-await/listing-17-37/src/main.rs:main}}
```
<figcaption>示例 17-37尝试合并 `messages``intervals`</figcaption>
</figure>
[17-02-messages]: ch17-02-concurrency-with-async.html#消息传递
[iterator-trait]: ch13-02-iterators.html#the-iterator-trait-and-the-next-method
[iterator-trait]: ch13-02-iterators.html#iterator-trait-和-next-方法

Loading…
Cancel
Save