You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

253 lines
9.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Stream
大家有没有想过, Rust 中的迭代器在迭代时能否异步进行?若不可以,是不是有相应的解决方案?
以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步的去执行是很常见的需求,好在 Tokio 为我们提供了 `stream`,我们可以在异步函数中对其进行迭代,甚至和迭代器 `Iterator` 一样,`stream` 还能使用适配器,例如 `map` ! Tokio 在 [`StreamExt`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html) 特征上定义了常用的适配器。
要使用 `stream` ,目前还需要手动引入对应的包:
```rust
tokio-stream = "0.1"
```
> stream 没有放在 `tokio` 包的原因在于标准库中的 `Stream` 特征还没有稳定,一旦稳定后,`stream` 将移动到 `tokio` 中来
## 迭代
目前, Rust 语言还不支持异步的 `for` 循环,因此我们需要 `while let` 循环和 [`StreamExt::next()`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html#method.next) 一起使用来实现迭代的目的:
```rust
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);
while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}
```
和迭代器 `Iterator` 类似,`next()` 方法返回一个 `Option<T>`,其中 `T` 是从 `stream` 中获取的值的类型。若收到 `None` 则意味着 `stream` 迭代已经结束。
#### mini-redis 广播
下面我们来实现一个复杂一些的 mini-redis 客户端,完整代码见[这里](https://github.com/tokio-rs/website/blob/master/tutorial-code/streams/src/main.rs)。
在开始之前,首先启动一下完整的 mini-redis 服务器端:
```console
$ mini-redis-server
```
```rust
use tokio_stream::StreamExt;
use mini_redis::client;
async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;
// 发布一些数据
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}
async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream();
tokio::pin!(messages);
while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}
Ok(())
}
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});
subscribe().await?;
println!("DONE");
Ok(())
}
```
上面生成了一个异步任务专门用于发布消息到 min-redis 服务器端的 `numbers` 消息通道中。然后,在 `main` 中,我们订阅了 `numbers` 消息通道,并且打印从中接收到的消息。
还有几点值得注意的:
- [`into_stream`](https://docs.rs/mini-redis/0.4.1/mini_redis/client/struct.Subscriber.html#method.into_stream) 会将 `Subscriber` 变成一个 `stream`
-`stream` 上调用 `next` 方法要求该 `stream` 被固定住([`pinned`](https://doc.rust-lang.org/std/pin/index.html)),因此需要调用 `tokio::pin!`
> 关于 Pin 的详细解读,可以阅读[这篇文章](https://course.rs/async/pin-unpin.html)
大家可以去掉 `pin!` 的调用,然后观察下报错,若以后你遇到这种错误,可以尝试使用下 `pin!`
此时,可以运行下我们的客户端代码看看效果(别忘了先启动前面提到的 mini-redis 服务端):
```console
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
```
在了解了 `stream` 的基本用法后,我们再来看看如何使用适配器来扩展它。
## 适配器
在前面章节中,我们了解了迭代器有[两种适配器](https://course.rs/advance/functional-programing/iterator.html#消费者与适配器)
- 迭代器适配器,会将一个迭代器转变成另一个迭代器,例如 `map``filter` 等
- 消费者适配器,会消费掉一个迭代器,最终生成一个值,例如 `collect` 可以将迭代器收集成一个集合
与迭代器类似,`stream` 也有适配器,例如一个 `stream` 适配器可以将一个 `stream` 转变成另一个 `stream` ,例如 `map`、`take` 和 `filter`
在之前的客户端中,`subscribe` 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。
```rust
let messages = subscriber
.into_stream()
.take(3);
```
这里关键就在于 `take` 适配器,它会限制 `stream` 只能生成最多 `n` 条消息。运行下看看结果:
```console
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
```
程序终于可以正常结束了。现在,让我们过滤 `stream` 中的消息,只保留数字类型的值:
```rust
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.take(3);
```
运行后输出:
```console
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
```
需要注意的是,适配器的顺序非常重要,`.filter(...).take(3)` 和 `.take(3).filter(...)` 的结果可能大相径庭,大家可以自己尝试下。
现在,还有一件事要做,咱们的消息被不太好看的 `Ok(...)` 所包裹,现在通过 `map` 适配器来简化下:
```rust
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);
```
注意到 `msg.unwrap` 了吗?大家可能会以为我们是出于示例的目的才这么用,实际上并不是,由于 `filter` 的先执行, `map` 中的 `msg` 只能是 `Ok(...)`,因此 `unwrap` 非常安全。
```console
got = b"1"
got = b"3"
got = b"6"
```
还有一点可以改进的地方:当 `filter``map` 一起使用时,你往往可以用一个统一的方法来实现 [`filter_map`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html#method.filter_map)。
想要学习更多的适配器,可以看看 [`StreamExt`](https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html) 特征。
## 实现 Stream 特征
如果大家还没忘记 `Future` 特征,那 `Stream` 特征相信你也会很快记住,因为它们非常类似:
```rust
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
```
`Stream::poll_next()` 函数跟 `Future::poll` 很相似,区别就是前者为了从 `stream` 收到多个值需要重复的进行调用。 就像在 [`深入async`](https://course.rs/tokio/async.html) 章节提到的那样,当一个 `stream` 没有做好返回一个值的准备时,它将返回一个 `Poll::Pending` ,同时将任务的 `waker` 进行注册。一旦 `stream` 准备好后, `waker` 将被调用。
通常来说,如果想要手动实现一个 `Stream`,需要组合 `Future` 和其它 `Stream`。下面,还记得在[`深入async`](https://course.rs/tokio/async.html) 中构建的 `Delay Future` 吗?现在让我们来更进一步,将它转换成一个 `stream`,每 10 毫秒生成一个值,总共生成 3 次:
```rust
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// 去除计时器实现
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}
```
#### async-stream
手动实现 `Stream` 特征实际上是相当麻烦的事不幸地是Rust 语言的 `async/await` 语法目前还不能用于定义 `stream`,虽然相关的工作已经在进行中。
作为替代方案,[`async-stream`](https://docs.rs/async-stream/latest/async_stream/) 包提供了一个 `stream!` 宏,它可以将一个输入转换成 `stream`,使用这个包,上面的代码可以这样实现:
```rust
use async_stream::stream;
use std::time::{Duration, Instant};
stream! {
let mut when = Instant::now();
for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}
```
嗯,看上去还是相当不错的,代码可读性大幅提升!