You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
trpl-zh-cn/ch21-02-multithreaded.html

1140 lines
72 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<!DOCTYPE HTML>
<html lang="en" class="light sidebar-visible" dir="ltr">
<head>
<!-- Book generated using mdBook -->
<meta charset="UTF-8">
<title>将单线程 server 变为多线程 server - Rust 程序设计语言 简体中文版</title>
<!-- Custom HTML head -->
<meta name="description" content="">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="theme-color" content="#ffffff">
<link rel="icon" href="favicon.svg">
<link rel="shortcut icon" href="favicon.png">
<link rel="stylesheet" href="css/variables.css">
<link rel="stylesheet" href="css/general.css">
<link rel="stylesheet" href="css/chrome.css">
<link rel="stylesheet" href="css/print.css" media="print">
<!-- Fonts -->
<link rel="stylesheet" href="FontAwesome/css/font-awesome.css">
<link rel="stylesheet" href="fonts/fonts.css">
<!-- Highlight.js Stylesheets -->
<link rel="stylesheet" id="highlight-css" href="highlight.css">
<link rel="stylesheet" id="tomorrow-night-css" href="tomorrow-night.css">
<link rel="stylesheet" id="ayu-highlight-css" href="ayu-highlight.css">
<!-- Custom theme stylesheets -->
<link rel="stylesheet" href="ferris.css">
<link rel="stylesheet" href="theme/2018-edition.css">
<link rel="stylesheet" href="theme/semantic-notes.css">
<link rel="stylesheet" href="theme/listing.css">
<!-- Provide site root and default themes to javascript -->
<script>
const path_to_root = "";
const default_light_theme = "light";
const default_dark_theme = "navy";
</script>
<!-- Start loading toc.js asap -->
<script src="toc.js"></script>
</head>
<body>
<div id="body-container">
<!-- Work around some values being stored in localStorage wrapped in quotes -->
<script>
try {
let theme = localStorage.getItem('mdbook-theme');
let sidebar = localStorage.getItem('mdbook-sidebar');
if (theme.startsWith('"') && theme.endsWith('"')) {
localStorage.setItem('mdbook-theme', theme.slice(1, theme.length - 1));
}
if (sidebar.startsWith('"') && sidebar.endsWith('"')) {
localStorage.setItem('mdbook-sidebar', sidebar.slice(1, sidebar.length - 1));
}
} catch (e) { }
</script>
<!-- Set the theme before any content is loaded, prevents flash -->
<script>
const default_theme = window.matchMedia("(prefers-color-scheme: dark)").matches ? default_dark_theme : default_light_theme;
let theme;
try { theme = localStorage.getItem('mdbook-theme'); } catch(e) { }
if (theme === null || theme === undefined) { theme = default_theme; }
const html = document.documentElement;
html.classList.remove('light')
html.classList.add(theme);
html.classList.add("js");
</script>
<input type="checkbox" id="sidebar-toggle-anchor" class="hidden">
<!-- Hide / unhide sidebar before it is displayed -->
<script>
let sidebar = null;
const sidebar_toggle = document.getElementById("sidebar-toggle-anchor");
if (document.body.clientWidth >= 1080) {
try { sidebar = localStorage.getItem('mdbook-sidebar'); } catch(e) { }
sidebar = sidebar || 'visible';
} else {
sidebar = 'hidden';
}
sidebar_toggle.checked = sidebar === 'visible';
html.classList.remove('sidebar-visible');
html.classList.add("sidebar-" + sidebar);
</script>
<nav id="sidebar" class="sidebar" aria-label="Table of contents">
<!-- populated by js -->
<mdbook-sidebar-scrollbox class="sidebar-scrollbox"></mdbook-sidebar-scrollbox>
<noscript>
<iframe class="sidebar-iframe-outer" src="toc.html"></iframe>
</noscript>
<div id="sidebar-resize-handle" class="sidebar-resize-handle">
<div class="sidebar-resize-indicator"></div>
</div>
</nav>
<div id="page-wrapper" class="page-wrapper">
<div class="page">
<div id="menu-bar-hover-placeholder"></div>
<div id="menu-bar" class="menu-bar sticky">
<div class="left-buttons">
<label id="sidebar-toggle" class="icon-button" for="sidebar-toggle-anchor" title="Toggle Table of Contents" aria-label="Toggle Table of Contents" aria-controls="sidebar">
<i class="fa fa-bars"></i>
</label>
<button id="theme-toggle" class="icon-button" type="button" title="Change theme" aria-label="Change theme" aria-haspopup="true" aria-expanded="false" aria-controls="theme-list">
<i class="fa fa-paint-brush"></i>
</button>
<ul id="theme-list" class="theme-popup" aria-label="Themes" role="menu">
<li role="none"><button role="menuitem" class="theme" id="default_theme">Auto</button></li>
<li role="none"><button role="menuitem" class="theme" id="light">Light</button></li>
<li role="none"><button role="menuitem" class="theme" id="rust">Rust</button></li>
<li role="none"><button role="menuitem" class="theme" id="coal">Coal</button></li>
<li role="none"><button role="menuitem" class="theme" id="navy">Navy</button></li>
<li role="none"><button role="menuitem" class="theme" id="ayu">Ayu</button></li>
</ul>
<button id="search-toggle" class="icon-button" type="button" title="Search. (Shortkey: s)" aria-label="Toggle Searchbar" aria-expanded="false" aria-keyshortcuts="S" aria-controls="searchbar">
<i class="fa fa-search"></i>
</button>
</div>
<h1 class="menu-title">Rust 程序设计语言 简体中文版</h1>
<div class="right-buttons">
<a href="print.html" title="Print this book" aria-label="Print this book">
<i id="print-button" class="fa fa-print"></i>
</a>
<a href="https://github.com/KaiserY/trpl-zh-cn/tree/main" title="Git repository" aria-label="Git repository">
<i id="git-repository-button" class="fa fa-github"></i>
</a>
<a href="https://github.com/KaiserY/trpl-zh-cn/edit/main/src/ch21-02-multithreaded.md" title="Suggest an edit" aria-label="Suggest an edit">
<i id="git-edit-button" class="fa fa-edit"></i>
</a>
</div>
</div>
<div id="search-wrapper" class="hidden">
<form id="searchbar-outer" class="searchbar-outer">
<input type="search" id="searchbar" name="searchbar" placeholder="Search this book ..." aria-controls="searchresults-outer" aria-describedby="searchresults-header">
</form>
<div id="searchresults-outer" class="searchresults-outer hidden">
<div id="searchresults-header" class="searchresults-header"></div>
<ul id="searchresults">
</ul>
</div>
</div>
<!-- Apply ARIA attributes after the sidebar and the sidebar toggle button are added to the DOM -->
<script>
document.getElementById('sidebar-toggle').setAttribute('aria-expanded', sidebar === 'visible');
document.getElementById('sidebar').setAttribute('aria-hidden', sidebar !== 'visible');
Array.from(document.querySelectorAll('#sidebar a')).forEach(function(link) {
link.setAttribute('tabIndex', sidebar === 'visible' ? 0 : -1);
});
</script>
<div id="content" class="content">
<main>
<h2 id="将单线程-server-变为多线程-server"><a class="header" href="#将单线程-server-变为多线程-server">将单线程 server 变为多线程 server</a></h2>
<!-- https://github.com/rust-lang/book/blob/main/src/ch21-02-multithreaded.md -->
<!-- commit 56ec353290429e6547109e88afea4de027b0f1a9 -->
<p>目前服务端会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果服务端正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。</p>
<h3 id="在当前服务端实现中模拟慢请求"><a class="header" href="#在当前服务端实现中模拟慢请求">在当前服务端实现中模拟慢请求</a></h3>
<p>让我们看看一个慢请求如何影响当前服务端实现中的其他请求。示例 21-10 通过模拟慢响应实现了 <em>/sleep</em> 请求处理,它会使服务端在响应之前休眠五秒。</p>
<p><span class="filename">文件名src/main.rs</span></p>
<pre><pre class="playground"><code class="language-rust no_run edition2021">use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip--
<span class="boring">
</span><span class="boring">fn main() {
</span><span class="boring"> let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
</span><span class="boring">
</span><span class="boring"> for stream in listener.incoming() {
</span><span class="boring"> let stream = stream.unwrap();
</span><span class="boring">
</span><span class="boring"> handle_connection(stream);
</span><span class="boring"> }
</span><span class="boring">}
</span>
fn handle_connection(mut stream: TcpStream) {
// --snip--
<span class="boring"> let buf_reader = BufReader::new(&amp;stream);
</span><span class="boring"> let request_line = buf_reader.lines().next().unwrap().unwrap();
</span><span class="boring">
</span> let (status_line, filename) = match &amp;request_line[..] {
"GET / HTTP/1.1" =&gt; ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" =&gt; {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ =&gt; ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
<span class="boring">
</span><span class="boring"> let contents = fs::read_to_string(filename).unwrap();
</span><span class="boring"> let length = contents.len();
</span><span class="boring">
</span><span class="boring"> let response =
</span><span class="boring"> format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
</span><span class="boring">
</span><span class="boring"> stream.write_all(response.as_bytes()).unwrap();
</span>}</code></pre></pre>
<p><span class="caption">示例 21-10: 通过休眠五秒来模拟慢请求</span></p>
<p><code>if</code> 切换到 <code>match</code> 后现在有三个分支了。我们需要显式地匹配一个 slice 的 <code>request_line</code> 以模式匹配字符串字面值。<code>match</code> 不会像相等方法那样自动引用和解引用。</p>
<p>第一个分支与示例 21-9 中的 <code>if</code> 代码块相同。第二个分支匹配一个 <em>/sleep</em> 请求。当接收到这个请求时server 在渲染成功 HTML 页面之前会先休眠五秒。第三个分支与示例 21-9 中的 <code>else</code> 代码块相同。</p>
<p>现在就可以真切的看出我们的服务端有多么的原始:真实的库将会以更简洁的方式处理多个请求的识别!</p>
<p>使用 <code>cargo run</code> 启动服务端,并接着打开两个浏览器窗口:一个请求 <em>http://127.0.0.1:7878/</em> 而另一个请求 <em>http://127.0.0.1:7878/sleep</em> 。如果像之前一样多次请求 <em>/</em>,会发现响应的比较快速。不过如果请求 <em>/sleep</em> 之后再请求 <em>/</em>,就会看到 <em>/</em> 会等待直到 <code>sleep</code> 休眠完五秒之后才响应。</p>
<p>有多种技术可以用来避免所有请求都排在慢请求之后,包括我们在第十七章中所使用的异步;我们将要实现的一个便是线程池。</p>
<h3 id="使用线程池改善吞吐量"><a class="header" href="#使用线程池改善吞吐量">使用线程池改善吞吐量</a></h3>
<p><strong>线程池</strong><em>thread pool</em>)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配该任务,并负责处理它。其余线程在该线程处理任务的同时可以处理任何其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,提高服务端的吞吐量。</p>
<p>我们会将池中线程限制为较少的数量以防拒绝服务Denial of ServiceDoS攻击如果程序为每一个接收的请求都新建一个线程某人向服务端发起千万级的请求时会耗尽服务器的资源并导致请求处理陷入停滞。</p>
<p>不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向队列获取下一个请求。通过这种设计,则可以并发处理 <em><code>N</code></em> 个请求,其中 <em><code>N</code></em> 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前已经增加了能处理的慢请求的数量。</p>
<p>这个设计仅仅是多种改善 web 服务端吞吐量的方法之一。其他可供探索的方法有 fork/join 模型fork/join model、单线程异步 I/O 模型single-threaded async I/O model或者多线程异步 I/O 模型multi-threaded async I/O model。如果你对这个主题感兴趣则可以阅读更多关于其他解决方案的内容并尝试实现它们对于一个像 Rust 这样的底层语言,所有这些方法都是可行的。</p>
<p>在开始之前让我们讨论一下线程池应用看起来如何。当尝试设计代码时首先编写客户端接口client interface有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。</p>
<p>类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发compiler-driven development。我们将编写调用所期望的函数的代码接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。不过在开始之前我们将探索不会作为起点使用的技术。</p>
<h4 id="为每一个请求分配线程"><a class="header" href="#为每一个请求分配线程">为每一个请求分配线程</a></h4>
<p>首先,让我们探索一下如果为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个可用的多线程服务端的起点。接着我们会增加线程池作为改进,这样比较两个方案将会更容易。示例 21-11 展示了 <code>main</code> 的改变,它在 <code>for</code> 循环中为每一个流分配了一个新线程进行处理:</p>
<p><span class="filename">文件名src/main.rs</span></p>
<pre><pre class="playground"><code class="language-rust no_run edition2021"><span class="boring">use std::{
</span><span class="boring"> fs,
</span><span class="boring"> io::{BufReader, prelude::*},
</span><span class="boring"> net::{TcpListener, TcpStream},
</span><span class="boring"> thread,
</span><span class="boring"> time::Duration,
</span><span class="boring">};
</span><span class="boring">
</span>fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
<span class="boring">
</span><span class="boring">fn handle_connection(mut stream: TcpStream) {
</span><span class="boring"> let buf_reader = BufReader::new(&amp;stream);
</span><span class="boring"> let request_line = buf_reader.lines().next().unwrap().unwrap();
</span><span class="boring">
</span><span class="boring"> let (status_line, filename) = match &amp;request_line[..] {
</span><span class="boring"> "GET / HTTP/1.1" =&gt; ("HTTP/1.1 200 OK", "hello.html"),
</span><span class="boring"> "GET /sleep HTTP/1.1" =&gt; {
</span><span class="boring"> thread::sleep(Duration::from_secs(5));
</span><span class="boring"> ("HTTP/1.1 200 OK", "hello.html")
</span><span class="boring"> }
</span><span class="boring"> _ =&gt; ("HTTP/1.1 404 NOT FOUND", "404.html"),
</span><span class="boring"> };
</span><span class="boring">
</span><span class="boring"> let contents = fs::read_to_string(filename).unwrap();
</span><span class="boring"> let length = contents.len();
</span><span class="boring">
</span><span class="boring"> let response =
</span><span class="boring"> format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
</span><span class="boring">
</span><span class="boring"> stream.write_all(response.as_bytes()).unwrap();
</span><span class="boring">}</span></code></pre></pre>
<p><span class="caption">示例 21-11: 为每一个流新建一个线程</span></p>
<p>正如第十六章讲到的,<code>thread::spawn</code> 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 <em>/sleep</em>,接着在另两个浏览器标签页中加载 <em>/</em>,确实会发现 <em>/</em> 请求不必等待 <em>/sleep</em> 结束。不过正如之前提到的,这最终会使系统崩溃因为我们会无限制地创建新线程。</p>
<p>你可能也会回想起第十七章中正是这一类情况才是 async 和 await 真正闪光的地方!在我们用线程池构建项目时请记住并思考这与异步有什么不同或相同的地方。</p>
<h4 id="创建有限数量的线程"><a class="header" href="#创建有限数量的线程">创建有限数量的线程</a></h4>
<p>我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出大幅修改。示例 21-12 展示我们希望用来替换 <code>thread::spawn</code><code>ThreadPool</code> 结构体的假想接口:</p>
<p><span class="filename">文件名src/main.rs</span></p>
<pre><code class="language-rust ignore does_not_compile"><span class="boring">use std::{
</span><span class="boring"> fs,
</span><span class="boring"> io::{BufReader, prelude::*},
</span><span class="boring"> net::{TcpListener, TcpStream},
</span><span class="boring"> thread,
</span><span class="boring"> time::Duration,
</span><span class="boring">};
</span><span class="boring">
</span>fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
<span class="boring">
</span><span class="boring">fn handle_connection(mut stream: TcpStream) {
</span><span class="boring"> let buf_reader = BufReader::new(&amp;stream);
</span><span class="boring"> let request_line = buf_reader.lines().next().unwrap().unwrap();
</span><span class="boring">
</span><span class="boring"> let (status_line, filename) = match &amp;request_line[..] {
</span><span class="boring"> "GET / HTTP/1.1" =&gt; ("HTTP/1.1 200 OK", "hello.html"),
</span><span class="boring"> "GET /sleep HTTP/1.1" =&gt; {
</span><span class="boring"> thread::sleep(Duration::from_secs(5));
</span><span class="boring"> ("HTTP/1.1 200 OK", "hello.html")
</span><span class="boring"> }
</span><span class="boring"> _ =&gt; ("HTTP/1.1 404 NOT FOUND", "404.html"),
</span><span class="boring"> };
</span><span class="boring">
</span><span class="boring"> let contents = fs::read_to_string(filename).unwrap();
</span><span class="boring"> let length = contents.len();
</span><span class="boring">
</span><span class="boring"> let response =
</span><span class="boring"> format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
</span><span class="boring">
</span><span class="boring"> stream.write_all(response.as_bytes()).unwrap();
</span><span class="boring">}</span></code></pre>
<p><span class="caption">示例 21-12: 假想的 <code>ThreadPool</code> 接口</span></p>
<p>我们使用 <code>ThreadPool::new</code> 创建一个具有可配置线程数的新线程池,在这里是四。这样在 <code>for</code> 循环中,<code>pool.execute</code> 有着类似 <code>thread::spawn</code> 的接口,它获取一个线程池运行于每一个流的闭包。我们需要实现 <code>pool.execute</code>,使其能够接收闭包并将其传递给线程池中的线程执行。这段代码还不能编译,但我们可以尝试让编译器指导我们如何修复它。</p>
<h4 id="采用编译器驱动开发构建-threadpool"><a class="header" href="#采用编译器驱动开发构建-threadpool">采用编译器驱动开发构建 <code>ThreadPool</code></a></h4>
<p>继续并对示例 21-12 中的 <em>src/main.rs</em> 做出修改,并利用来自 <code>cargo check</code> 的编译器错误来驱动开发。下面是我们得到的第一个错误:</p>
<pre><code class="language-console">$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--&gt; src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
</code></pre>
<p>太好了!这个错误告诉我们需要一个 <code>ThreadPool</code> 类型或模块,所以我们现在就来构建一个。<code>ThreadPool</code> 的实现会与 web 服务端的特定工作相独立。所以让我们从 <code>hello</code> crate 切换到存放 <code>ThreadPool</code> 实现的新库 crate。切换为库 crate 之后,我们就可以在任何工作中使用这个单独的线程池库,而不仅仅是处理网络请求。</p>
<p>创建 <em>src/lib.rs</em> 文件,它包含了目前可用的最简单的 <code>ThreadPool</code> 定义:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground">pub struct ThreadPool;</code></pre>
<p>接着编辑 <em>main.rs</em> 文件通过在 <em>src/main.rs</em> 的开头增加如下代码将 <code>ThreadPool</code> 从库 crate 引入作用域:</p>
<p><span class="filename">文件名src/main.rs</span></p>
<pre><code class="language-rust ignore">use hello::ThreadPool;
<span class="boring">use std::{
</span><span class="boring"> fs,
</span><span class="boring"> io::{BufReader, prelude::*},
</span><span class="boring"> net::{TcpListener, TcpStream},
</span><span class="boring"> thread,
</span><span class="boring"> time::Duration,
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">fn main() {
</span><span class="boring"> let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
</span><span class="boring"> let pool = ThreadPool::new(4);
</span><span class="boring">
</span><span class="boring"> for stream in listener.incoming() {
</span><span class="boring"> let stream = stream.unwrap();
</span><span class="boring">
</span><span class="boring"> pool.execute(|| {
</span><span class="boring"> handle_connection(stream);
</span><span class="boring"> });
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">fn handle_connection(mut stream: TcpStream) {
</span><span class="boring"> let buf_reader = BufReader::new(&amp;stream);
</span><span class="boring"> let request_line = buf_reader.lines().next().unwrap().unwrap();
</span><span class="boring">
</span><span class="boring"> let (status_line, filename) = match &amp;request_line[..] {
</span><span class="boring"> "GET / HTTP/1.1" =&gt; ("HTTP/1.1 200 OK", "hello.html"),
</span><span class="boring"> "GET /sleep HTTP/1.1" =&gt; {
</span><span class="boring"> thread::sleep(Duration::from_secs(5));
</span><span class="boring"> ("HTTP/1.1 200 OK", "hello.html")
</span><span class="boring"> }
</span><span class="boring"> _ =&gt; ("HTTP/1.1 404 NOT FOUND", "404.html"),
</span><span class="boring"> };
</span><span class="boring">
</span><span class="boring"> let contents = fs::read_to_string(filename).unwrap();
</span><span class="boring"> let length = contents.len();
</span><span class="boring">
</span><span class="boring"> let response =
</span><span class="boring"> format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
</span><span class="boring">
</span><span class="boring"> stream.write_all(response.as_bytes()).unwrap();
</span><span class="boring">}</span></code></pre>
<p>这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:</p>
<pre><code class="language-console">$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--&gt; src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
</code></pre>
<p>此错误表明下一步是为 <code>ThreadPool</code> 创建一个叫做 <code>new</code> 的关联函数。我们还知道 <code>new</code> 需要有一个参数可以接受 <code>4</code>,而且 <code>new</code> 应该返回 <code>ThreadPool</code> 实例。让我们实现拥有此特征的最小化 <code>new</code> 函数:</p>
<p><span class="filename">文件夹src/lib.rs</span></p>
<pre><code class="language-rust noplayground">pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -&gt; ThreadPool {
ThreadPool
}
}</code></pre>
<p>这里选择 <code>usize</code> 作为 <code>size</code> 参数的类型,因为我们知道线程数为负没有意义。我们还知道将使用 <code>4</code> 作为线程集合的元素数量,这也就是使用 <code>usize</code> 类型的原因,如第三章 <a href="ch03-02-data-types.html#%E6%95%B4%E5%9E%8B">“整型”</a> 部分所讲。</p>
<p>再次编译检查这段代码:</p>
<pre><code class="language-console">$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--&gt; src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
</code></pre>
<p>这里发生错误是因为并没有 <code>ThreadPool</code> 上的 <code>execute</code> 方法。回忆 <a href="#%E5%88%9B%E5%BB%BA%E6%9C%89%E9%99%90%E6%95%B0%E9%87%8F%E7%9A%84%E7%BA%BF%E7%A8%8B">“创建有限数量的线程”</a> 部分我们决定线程池应该有与 <code>thread::spawn</code> 类似的接口,同时我们将实现 <code>execute</code> 函数来获取传递的闭包并将其传递给池中的空闲线程执行。</p>
<p>我们会在 <code>ThreadPool</code> 上定义 <code>execute</code> 函数来获取一个闭包参数。回忆第十三章的 <a href="ch13-01-closures.html#%E5%B0%86%E6%8D%95%E8%8E%B7%E7%9A%84%E5%80%BC%E7%A7%BB%E5%87%BA%E9%97%AD%E5%8C%85%E5%92%8C-fn-trait">“将捕获的值移出闭包和 <code>Fn</code> trait”</a> 部分,闭包作为参数时可以使用三个不同的 trait<code>Fn</code><code>FnMut</code><code>FnOnce</code>。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 <code>thread::spawn</code>,所以我们可以观察 <code>thread::spawn</code> 的签名在其参数中使用了何种 bound。查看文档会发现</p>
<pre><code class="language-rust ignore">pub fn spawn&lt;F, T&gt;(f: F) -&gt; JoinHandle&lt;T&gt;
where
F: FnOnce() -&gt; T,
F: Send + 'static,
T: Send + 'static,</code></pre>
<p><code>F</code> 是这里我们关心的参数;<code>T</code> 与返回值有关所以我们并不关心。考虑到 <code>spawn</code> 使用 <code>FnOnce</code> 作为 <code>F</code> 的 trait bound这可能也是我们需要的因为最终会将传递给 <code>execute</code> 的参数传给 <code>spawn</code>。因为处理请求的线程只会执行闭包一次,这也进一步确认了 <code>FnOnce</code> 是我们需要的 trait这里符合 <code>FnOnce</code><code>Once</code> 的意思。</p>
<p><code>F</code> 还有 trait bound <code>Send</code> 和生命周期绑定 <code>'static</code>,这对我们的情况也是有意义的:需要 <code>Send</code> 来将闭包从一个线程转移到另一个线程,而 <code>'static</code> 是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 <code>F</code><code>ThreadPool</code><code>execute</code> 方法:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground"><span class="boring">pub struct ThreadPool;
</span><span class="boring">
</span>impl ThreadPool {
// --snip--
<span class="boring"> pub fn new(size: usize) -&gt; ThreadPool {
</span><span class="boring"> ThreadPool
</span><span class="boring"> }
</span><span class="boring">
</span> pub fn execute&lt;F&gt;(&amp;self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}</code></pre>
<p><code>FnOnce</code> trait 仍然需要之后的 <code>()</code>,因为这里的 <code>FnOnce</code> 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。</p>
<p>这里再一次增加了 <code>execute</code> 方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:</p>
<pre><code class="language-console">$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
</code></pre>
<p>现在就只有警告了!这意味着能够编译了!注意如果尝试 <code>cargo run</code> 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 <code>execute</code> 的闭包!</p>
<blockquote>
<p>一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。不过这个说法并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 <strong>并且</strong> 拥有期望的行为。</p>
</blockquote>
<p>思考一下:如果这里要执行的是一个 <code>future</code> 而不是闭包会有什么不同?</p>
<h4 id="在-new-中验证线程池的线程数量"><a class="header" href="#在-new-中验证线程池的线程数量"><code>new</code> 中验证线程池的线程数量</a></h4>
<p>这里并没有对 <code>new</code><code>execute</code> 的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 <code>new</code> 作为开始。之前选择使用无符号类型作为 <code>size</code> 参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 <code>usize</code> 值。让我们增加在返回 <code>ThreadPool</code> 实例之前检查 <code>size</code> 是否大于零的代码,并使用 <code>assert!</code> 宏在得到零时 panic如示例 21-13 所示:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground"><span class="boring">pub struct ThreadPool;
</span><span class="boring">
</span>impl ThreadPool {
/// 创建一个新的线程池。
///
/// size 是池中线程的数量。
///
/// # Panics
///
/// 如果 size 为 0`new` 方法会 panic。
pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
ThreadPool
}
// --snip--
<span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}</code></pre>
<p><span class="caption">示例 21-13: 实现 <code>ThreadPool::new</code><code>size</code> 为零时 panic</span></p>
<p>这里也用文档注释为 <code>ThreadPool</code> 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 <code>cargo doc --open</code> 并点击 <code>ThreadPool</code> 结构体来查看生成的 <code>new</code> 的文档看起来如何!</p>
<p>相比像这里使用 <code>assert!</code> 宏,也可以让 <code>new</code> 像之前 I/O 项目中示例 12-9 中 <code>Config::build</code> 那样将 <code>new</code> 更改为 <code>build</code> 并返回一个 <code>Result</code>,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的名为 <code>build</code> 的函数来对比一下 <code>new</code> 函数:</p>
<pre><code class="language-rust ignore">pub fn build(size: usize) -&gt; Result&lt;ThreadPool, PoolCreationError&gt; {</code></pre>
<h4 id="分配空间以存储线程"><a class="header" href="#分配空间以存储线程">分配空间以存储线程</a></h4>
<p>现在我们已经有了一种方法来确保线程池中的线程数有效,就可以实际创建这些线程并在返回结构体之前将它们存储在 <code>ThreadPool</code> 结构体中。不过如何 “存储” 一个线程?让我们再看看 <code>thread::spawn</code> 的签名:</p>
<pre><code class="language-rust ignore">pub fn spawn&lt;F, T&gt;(f: F) -&gt; JoinHandle&lt;T&gt;
where
F: FnOnce() -&gt; T,
F: Send + 'static,
T: Send + 'static,</code></pre>
<p><code>spawn</code> 返回 <code>JoinHandle&lt;T&gt;</code>,其中 <code>T</code> 是闭包返回的类型。尝试使用 <code>JoinHandle</code> 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 <code>T</code> 将会是单元类型 <code>()</code></p>
<p>示例 21-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 <code>ThreadPool</code> 的定义来存放一个 <code>thread::JoinHandle&lt;()&gt;</code> 的 vector 实例,使用 <code>size</code> 容量来初始化,并设置一个 <code>for</code> 循环来运行创建线程的代码,并返回包含这些线程的 <code>ThreadPool</code> 实例:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust ignore not_desired_behavior">use std::thread;
pub struct ThreadPool {
threads: Vec&lt;thread::JoinHandle&lt;()&gt;&gt;,
}
impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span> pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// 创建一些线程并将它们存入 vector 中。
}
ThreadPool { threads }
}
// --snip--
<span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}</code></pre>
<p><span class="caption">示例 21-14: 为 <code>ThreadPool</code> 创建一个 vector 来存放线程</span></p>
<p>这里将 <code>std::thread</code> 引入库 crate 的作用域,因为使用了 <code>thread::JoinHandle</code> 作为 <code>ThreadPool</code> 中 vector 元素的类型。</p>
<p>在得到了有效的数量之后,<code>ThreadPool</code> 新建一个存放 <code>size</code> 个元素的 vector。<code>with_capacity</code> 函数与 <code>Vec::new</code> 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 <code>size</code> 个元素,预先进行分配比仅仅 <code>Vec::new</code> 要稍微有效率一些,因为 <code>Vec::new</code> 随着插入元素而重新改变大小。</p>
<p>如果再次运行 <code>cargo check</code>,它应该会成功。</p>
<h4 id="worker-结构体负责将代码从-threadpool-传递给线程"><a class="header" href="#worker-结构体负责将代码从-threadpool-传递给线程"><code>Worker</code> 结构体负责将代码从 <code>ThreadPool</code> 传递给线程</a></h4>
<p>示例 21-14 的 <code>for</code> 循环中留下了一个关于创建线程的注释。这里,我们来看看如何实际创建线程。标准库提供了 <code>thread::spawn</code> 作为创建线程的方法,<code>thread::spawn</code> 期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须手动实现。</p>
<p>我们将要实现的行为是创建线程并稍后发送代码,这会在 <code>ThreadPool</code> 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 <em>Worker</em>,这是一个池实现中的常见概念。<code>Worker</code> 会获取需要运行的代码,并在该 worker 的线程中运行该代码。</p>
<p>想象一下在餐馆厨房工作的员工:员工等待来自顾客的订单,他们负责接单并完成它们。</p>
<p>不同于在线程池中储存一个 <code>JoinHandle&lt;()&gt;</code> 实例的 vector我们会储存 <code>Worker</code> 结构体的实例。每一个 <code>Worker</code> 会储存一个单独的 <code>JoinHandle&lt;()&gt;</code> 实例。接着会在 <code>Worker</code> 上实现一个方法,该方法将闭包发送到已经运行的线程中执行。我们还会赋予每个 worker 一个 <code>id</code>,这样就可以在日志和调试中区别线程池中的不同 <code>Worker</code> 的实例。</p>
<p>如下是创建 <code>ThreadPool</code> 时会发生的新过程。在通过如下方式设置完 <code>Worker</code> 之后,我们会实现向线程发送闭包的代码:</p>
<ol>
<li>定义存放 <code>id</code><code>JoinHandle&lt;()&gt;</code><code>Worker</code> 结构体。</li>
<li>修改 <code>ThreadPool</code> 存放一个 <code>Worker</code> 实例的 vector。</li>
<li>定义 <code>Worker::new</code> 函数,它获取一个 <code>id</code> 数字并返回一个带有 <code>id</code> 和用空闭包分配的线程的 <code>Worker</code> 实例。</li>
<li><code>ThreadPool::new</code> 中,使用 <code>for</code> 循环计数生成 <code>id</code>,使用这个 <code>id</code> 新建 <code>Worker</code>,并储存进 vector 中。</li>
</ol>
<p>如果你渴望挑战,在查示例 21-15 中的代码之前尝试自己实现这些修改。</p>
<p>准备好了吗?示例 21-15 就是一个做出了上述修改的例子:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground">use std::thread;
pub struct ThreadPool {
workers: Vec&lt;Worker&gt;,
}
impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span> pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
<span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}
struct Worker {
id: usize,
thread: thread::JoinHandle&lt;()&gt;,
}
impl Worker {
fn new(id: usize) -&gt; Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}</code></pre>
<p><span class="caption">示例 21-15: 修改 <code>ThreadPool</code> 存放 <code>Worker</code> 实例而不是直接存放线程</span></p>
<p>这里将 <code>ThreadPool</code> 中字段名从 <code>threads</code> 改为 <code>workers</code>,因为它现在存储 <code>Worker</code> 而不是 <code>JoinHandle&lt;()&gt;</code>。使用 <code>for</code> 循环中的计数作为 <code>Worker::new</code> 的参数,并将每一个新建的 <code>Worker</code> 存储在叫做 <code>workers</code> 的 vector 中。</p>
<p><code>Worker</code> 结构体和其 <code>new</code> 函数是私有的,因为外部代码(比如 <em>src/main.rs</em> 中的 server并不需要知道关于 <code>ThreadPool</code> 中使用 <code>Worker</code> 结构体的实现细节。<code>Worker::new</code> 函数使用 <code>id</code> 参数并存储了使用一个空闭包创建的 <code>JoinHandle&lt;()&gt;</code> 实例。</p>
<blockquote>
<p>注意:如果操作系统因为没有足够的系统资源而无法创建线程时,<code>thread::spawn</code> 会 panic。这会导致整个 server panic即使一些线程可能创建成功了。出于简单的考虑这个行为是可行的不过在一个生产级别的线程池实现中你可能会希望使用 <a href="https://doc.rust-lang.org/std/thread/struct.Builder.html"><code>std::thread::Builder</code></a> 和其 <a href="https://doc.rust-lang.org/std/thread/struct.Builder.html#method.spawn"><code>spawn</code></a> 方法来返回一个 <code>Result</code></p>
</blockquote>
<p>这段代码能够编译并用指定给 <code>ThreadPool::new</code> 的参数创建存储了一系列的 <code>Worker</code> 实例,不过 <strong>仍然</strong> 没有处理 <code>execute</code> 中得到的闭包。让我们聊聊接下来怎么做。</p>
<h4 id="使用信道向线程发送请求"><a class="header" href="#使用信道向线程发送请求">使用信道向线程发送请求</a></h4>
<p>下一个需要解决的问题是传递给 <code>thread::spawn</code> 的闭包完全没有做任何工作。目前,我们在 <code>execute</code> 方法中获得期望执行的闭包,不过在创建 <code>ThreadPool</code> 的过程中创建每一个 <code>Worker</code> 时需要向 <code>thread::spawn</code> 传递一个要运行的闭包。</p>
<p>我们希望刚创建的 <code>Worker</code> 结构体能够从 <code>ThreadPool</code> 的队列中获取需要执行的代码,并发送到线程中执行。</p>
<p>在第十六章,我们学习了 <strong>信道</strong> —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的选择。这里信道将充当任务队列的作用,<code>execute</code> 将通过 <code>ThreadPool</code> 向其中线程正在寻找工作的 <code>Worker</code> 实例发送任务。计划如下:</p>
<ol>
<li><code>ThreadPool</code> 会创建一个信道并持有发送端。</li>
<li>每个 <code>Worker</code> 将持有接收端。</li>
<li>新建一个 <code>Job</code> 结构体来存放用于向信道中发送的闭包。</li>
<li><code>execute</code> 方法会在发送者发出期望执行的工作。</li>
<li>在线程中,<code>Worker</code> 会遍历接收者并执行任何接收到的工作。</li>
</ol>
<p>让我们以在 <code>ThreadPool::new</code> 中创建信道并让 <code>ThreadPool</code> 实例充当发送者开始,如示例 21-16 所示。<code>Job</code> 结构体目前为空,但它将作为我们通过通道发送的类型:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground">use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec&lt;Worker&gt;,
sender: mpsc::Sender&lt;Job&gt;,
}
struct Job;
impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span> pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
<span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}
<span class="boring">
</span><span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Worker {
</span><span class="boring"> fn new(id: usize) -&gt; Worker {
</span><span class="boring"> let thread = thread::spawn(|| {});
</span><span class="boring">
</span><span class="boring"> Worker { id, thread }
</span><span class="boring"> }
</span><span class="boring">}</span></code></pre>
<p><span class="caption">示例 21-16: 修改 <code>ThreadPool</code> 来储存一个传输 <code>Job</code> 实例的发送者</span></p>
<p><code>ThreadPool::new</code> 中,新建了一个信道,并接着让线程池在接收端等待。这段代码能够成功编译。</p>
<p>让我们尝试在线程池创建每个 worker 时将接收端传递给它们。须知我们希望在 worker 所分配的线程中使用接收者,所以将在闭包中引用 <code>receiver</code> 参数。示例 21-17 中展示的代码还不能编译:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust ignore does_not_compile"><span class="boring">use std::{sync::mpsc, thread};
</span><span class="boring">
</span><span class="boring">pub struct ThreadPool {
</span><span class="boring"> workers: Vec&lt;Worker&gt;,
</span><span class="boring"> sender: mpsc::Sender&lt;Job&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">struct Job;
</span><span class="boring">
</span>impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span> pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
<span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}
// --snip--
<span class="boring">
</span><span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span><span class="boring">
</span>impl Worker {
fn new(id: usize, receiver: mpsc::Receiver&lt;Job&gt;) -&gt; Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}</code></pre>
<p><span class="caption">示例 21-17: 将信道的接收端传递给 worker</span></p>
<p>这是一些简单而直观的修改:将接收端传递进了 <code>Worker::new</code>,并接着在闭包中使用它。</p>
<p>如果尝试 check 代码,会得到这个错误:</p>
<pre><code class="language-console">$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--&gt; src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver&lt;Job&gt;`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--&gt; src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver&lt;Job&gt;) -&gt; Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
</code></pre>
<p>这段代码尝试将 <code>receiver</code> 传递给多个 <code>Worker</code> 实例。这是不行的回忆第十六章Rust 所提供的信道实现是多 <strong>生产者</strong>,单 <strong>消费者</strong> 的。这意味着不能简单的克隆信道的消费端来解决问题。我们也不希望将一个消息向多个消费者发送多次;我们希望有一个消息列表和多个 worker 这样每个消息就只会处理一次。</p>
<p>另外,从信道队列中取出任务涉及到修改 <code>receiver</code>,所以这些线程需要一个能安全的共享和修改 <code>receiver</code> 的方式,否则可能导致竞争状态(参考第十六章)。</p>
<p>回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 <code>Arc&lt;Mutex&lt;T&gt;&gt;</code><code>Arc</code> 使得多个 <code>Worker</code> 实例拥有接收端,而 <code>Mutex</code> 则确保一次只有一个 <code>Worker</code> 能从接收端得到任务。示例 21-18 展示了所需的修改:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground">use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
<span class="boring">pub struct ThreadPool {
</span><span class="boring"> workers: Vec&lt;Worker&gt;,
</span><span class="boring"> sender: mpsc::Sender&lt;Job&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">struct Job;
</span><span class="boring">
</span>impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span> pub fn new(size: usize) -&gt; ThreadPool {
assert!(size &gt; 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&amp;receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
<span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> }
</span>}
// --snip--
<span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span><span class="boring">
</span>impl Worker {
fn new(id: usize, receiver: Arc&lt;Mutex&lt;mpsc::Receiver&lt;Job&gt;&gt;&gt;) -&gt; Worker {
// --snip--
<span class="boring"> let thread = thread::spawn(|| {
</span><span class="boring"> receiver;
</span><span class="boring"> });
</span><span class="boring">
</span><span class="boring"> Worker { id, thread }
</span> }
}</code></pre>
<p><span class="caption">示例 21-18: 使用 <code>Arc</code><code>Mutex</code> 在 worker 间共享接收者</span></p>
<p><code>ThreadPool::new</code> 中,将接收端放入 <code>Arc</code><code>Mutex</code> 中。对于每一个新 <code>Worker</code> <code>Arc</code> 来增加引用计数,如此这些 <code>Worker</code> 实例就可以共享接收者的所有权了。</p>
<p>通过这些修改,代码可以编译了!我们已经快完成了!</p>
<h4 id="实现-execute-方法"><a class="header" href="#实现-execute-方法">实现 <code>execute</code> 方法</a></h4>
<p>最后让我们实现 <code>ThreadPool</code> 上的 <code>execute</code> 方法。同时也要修改 <code>Job</code> 结构体:它将不再是结构体,<code>Job</code> 将是一个有着 <code>execute</code> 接收到的闭包类型的 trait 对象的类型别名。第二十章 <a href="ch20-03-advanced-types.html#%E4%BD%BF%E7%94%A8%E7%B1%BB%E5%9E%8B%E5%88%AB%E5%90%8D%E5%88%9B%E5%BB%BA%E7%B1%BB%E5%9E%8B%E5%90%8C%E4%B9%89%E8%AF%8D">“使用类型别名创建类型同义词”</a> 部分提到过,类型别名允许将长的类型变短。观察示例 21-19</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground"><span class="boring">use std::{
</span><span class="boring"> sync::{Arc, Mutex, mpsc},
</span><span class="boring"> thread,
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">pub struct ThreadPool {
</span><span class="boring"> workers: Vec&lt;Worker&gt;,
</span><span class="boring"> sender: mpsc::Sender&lt;Job&gt;,
</span><span class="boring">}
</span><span class="boring">
</span>// --snip--
type Job = Box&lt;dyn FnOnce() + Send + 'static&gt;;
impl ThreadPool {
// --snip--
<span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span><span class="boring"> pub fn new(size: usize) -&gt; ThreadPool {
</span><span class="boring"> assert!(size &gt; 0);
</span><span class="boring">
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
</span><span class="boring">
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
</span><span class="boring">
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
</span><span class="boring">
</span><span class="boring"> for id in 0..size {
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&amp;receiver)));
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> ThreadPool { workers, sender }
</span><span class="boring"> }
</span>
pub fn execute&lt;F&gt;(&amp;self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
<span class="boring">
</span><span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">impl Worker {
</span><span class="boring"> fn new(id: usize, receiver: Arc&lt;Mutex&lt;mpsc::Receiver&lt;Job&gt;&gt;&gt;) -&gt; Worker {
</span><span class="boring"> let thread = thread::spawn(|| {
</span><span class="boring"> receiver;
</span><span class="boring"> });
</span><span class="boring">
</span><span class="boring"> Worker { id, thread }
</span><span class="boring"> }
</span><span class="boring">}</span></code></pre>
<p><span class="caption">示例 21-19: 为存放每一个闭包的 <code>Box</code> 创建一个 <code>Job</code> 类型别名,接着在信道中发出任务</span></p>
<p>在使用 <code>execute</code> 得到的闭包新建 <code>Job</code> 实例之后,将这些任务从信道的发送端发出。这里调用 <code>send</code> 上的 <code>unwrap</code>,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在它们就会一直执行。使用 <code>unwrap</code> 是因为我们知道失败不可能发生,不过编译器不知道这些。</p>
<p>不过到此事情还没有结束!在 <code>Worker</code> 中,传递给 <code>thread::spawn</code> 的闭包仍然还只是 <strong>引用</strong> 了信道的接收端。相反我们需要闭包一直循环,向信道的接收端请求任务,并在得到任务时执行它们。如示例 21-20 对 <code>Worker::new</code> 做出修改:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust noplayground"><span class="boring">use std::{
</span><span class="boring"> sync::{Arc, Mutex, mpsc},
</span><span class="boring"> thread,
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">pub struct ThreadPool {
</span><span class="boring"> workers: Vec&lt;Worker&gt;,
</span><span class="boring"> sender: mpsc::Sender&lt;Job&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">type Job = Box&lt;dyn FnOnce() + Send + 'static&gt;;
</span><span class="boring">
</span><span class="boring">impl ThreadPool {
</span><span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span><span class="boring"> pub fn new(size: usize) -&gt; ThreadPool {
</span><span class="boring"> assert!(size &gt; 0);
</span><span class="boring">
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
</span><span class="boring">
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
</span><span class="boring">
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
</span><span class="boring">
</span><span class="boring"> for id in 0..size {
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&amp;receiver)));
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> ThreadPool { workers, sender }
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> let job = Box::new(f);
</span><span class="boring">
</span><span class="boring"> self.sender.send(job).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span><span class="boring">
</span>// --snip--
impl Worker {
fn new(id: usize, receiver: Arc&lt;Mutex&lt;mpsc::Receiver&lt;Job&gt;&gt;&gt;) -&gt; Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}</code></pre>
<p><span class="caption">示例 21-20: 在 worker 线程中接收并执行任务</span></p>
<p>这里,首先在 <code>receiver</code> 上调用了 <code>lock</code> 来获取互斥器,接着 <code>unwrap</code> 在出现任何错误时 panic。如果互斥器处于一种叫做 <strong>被污染</strong><em>poisoned</em>)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 <code>unwrap</code> 使其 panic 是正确的行为。请随意将 <code>unwrap</code> 改为包含有意义错误信息的 <code>expect</code></p>
<p>如果锁定了互斥器,接着调用 <code>recv</code> 从信道中接收 <code>Job</code>。最后的 <code>unwrap</code> 也绕过了一些错误,这可能发生于持有信道发送端的线程停止的情况,类似于如果接收端关闭时 <code>send</code> 方法如何返回 <code>Err</code> 一样。</p>
<p>调用 <code>recv</code> 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。<code>Mutex&lt;T&gt;</code> 确保一次只有一个 <code>Worker</code> 线程尝试请求任务。</p>
<p>现在线程池处于可以运行的状态了!执行 <code>cargo run</code> 并发起一些请求:</p>
<pre><code class="language-console">$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--&gt; src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec&lt;Worker&gt;,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--&gt; src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle&lt;()&gt;,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
</code></pre>
<p>成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当服务端收到大量请求时系统也不会负担过重。如果请求 <em>/sleep</em>server 也能够通过另外一个线程处理其他请求。</p>
<blockquote>
<p>注意如果同时在多个浏览器窗口打开 <em>/sleep</em>,它们可能会彼此间隔地加载 5 秒,因为一些浏览器出于缓存的原因会顺序执行相同请求的多个实例。这些限制并不是由于我们的 web 服务端造成的。</p>
</blockquote>
<p>在学习了第十七章和第十八章的 <code>while let</code> 循环之后,你可能会好奇为何不能如此编写 worker 线程,如示例 21-21 所示:</p>
<p><span class="filename">文件名src/lib.rs</span></p>
<pre><code class="language-rust ignore not_desired_behavior"><span class="boring">use std::{
</span><span class="boring"> sync::{Arc, Mutex, mpsc},
</span><span class="boring"> thread,
</span><span class="boring">};
</span><span class="boring">
</span><span class="boring">pub struct ThreadPool {
</span><span class="boring"> workers: Vec&lt;Worker&gt;,
</span><span class="boring"> sender: mpsc::Sender&lt;Job&gt;,
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">type Job = Box&lt;dyn FnOnce() + Send + 'static&gt;;
</span><span class="boring">
</span><span class="boring">impl ThreadPool {
</span><span class="boring"> /// Create a new ThreadPool.
</span><span class="boring"> ///
</span><span class="boring"> /// The size is the number of threads in the pool.
</span><span class="boring"> ///
</span><span class="boring"> /// # Panics
</span><span class="boring"> ///
</span><span class="boring"> /// The `new` function will panic if the size is zero.
</span><span class="boring"> pub fn new(size: usize) -&gt; ThreadPool {
</span><span class="boring"> assert!(size &gt; 0);
</span><span class="boring">
</span><span class="boring"> let (sender, receiver) = mpsc::channel();
</span><span class="boring">
</span><span class="boring"> let receiver = Arc::new(Mutex::new(receiver));
</span><span class="boring">
</span><span class="boring"> let mut workers = Vec::with_capacity(size);
</span><span class="boring">
</span><span class="boring"> for id in 0..size {
</span><span class="boring"> workers.push(Worker::new(id, Arc::clone(&amp;receiver)));
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> ThreadPool { workers, sender }
</span><span class="boring"> }
</span><span class="boring">
</span><span class="boring"> pub fn execute&lt;F&gt;(&amp;self, f: F)
</span><span class="boring"> where
</span><span class="boring"> F: FnOnce() + Send + 'static,
</span><span class="boring"> {
</span><span class="boring"> let job = Box::new(f);
</span><span class="boring">
</span><span class="boring"> self.sender.send(job).unwrap();
</span><span class="boring"> }
</span><span class="boring">}
</span><span class="boring">
</span><span class="boring">struct Worker {
</span><span class="boring"> id: usize,
</span><span class="boring"> thread: thread::JoinHandle&lt;()&gt;,
</span><span class="boring">}
</span>// --snip--
impl Worker {
fn new(id: usize, receiver: Arc&lt;Mutex&lt;mpsc::Receiver&lt;Job&gt;&gt;&gt;) -&gt; Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}</code></pre>
<p><span class="caption">示例 21-21: 一个使用 <code>while let</code><code>Worker::new</code> 替代实现</span></p>
<p>这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:<code>Mutex</code> 结构体没有公有 <code>unlock</code> 方法,因为锁的所有权依赖 <code>lock</code> 方法返回的 <code>LockResult&lt;MutexGuard&lt;T&gt;&gt;</code><code>MutexGuard&lt;T&gt;</code> 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 <code>Mutex</code> 守护的资源,不过如果没有认真的思考 <code>MutexGuard&lt;T&gt;</code> 的生命周期的话,也可能会导致比预期更久的持有锁。</p>
<p>示例 21-20 中的代码使用的 <code>let job = receiver.lock().unwrap().recv().unwrap();</code> 之所以可以工作是因为对于 <code>let</code> 来说,当 <code>let</code> 语句结束时任何表达式中等号右侧使用的临时值都会立即被丢弃。然而 <code>while let</code><code>if let</code><code>match</code>)直到相关的代码块结束都不会丢弃临时值。在示例 21-21 中,<code>job()</code> 调用期间锁一直持续,这也意味着其他的 <code>Worker</code> 实例无法接收任务。</p>
</main>
<nav class="nav-wrapper" aria-label="Page navigation">
<!-- Mobile navigation buttons -->
<a rel="prev" href="ch21-01-single-threaded.html" class="mobile-nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="ch21-03-graceful-shutdown-and-cleanup.html" class="mobile-nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
<div style="clear: both"></div>
</nav>
</div>
</div>
<nav class="nav-wide-wrapper" aria-label="Page navigation">
<a rel="prev" href="ch21-01-single-threaded.html" class="nav-chapters previous" title="Previous chapter" aria-label="Previous chapter" aria-keyshortcuts="Left">
<i class="fa fa-angle-left"></i>
</a>
<a rel="next prefetch" href="ch21-03-graceful-shutdown-and-cleanup.html" class="nav-chapters next" title="Next chapter" aria-label="Next chapter" aria-keyshortcuts="Right">
<i class="fa fa-angle-right"></i>
</a>
</nav>
</div>
<script>
window.playground_copyable = true;
</script>
<script src="elasticlunr.min.js"></script>
<script src="mark.min.js"></script>
<script src="searcher.js"></script>
<script src="clipboard.min.js"></script>
<script src="highlight.js"></script>
<script src="book.js"></script>
<!-- Custom JS scripts -->
<script src="ferris.js"></script>
</div>
</body>
</html>