You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

9.6 KiB

Stream

大家有没有想过, Rust 中的迭代器在迭代时能否异步进行?若不可以,是不是有相应的解决方案?

以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步的去执行是很常见的需求,好在 Tokio 为我们提供了 stream,我们可以在异步函数中对其进行迭代,甚至和迭代器 Iterator 一样,stream 还能使用适配器,例如 map ! Tokio 在 StreamExt 特征上定义了常用的适配器。

要使用 stream ,目前还需要手动引入对应的包:

tokio-stream = "0.1"

stream 没有放在 tokio 包的原因在于标准库中的 Stream 特征还没有稳定,一旦稳定后,stream 将移动到 tokio 中来

迭代

目前, Rust 语言还不支持异步的 for 循环,因此我们需要 while let 循环和 StreamExt::next() 一起使用来实现迭代的目的:

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);
    
    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

和迭代器 Iterator 类似,next() 方法返回一个 Option<T>,其中 T 是从 stream 中获取的值的类型。若收到 None 则意味着 stream 迭代已经结束。

mini-redis 广播

下面我们来实现一个复杂一些的 mini-redis 客户端,完整代码见这里

在开始之前,首先启动一下完整的 mini-redis 服务器端:

$ mini-redis-server
use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // 发布一些数据
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

上面生成了一个异步任务专门用于发布消息到 min-redis 服务器端的 numbers 消息通道中。然后,在 main 中,我们订阅了 numbers 消息通道,并且打印从中接收到的消息。

还有几点值得注意的:

  • into_stream 会将 Subscriber 变成一个 stream
  • stream 上调用 next 方法要求该 stream 被固定住(pinned),因此需要调用 tokio::pin!

关于 Pin 的详细解读,可以阅读这篇文章

大家可以去掉 pin! 的调用,然后观察下报错,若以后你遇到这种错误,可以尝试使用下 pin!

此时,可以运行下我们的客户端代码看看效果(别忘了先启动前面提到的 mini-redis 服务端):

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

在了解了 stream 的基本用法后,我们再来看看如何使用适配器来扩展它。

适配器

在前面章节中,我们了解了迭代器有两种适配器

  • 迭代器适配器,会将一个迭代器转变成另一个迭代器,例如 mapfilter
  • 消费者适配器,会消费掉一个迭代器,最终生成一个值,例如 collect 可以将迭代器收集成一个集合

与迭代器类似,stream 也有适配器,例如一个 stream 适配器可以将一个 stream 转变成另一个 stream ,例如 maptakefilter

在之前的客户端中,subscribe 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。

let messages = subscriber
    .into_stream()
    .take(3);

这里关键就在于 take 适配器,它会限制 stream 只能生成最多 n 条消息。运行下看看结果:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

程序终于可以正常结束了。现在,让我们过滤 stream 中的消息,只保留数字类型的值:

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

运行后输出:

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

需要注意的是,适配器的顺序非常重要,.filter(...).take(3).take(3).filter(...) 的结果可能大相径庭,大家可以自己尝试下。

现在,还有一件事要做,咱们的消息被不太好看的 Ok(...) 所包裹,现在通过 map 适配器来简化下:

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

注意到 msg.unwrap 了吗?大家可能会以为我们是出于示例的目的才这么用,实际上并不是,由于 filter 的先执行, map 中的 msg 只能是 Ok(...),因此 unwrap 非常安全。

got = b"1"
got = b"3"
got = b"6"

还有一点可以改进的地方:当 filtermap 一起使用时,你往往可以用一个统一的方法来实现 filter_map

想要学习更多的适配器,可以看看 StreamExt 特征。

实现 Stream 特征

如果大家还没忘记 Future 特征,那 Stream 特征相信你也会很快记住,因为它们非常类似:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Stream::poll_next() 函数跟 Future::poll 很相似,区别就是前者为了从 stream 收到多个值需要重复的进行调用。 就像在 深入async 章节提到的那样,当一个 stream 没有做好返回一个值的准备时,它将返回一个 Poll::Pending ,同时将任务的 waker 进行注册。一旦 stream 准备好后, waker 将被调用。

通常来说,如果想要手动实现一个 Stream,需要组合 Future 和其它 Stream。下面,还记得在深入async 中构建的 Delay Future 吗?现在让我们来更进一步,将它转换成一个 stream,每 10 毫秒生成一个值,总共生成 3 次:

use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // 去除计时器实现
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

手动实现 Stream 特征实际上是相当麻烦的事不幸地是Rust 语言的 async/await 语法目前还不能用于定义 stream,虽然相关的工作已经在进行中。

作为替代方案,async-stream 包提供了一个 stream! 宏,它可以将一个输入转换成 stream,使用这个包,上面的代码可以这样实现:

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}

嗯,看上去还是相当不错的,代码可读性大幅提升!