|
|
|
|
<!DOCTYPE HTML>
|
|
|
|
|
<html lang="zh-CN" class="light" dir="ltr">
|
|
|
|
|
<head>
|
|
|
|
|
<!-- Book generated using mdBook -->
|
|
|
|
|
<meta charset="UTF-8">
|
|
|
|
|
<title>类似迭代器的 Stream - Rust语言圣经(Rust Course)</title>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<!-- Custom HTML head -->
|
|
|
|
|
|
|
|
|
|
<meta name="description" content="介绍 Tokio 中类似迭代器的 Stream:如何迭代 stream、使用适配器,以及手动实现 Stream 特征。">
|
|
|
|
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
|
|
|
<meta name="theme-color" content="#ffffff">
|
|
|
|
|
|
|
|
|
|
<link rel="icon" href="../favicon.svg">
|
|
|
|
|
<link rel="shortcut icon" href="../favicon.png">
|
|
|
|
|
<link rel="stylesheet" href="../css/variables.css">
|
|
|
|
|
<link rel="stylesheet" href="../css/general.css">
|
|
|
|
|
<link rel="stylesheet" href="../css/chrome.css">
|
|
|
|
|
<link rel="stylesheet" href="../css/print.css" media="print">
|
|
|
|
|
|
|
|
|
|
<!-- Fonts -->
|
|
|
|
|
<link rel="stylesheet" href="../FontAwesome/css/font-awesome.css">
|
|
|
|
|
<link rel="stylesheet" href="../fonts/fonts.css">
|
|
|
|
|
|
|
|
|
|
<!-- Highlight.js Stylesheets -->
|
|
|
|
|
<link rel="stylesheet" href="../highlight.css">
|
|
|
|
|
<link rel="stylesheet" href="../tomorrow-night.css">
|
|
|
|
|
<link rel="stylesheet" href="../ayu-highlight.css">
|
|
|
|
|
|
|
|
|
|
<!-- Custom theme stylesheets -->
|
|
|
|
|
<link rel="stylesheet" href="../theme/style.css">
|
|
|
|
|
|
|
|
|
|
</head>
|
|
|
|
|
<body class="sidebar-visible no-js">
|
|
|
|
|
<div id="body-container">
|
|
|
|
|
<!-- Provide site root to javascript -->
|
|
|
|
|
<script>
    // Relative path from this page to the site root, exposed as a global for
    // scripts that run later on the page.
    var path_to_root = "../";
    // Fallback theme used when no theme is stored: "navy" when the browser
    // reports a dark color-scheme preference, "light" otherwise.
    var default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? "navy" : "light";
</script>
|
|
|
|
|
|
|
|
|
|
<!-- Work around some values being stored in localStorage wrapped in quotes -->
|
|
|
|
|
<script>
    // Normalizes the 'mdbook-theme' and 'mdbook-sidebar' entries when they were
    // stored wrapped in double quotes, rewriting each one without the quotes.
    try {
        var theme = localStorage.getItem('mdbook-theme');
        var sidebar = localStorage.getItem('mdbook-sidebar');

        // getItem() yields null for a missing key. Each value is guarded on its
        // own so that an absent theme entry does not throw and thereby skip the
        // sidebar clean-up below.
        if (theme !== null && theme.startsWith('"') && theme.endsWith('"')) {
            localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
        }

        if (sidebar !== null && sidebar.startsWith('"') && sidebar.endsWith('"')) {
            localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
        }
    } catch (e) {
        // Best effort only: if storage access fails the page keeps working with
        // whatever values the later scripts manage to read.
    }
</script>
|
|
|
|
|
|
|
|
|
|
<!-- Set the theme before any content is loaded, prevents flash -->
|
|
|
|
|
<script>
    // Applies the stored theme (or default_theme when none is available) to the
    // root element and flags the body as script-enabled.
    var theme;
    // Storage access may throw; in that case `theme` stays unset and the
    // default below is used.
    try { theme = localStorage.getItem('mdbook-theme'); } catch (e) { }
    if (theme === null || theme === undefined) { theme = default_theme; }

    // Swap the static 'light' class on <html> for the selected theme class.
    var html = document.querySelector('html');
    html.classList.remove('light');
    html.classList.add(theme);

    // Replace the 'no-js' marker on <body> with 'js'.
    var body = document.querySelector('body');
    body.classList.remove('no-js');
    body.classList.add('js');
</script>
|
|
|
|
|
|
|
|
|
|
<input type="checkbox" id="sidebar-toggle-anchor" class="hidden">
|
|
|
|
|
|
|
|
|
|
<!-- Hide / unhide sidebar before it is displayed -->
|
|
|
|
|
<script>
    // Decides whether the sidebar starts visible or hidden, then mirrors that
    // decision onto the toggle checkbox and the body class.
    var body = document.querySelector('body');
    var sidebar = null;
    var sidebar_toggle = document.getElementById("sidebar-toggle-anchor");

    if (document.body.clientWidth >= 1080) {
        // Wide layout: honor the stored preference, defaulting to visible when
        // nothing is stored or storage cannot be read.
        try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch (e) { }
        sidebar = sidebar || 'visible';
    } else {
        // Narrow layout: always start with the sidebar hidden.
        sidebar = 'hidden';
    }

    sidebar_toggle.checked = sidebar === 'visible';
    // The markup ships with 'sidebar-visible'; replace it with the computed state.
    body.classList.remove('sidebar-visible');
    body.classList.add("sidebar-" + sidebar);
</script>
|
|
|
|
|
|
|
|
|
|
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
|
|
|
|
|
<div class="sidebar-scrollbox">
|
|
|
|
|
<ol class="chapter"><li class="chapter-item affix "><a href="../about-book.html">关于本书</a></li><li class="chapter-item affix "><a href="../into-rust.html">进入 Rust 编程世界</a></li><li class="chapter-item affix "><a href="../first-try/sth-you-should-not-do.html">避免从入门到放弃</a></li><li class="chapter-item affix "><a href="../community.html">社区和锈书</a></li><li class="spacer"></li><li class="chapter-item affix "><a href="../some-thoughts.html">Xobserve: 一切皆可观测</a></li><li class="chapter-item affix "><a href="../beat-ai.html">BeatAI: 工程师 AI 入门圣经</a></li><li class="chapter-item affix "><li class="part-title">Rust 语言基础学习</li><li class="spacer"></li><li class="chapter-item "><a href="../first-try/intro.html"><strong aria-hidden="true">1.</strong> 寻找牛刀,以便小试</a><a class="toggle"><div>❱</div></a></li><li><ol class="section"><li class="chapter-item "><a href="../first-try/installation.html"><strong aria-hidden="true">1.1.</strong> 安装 Rust 环境</a></li><li class="chapter-item "><a href="../first-try/editor.html"><strong aria-hidden="true">1.2.</strong> 墙推 VSCode!</a></li><li class="chapter-item "><a href="../first-try/cargo.html"><strong aria-hidden="true">1.3.</strong> 认识 Cargo</a></li><li class="chapter-item "><a href="../first-try/hello-world.html"><strong aria-hidden="true">1.4.</strong> 不仅仅是 Hello world</a></li><li class="chapter-item "><a href="../first-try/slowly-downloading.html"><strong aria-hidden="true">1.5.</strong> 下载依赖太慢了?</a></li></ol></li><li class="chapter-item "><a href="../basic/intro.html"><strong aria-hidden="true">2.</strong> Rust 基础入门</a><a class="toggle"><div>❱</div></a></li><li><ol class="section"><li class="chapter-item "><a href="../basic/variable.html"><strong aria-hidden="true">2.1.</strong> 变量绑定与解构</a></li><li class="chapter-item "><a href="../basic/base-type/index.html"><strong aria-hidden="true">2.2.</strong> 基本类型</a><a class="toggle"><div>❱</div></a></li><li><ol class="section"><li class="chapter-item "><a href="../basic/base-type/numbers.html"><strong 
aria-hidden="true">2.2.1.</strong> 数值类型</a></li><li class="chapter-item "><a href="../basic/base-type/char-bool.html"><strong aria-hidden="true">2.2.2.</strong> 字符、布尔、单元类型</a></li><li class="chapter-item "><a href="../basic/base-type/statement-expression.html"><strong aria-hidden="true">2.2.3.</strong> 语句与表达式</a></li><li class="chapter-item "><a href="../basic/base-type/function.html"><strong aria-hidden="true">2.2.4.</strong> 函数</a></li></ol></li><li class="chapter-item "><a href="../basic/ownership/index.html"><strong aria-hidden="true">2.3.</strong> 所有权和借用</a><a class="toggle"><div>❱</div></a></li><li><ol class="section"><li class="chapter-item "><a href="../basic/ownership/ownership.html"><strong aria-hidden="true">2.3.1.</strong> 所有权</a></li><li class="chapter-item "><a href="../basic/ownership/borrowing.html"><strong aria-hidden="true">2.3.2.</strong> 引用与借用</a></li></ol></li><li class="chapter-item "><a href="../basic/compound-type/intro.html"><strong aria-hidden="true">2.4.</strong> 复合类型</a><a class="toggle"><div>❱</div></a></li><li><ol class="section"><li class="chapter-item "><a href="../basic/compound-type/string-slice.html"><strong aria-hidden="true">2.4.1.</strong> 字符串与切片</a></li><li class="chapter-item "><a href="../basic/compound-type/tuple.html"><strong aria-hidden="true">2.4.2.</strong> 元组</a></li><li class="chapter-item "><a href="../basic/compound-type/struct.html"><strong aria-hidden="true">2.4.3.</strong> 结构体</a></li><li class="chapter-item "><a href="../basic/compound-type/enum.html"><strong aria-hidden="true">2.4.4.</strong> 枚举</a></li><li class="chapter-item "><a href="../basic/compound-type/array.html"><strong aria-hidden="true">2.4.5.</strong> 数组</a></li></ol></li><li class="chapter-item "><a href="../basic/flow-control.html"><strong a
|
|
|
|
|
</div>
|
|
|
|
|
<div id="sidebar-resize-handle" class="sidebar-resize-handle">
|
|
|
|
|
<div class="sidebar-resize-indicator"></div>
|
|
|
|
|
</div>
|
|
|
|
|
</nav>
|
|
|
|
|
|
|
|
|
|
<!-- Track and set sidebar scroll position -->
|
|
|
|
|
<script>
    // Keeps the sidebar scroll offset across a navigation started from a sidebar
    // link; otherwise centers the active entry, if there is one.
    var sidebarScrollbox = document.querySelector('#sidebar .sidebar-scrollbox');

    sidebarScrollbox.addEventListener('click', function (e) {
        // Look up the enclosing link instead of testing the click target itself:
        // chapter links wrap a <strong> number, so the target is often a child
        // of the <a>. Requiring [href] leaves out the href-less toggle anchors,
        // which do not navigate.
        if (e.target.closest('a[href]')) {
            sessionStorage.setItem('sidebar-scroll', sidebarScrollbox.scrollTop);
        }
    }, { passive: true });

    // Read the saved offset once and clear it so it only affects this page load.
    var sidebarScrollTop = sessionStorage.getItem('sidebar-scroll');
    sessionStorage.removeItem('sidebar-scroll');

    if (sidebarScrollTop) {
        // preserve sidebar scroll position when navigating via links within sidebar
        sidebarScrollbox.scrollTop = sidebarScrollTop;
    } else {
        // scroll sidebar to current active section when navigating via "next/previous chapter" buttons
        var activeSection = document.querySelector('#sidebar .active');
        if (activeSection) {
            activeSection.scrollIntoView({ block: 'center' });
        }
    }
</script>
|
|
|
|
|
|
|
|
|
|
<div id="page-wrapper" class="page-wrapper">
|
|
|
|
|
|
|
|
|
|
<div class="page">
|
|
|
|
|
<div id="menu-bar-hover-placeholder"></div>
|
|
|
|
|
<div id="menu-bar" class="menu-bar sticky">
|
|
|
|
|
<div class="left-buttons">
|
|
|
|
|
<label id="sidebar-toggle" class="icon-button" for="sidebar-toggle-anchor" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
|
|
|
|
|
<i class="fa fa-bars"></i>
|
|
|
|
|
</label>
|
|
|
|
|
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
|
|
|
|
|
<i class="fa fa-paint-brush"></i>
|
|
|
|
|
</button>
|
|
|
|
|
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
|
|
|
|
|
<li role="none"><button role="menuitem" class="theme" id="light">Light</button></li>
|
|
|
|
|
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
|
|
|
|
|
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
|
|
|
|
|
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
|
|
|
|
|
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
|
|
|
|
|
</ul>
|
|
|
|
|
<button id="search-toggle" class="icon-button" type="button" title="Search. (Shortkey: s)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="S" aria-controls="searchbar">
|
|
|
|
|
<i class="fa fa-search"></i>
|
|
|
|
|
</button>
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
<h1 class="menu-title">Rust语言圣经(Rust Course)</h1>
|
|
|
|
|
|
|
|
|
|
<div class="right-buttons">
|
|
|
|
|
<a href="../print.html" title="Print this book" aria-label="Print this book">
|
|
|
|
|
<i id="print-button" class="fa fa-print"></i>
|
|
|
|
|
</a>
|
|
|
|
|
<a href="https://github.com/sunface/rust-course" title="Git repository" aria-label="Git repository">
|
|
|
|
|
<i id="git-repository-button" class="fa fa-github"></i>
|
|
|
|
|
</a>
|
|
|
|
|
<a href="https://github.com/sunface/rust-course/edit/main/src/advance-practice/stream.md" title="Suggest an edit" aria-label="Suggest an edit">
|
|
|
|
|
<i id="git-edit-button" class="fa fa-edit"></i>
|
|
|
|
|
</a>
|
|
|
|
|
|
|
|
|
|
</div>
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
<div id="search-wrapper" class="hidden">
|
|
|
|
|
<form id="searchbar-outer" class="searchbar-outer">
|
|
|
|
|
<input type="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
|
|
|
|
|
</form>
|
|
|
|
|
<div id="searchresults-outer" class="searchresults-outer hidden">
|
|
|
|
|
<div id="searchresults-header" class="searchresults-header"></div>
|
|
|
|
|
<ul id="searchresults">
|
|
|
|
|
</ul>
|
|
|
|
|
</div>
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
<!-- Apply ARIA attributes after the sidebar and the sidebar toggle button are added to the DOM -->
|
|
|
|
|
<script>
    // Reflects the sidebar state held in the global `sidebar` variable onto the
    // ARIA attributes of the toggle and the sidebar, and onto link focusability.
    document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
    document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
    // Links inside a hidden sidebar are taken out of the tab order.
    Array.from(document.querySelectorAll('#sidebar a')).forEach(function (link) {
        link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
    });
</script>
|
|
|
|
|
|
|
|
|
|
<div id="content" class="content">
|
|
|
|
|
<!-- Page table of contents -->
|
|
|
|
|
<div class="sidetoc"><nav class="pagetoc"></nav></div>
|
|
|
|
|
<main>
|
|
|
|
|
<h1 id="stream"><a class="header" href="#stream">Stream</a></h1>
|
|
|
|
|
<p>大家有没有想过, Rust 中的迭代器在迭代时能否异步进行?若不可以,是不是有相应的解决方案?</p>
|
|
|
|
|
<p>以上的问题其实很重要,因为在实际场景中,迭代一个集合,然后异步地去执行是很常见的需求,好在 Tokio 为我们提供了 <code>stream</code>,我们可以在异步函数中对其进行迭代,甚至和迭代器 <code>Iterator</code> 一样,<code>stream</code> 还能使用适配器,例如 <code>map</code> ! Tokio 在 <a href="https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html"><code>StreamExt</code></a> 特征上定义了常用的适配器。</p>
|
|
|
|
|
<p>要使用 <code>stream</code> ,目前还需要手动引入对应的包:</p>
|
|
|
|
|
<pre><code class="language-toml">tokio-stream = "0.1"
</code></pre>
|
|
|
|
|
<blockquote>
|
|
|
|
|
<p>stream 没有放在 <code>tokio</code> 包的原因在于标准库中的 <code>Stream</code> 特征还没有稳定,一旦稳定后,<code>stream</code> 将移动到 <code>tokio</code> 中来</p>
|
|
|
|
|
</blockquote>
|
|
|
|
|
<h2 id="迭代"><a class="header" href="#迭代">迭代</a></h2>
|
|
|
|
|
<p>目前, Rust 语言还不支持异步的 <code>for</code> 循环,因此我们需要 <code>while let</code> 循环和 <a href="https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html#method.next"><code>StreamExt::next()</code></a> 一起使用来实现迭代的目的:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021">use tokio_stream::StreamExt;
|
|
|
|
|
|
|
|
|
|
#[tokio::main]
|
|
|
|
|
async fn main() {
|
|
|
|
|
let mut stream = tokio_stream::iter(&[1, 2, 3]);
|
|
|
|
|
|
|
|
|
|
while let Some(v) = stream.next().await {
|
|
|
|
|
println!("GOT = {:?}", v);
|
|
|
|
|
}
|
|
|
|
|
}</code></pre></pre>
|
|
|
|
|
<p>和迭代器 <code>Iterator</code> 类似,<code>next()</code> 方法返回一个 <code>Option&lt;T&gt;</code>,其中 <code>T</code> 是从 <code>stream</code> 中获取的值的类型。若收到 <code>None</code> 则意味着 <code>stream</code> 迭代已经结束。</p>
|
|
|
|
|
<h3 id="mini-redis-广播"><a class="header" href="#mini-redis-广播">mini-redis 广播</a></h3>
|
|
|
|
|
<p>下面我们来实现一个复杂一些的 mini-redis 客户端,完整代码见<a href="https://github.com/tokio-rs/website/blob/master/tutorial-code/streams/src/main.rs">这里</a>。</p>
|
|
|
|
|
<p>在开始之前,首先启动一下完整的 mini-redis 服务器端:</p>
|
|
|
|
|
<pre><code class="language-console">$ mini-redis-server
|
|
|
|
|
</code></pre>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021">use tokio_stream::StreamExt;
|
|
|
|
|
use mini_redis::client;
|
|
|
|
|
|
|
|
|
|
async fn publish() -> mini_redis::Result<()> {
|
|
|
|
|
let mut client = client::connect("127.0.0.1:6379").await?;
|
|
|
|
|
|
|
|
|
|
// 发布一些数据
|
|
|
|
|
client.publish("numbers", "1".into()).await?;
|
|
|
|
|
client.publish("numbers", "two".into()).await?;
|
|
|
|
|
client.publish("numbers", "3".into()).await?;
|
|
|
|
|
client.publish("numbers", "four".into()).await?;
|
|
|
|
|
client.publish("numbers", "five".into()).await?;
|
|
|
|
|
client.publish("numbers", "6".into()).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn subscribe() -> mini_redis::Result<()> {
|
|
|
|
|
let client = client::connect("127.0.0.1:6379").await?;
|
|
|
|
|
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
|
|
|
|
|
let messages = subscriber.into_stream();
|
|
|
|
|
|
|
|
|
|
tokio::pin!(messages);
|
|
|
|
|
|
|
|
|
|
while let Some(msg) = messages.next().await {
|
|
|
|
|
println!("got = {:?}", msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::main]
|
|
|
|
|
async fn main() -> mini_redis::Result<()> {
|
|
|
|
|
tokio::spawn(async {
|
|
|
|
|
publish().await
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
subscribe().await?;
|
|
|
|
|
|
|
|
|
|
println!("DONE");
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}</code></pre></pre>
|
|
|
|
|
<p>上面生成了一个异步任务专门用于发布消息到 mini-redis 服务器端的 <code>numbers</code> 消息通道中。然后,在 <code>main</code> 中,我们订阅了 <code>numbers</code> 消息通道,并且打印从中接收到的消息。</p>
|
|
|
|
|
<p>还有几点值得注意的:</p>
|
|
|
|
|
<ul>
|
|
|
|
|
<li><a href="https://docs.rs/mini-redis/0.4.1/mini_redis/client/struct.Subscriber.html#method.into_stream"><code>into_stream</code></a> 会将 <code>Subscriber</code> 变成一个 <code>stream</code></li>
|
|
|
|
|
<li>在 <code>stream</code> 上调用 <code>next</code> 方法要求该 <code>stream</code> 被固定住(<a href="https://doc.rust-lang.org/std/pin/index.html"><code>pinned</code></a>),因此需要调用 <code>tokio::pin!</code></li>
|
|
|
|
|
</ul>
|
|
|
|
|
<blockquote>
|
|
|
|
|
<p>关于 Pin 的详细解读,可以阅读<a href="https://course.rs/async/pin-unpin.html">这篇文章</a></p>
|
|
|
|
|
</blockquote>
|
|
|
|
|
<p>大家可以去掉 <code>pin!</code> 的调用,然后观察下报错,若以后你遇到这种错误,可以尝试使用下 <code>pin!</code>。</p>
|
|
|
|
|
<p>此时,可以运行下我们的客户端代码看看效果(别忘了先启动前面提到的 mini-redis 服务端):</p>
|
|
|
|
|
<pre><code class="language-console">got = Ok(Message { channel: "numbers", content: b"1" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"two" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"3" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"four" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"five" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"6" })
|
|
|
|
|
</code></pre>
|
|
|
|
|
<p>在了解了 <code>stream</code> 的基本用法后,我们再来看看如何使用适配器来扩展它。</p>
|
|
|
|
|
<h2 id="适配器"><a class="header" href="#适配器">适配器</a></h2>
|
|
|
|
|
<p>在前面章节中,我们了解了迭代器有<a href="https://course.rs/advance/functional-programing/iterator.html#%E6%B6%88%E8%B4%B9%E8%80%85%E4%B8%8E%E9%80%82%E9%85%8D%E5%99%A8">两种适配器</a>:</p>
|
|
|
|
|
<ul>
|
|
|
|
|
<li>迭代器适配器,会将一个迭代器转变成另一个迭代器,例如 <code>map</code>,<code>filter</code> 等</li>
|
|
|
|
|
<li>消费者适配器,会消费掉一个迭代器,最终生成一个值,例如 <code>collect</code> 可以将迭代器收集成一个集合</li>
|
|
|
|
|
</ul>
|
|
|
|
|
<p>与迭代器类似,<code>stream</code> 也有适配器,例如一个 <code>stream</code> 适配器可以将一个 <code>stream</code> 转变成另一个 <code>stream</code> ,例如 <code>map</code>、<code>take</code> 和 <code>filter</code>。</p>
|
|
|
|
|
<p>在之前的客户端中,<code>subscribe</code> 订阅一直持续下去,直到程序被关闭。现在,让我们来升级下,让它在收到三条消息后就停止迭代,最终结束。</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>let messages = subscriber
|
|
|
|
|
.into_stream()
|
|
|
|
|
.take(3);
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p>这里关键就在于 <code>take</code> 适配器,它会限制 <code>stream</code> 只能生成最多 <code>n</code> 条消息。运行下看看结果:</p>
|
|
|
|
|
<pre><code class="language-console">got = Ok(Message { channel: "numbers", content: b"1" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"two" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"3" })
|
|
|
|
|
</code></pre>
|
|
|
|
|
<p>程序终于可以正常结束了。现在,让我们过滤 <code>stream</code> 中的消息,只保留数字类型的值:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>let messages = subscriber
|
|
|
|
|
.into_stream()
|
|
|
|
|
.filter(|msg| match msg {
|
|
|
|
|
Ok(msg) if msg.content.len() == 1 => true,
|
|
|
|
|
_ => false,
|
|
|
|
|
})
|
|
|
|
|
.take(3);
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p>运行后输出:</p>
|
|
|
|
|
<pre><code class="language-console">got = Ok(Message { channel: "numbers", content: b"1" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"3" })
|
|
|
|
|
got = Ok(Message { channel: "numbers", content: b"6" })
|
|
|
|
|
</code></pre>
|
|
|
|
|
<p>需要注意的是,适配器的顺序非常重要,<code>.filter(...).take(3)</code> 和 <code>.take(3).filter(...)</code> 的结果可能大相径庭,大家可以自己尝试下。</p>
|
|
|
|
|
<p>现在,还有一件事要做,咱们的消息被不太好看的 <code>Ok(...)</code> 所包裹,现在通过 <code>map</code> 适配器来简化下:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>let messages = subscriber
|
|
|
|
|
.into_stream()
|
|
|
|
|
.filter(|msg| match msg {
|
|
|
|
|
Ok(msg) if msg.content.len() == 1 => true,
|
|
|
|
|
_ => false,
|
|
|
|
|
})
|
|
|
|
|
.map(|msg| msg.unwrap().content)
|
|
|
|
|
.take(3);
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p>注意到 <code>msg.unwrap</code> 了吗?大家可能会以为我们是出于示例的目的才这么用,实际上并不是,由于 <code>filter</code> 先执行, <code>map</code> 中的 <code>msg</code> 只能是 <code>Ok(...)</code>,因此 <code>unwrap</code> 非常安全。</p>
|
|
|
|
|
<pre><code class="language-console">got = b"1"
|
|
|
|
|
got = b"3"
|
|
|
|
|
got = b"6"
|
|
|
|
|
</code></pre>
|
|
|
|
|
<p>还有一点可以改进的地方:当 <code>filter</code> 和 <code>map</code> 一起使用时,你往往可以用一个统一的方法来实现 <a href="https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html#method.filter_map"><code>filter_map</code></a>。</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>let messages = subscriber
|
|
|
|
|
.into_stream()
|
|
|
|
|
.filter_map(|msg| match msg {
|
|
|
|
|
Ok(msg) if msg.content.len() == 1 => Some(msg.content),
|
|
|
|
|
_ => None,
|
|
|
|
|
})
|
|
|
|
|
.take(3);
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p>想要学习更多的适配器,可以看看 <a href="https://docs.rs/tokio-stream/0.1.8/tokio_stream/trait.StreamExt.html"><code>StreamExt</code></a> 特征。</p>
|
|
|
|
|
<h2 id="实现-stream-特征"><a class="header" href="#实现-stream-特征">实现 Stream 特征</a></h2>
|
|
|
|
|
<p>如果大家还没忘记 <code>Future</code> 特征,那 <code>Stream</code> 特征相信你也会很快记住,因为它们非常类似:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>use std::pin::Pin;
|
|
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
|
|
|
|
|
pub trait Stream {
|
|
|
|
|
type Item;
|
|
|
|
|
|
|
|
|
|
fn poll_next(
|
|
|
|
|
self: Pin<&mut Self>,
|
|
|
|
|
cx: &mut Context<'_>
|
|
|
|
|
) -&gt; Poll&lt;Option&lt;Self::Item&gt;&gt;;
|
|
|
|
|
|
|
|
|
|
fn size_hint(&amp;self) -&gt; (usize, Option&lt;usize&gt;) {
|
|
|
|
|
(0, None)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p><code>Stream::poll_next()</code> 函数跟 <code>Future::poll</code> 很相似,区别就是前者为了从 <code>stream</code> 收到多个值需要重复地进行调用。 就像在 <a href="https://course.rs/tokio/async.html"><code>深入async</code></a> 章节提到的那样,当一个 <code>stream</code> 没有做好返回一个值的准备时,它将返回一个 <code>Poll::Pending</code> ,同时将任务的 <code>waker</code> 进行注册。一旦 <code>stream</code> 准备好后, <code>waker</code> 将被调用。</p>
|
|
|
|
|
<p>通常来说,如果想要手动实现一个 <code>Stream</code>,需要组合 <code>Future</code> 和其它 <code>Stream</code>。下面,还记得在<a href="https://course.rs/tokio/async.html"><code>深入async</code></a> 中构建的 <code>Delay Future</code> 吗?现在让我们来更进一步,将它转换成一个 <code>stream</code>,每 10 毫秒生成一个值,总共生成 3 次:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>use tokio_stream::Stream;
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
|
|
struct Interval {
|
|
|
|
|
rem: usize,
|
|
|
|
|
delay: Delay,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Stream for Interval {
|
|
|
|
|
type Item = ();
|
|
|
|
|
|
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
|
|
|
|
|
-&gt; Poll&lt;Option&lt;()&gt;&gt;
|
|
|
|
|
{
|
|
|
|
|
if self.rem == 0 {
|
|
|
|
|
// 剩余次数已用尽,stream 结束
|
|
|
|
|
return Poll::Ready(None);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match Pin::new(&mut self.delay).poll(cx) {
|
|
|
|
|
Poll::Ready(_) => {
|
|
|
|
|
let when = self.delay.when + Duration::from_millis(10);
|
|
|
|
|
self.delay = Delay { when };
|
|
|
|
|
self.rem -= 1;
|
|
|
|
|
Poll::Ready(Some(()))
|
|
|
|
|
}
|
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<h3 id="async-stream"><a class="header" href="#async-stream">async-stream</a></h3>
|
|
|
|
|
<p>手动实现 <code>Stream</code> 特征实际上是相当麻烦的事,不幸的是,Rust 语言的 <code>async/await</code> 语法目前还不能用于定义 <code>stream</code>,虽然相关的工作已经在进行中。</p>
|
|
|
|
|
<p>作为替代方案,<a href="https://docs.rs/async-stream/latest/async_stream/"><code>async-stream</code></a> 包提供了一个 <code>stream!</code> 宏,它可以将一个输入转换成 <code>stream</code>,使用这个包,上面的代码可以这样实现:</p>
|
|
|
|
|
<pre><pre class="playground"><code class="language-rust edition2021"><span class="boring">#![allow(unused)]
|
|
|
|
|
</span><span class="boring">fn main() {
|
|
|
|
|
</span>use async_stream::stream;
|
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
|
|
|
|
stream! {
|
|
|
|
|
let mut when = Instant::now();
|
|
|
|
|
for _ in 0..3 {
|
|
|
|
|
let delay = Delay { when };
|
|
|
|
|
delay.await;
|
|
|
|
|
yield ();
|
|
|
|
|
when += Duration::from_millis(10);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
<span class="boring">}</span></code></pre></pre>
|
|
|
|
|
<p>嗯,看上去还是相当不错的,代码可读性大幅提升!</p>
|
|
|
|
|
<!-- todo generators -->
|
|
|
|
|
<p>是不是发现了一个关键字 <code>yield</code> ,它是用来配合生成器使用的。详见<a href="https://doc.rust-lang.org/beta/unstable-book/language-features/generators.html">原文</a>。</p>
|
|
|
|
|
|
|
|
|
|
<div id="giscus-container"></div>
|
|
|
|
|
</main>
|
|
|
|
|
|
|
|
|
|
<nav class="nav-wrapper" aria-label="Page navigation">
|
|
|
|
|
<!-- Mobile navigation buttons -->
|
|
|
|
|
<a rel="prev" href="../advance-practice/select.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
|
|
|
|
|
<i class="fa fa-angle-left"></i>
|
|
|
|
|
</a>
|
|
|
|
|
|
|
|
|
|
<a rel="next prefetch" href="../advance-practice/graceful-shutdown.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
|
|
|
|
|
<i class="fa fa-angle-right"></i>
|
|
|
|
|
</a>
|
|
|
|
|
|
|
|
|
|
<div style="clear: both"></div>
|
|
|
|
|
</nav>
|
|
|
|
|
</div>
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
<nav class="nav-wide-wrapper" aria-label="Page navigation">
|
|
|
|
|
<a rel="prev" href="../advance-practice/select.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
|
|
|
|
|
<i class="fa fa-angle-left"></i>
|
|
|
|
|
</a>
|
|
|
|
|
|
|
|
|
|
<a rel="next prefetch" href="../advance-practice/graceful-shutdown.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
|
|
|
|
|
<i class="fa fa-angle-right"></i>
|
|
|
|
|
</a>
|
|
|
|
|
</nav>
|
|
|
|
|
|
|
|
|
|
</div>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<script>
    // Global flag set before the editor and book scripts below are loaded.
    // NOTE(review): presumably read by book.js to enable copy buttons on
    // playground code blocks; confirm against that script.
    window.playground_copyable = true;
</script>
|
|
|
|
|
|
|
|
|
|
<script src="../ace.js"></script>
|
|
|
|
|
<script src="../editor.js"></script>
|
|
|
|
|
<script src="../mode-rust.js"></script>
|
|
|
|
|
<script src="../theme-dawn.js"></script>
|
|
|
|
|
<script src="../theme-tomorrow_night.js"></script>
|
|
|
|
|
|
|
|
|
|
<script src="../elasticlunr.min.js"></script>
|
|
|
|
|
<script src="../mark.min.js"></script>
|
|
|
|
|
<script src="../searcher.js"></script>
|
|
|
|
|
|
|
|
|
|
<script src="../clipboard.min.js"></script>
|
|
|
|
|
<script src="../highlight.js"></script>
|
|
|
|
|
<script src="../book.js"></script>
|
|
|
|
|
|
|
|
|
|
<script>
    // Source path of this page, exposed as a global.
    // NOTE(review): presumably consumed by the custom scripts loaded after this
    // block; confirm against assets/custom.js.
    var pagePath = "advance-practice/stream.md";
</script>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<!-- Custom JS scripts -->
|
|
|
|
|
<script src="../assets/custom.js"></script>
|
|
|
|
|
<script src="../assets/bigPicture.js"></script>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
</div>
|
|
|
|
|
</body>
|
|
|
|
|
</html>
|