diff --git a/src/SUMMARY.md b/src/SUMMARY.md index 2bb99c4..493cbcc 100644 --- a/src/SUMMARY.md +++ b/src/SUMMARY.md @@ -115,11 +115,8 @@ - [最后的项目: 构建多线程 web server](ch20-00-final-project-a-web-server.md) - [单线程 web server](ch20-01-single-threaded.md) - - [慢请求如何影响吞吐率](ch20-02-slow-requests.md) - - [设计线程池接口](ch20-03-designing-the-interface.md) - - [创建线程池并储存线程](ch20-04-storing-threads.md) - - [使用通道向线程发送请求](ch20-05-sending-requests-via-channels.md) - - [Graceful Shutdown 与清理](ch20-06-graceful-shutdown-and-cleanup.md) + - [将单线程 server 变为多线程 server](ch20-02-multithreaded.md) + - [优雅停机与清理](ch20-03-graceful-shutdown-and-cleanup.md) - [附录](appendix-00.md) - [A - 关键字](appendix-01-keywords.md) diff --git a/src/ch19-04-advanced-types.md b/src/ch19-04-advanced-types.md index dc16ce2..1e60d5e 100644 --- a/src/ch19-04-advanced-types.md +++ b/src/ch19-04-advanced-types.md @@ -18,7 +18,7 @@ newtype 模式可以用于一些其他我们还未讨论的功能,包括静态 新类型也可以隐藏其内部的泛型类型。例如,可以提供一个封装了 `HashMap` 的 `People` 类型,用来储存人名以及相应的 ID。使用 `People` 的代码只需与提供的公有 API 交互即可,比如向 `People` 集合增加名字字符串的方法,这样这些代码就无需知道在内部我们将一个 `i32` ID 赋予了这个名字了。newtype 模式是一种实现第十七章 “封装隐藏了实现细节” 部分所讨论的隐藏实现细节的封装的轻量级方法。 -### 类型别名用来创建同义类型 +### 类型别名用来创建类型同义词 连同 newtype 模式,Rust 还提供了声明 **类型别名**(*type alias*)的能力,使用 `type` 关键字来给予现有类型另一个名字。例如,可以像这样创建 `i32` 的别名 `Kilometers`: diff --git a/src/ch20-02-multithreaded.md b/src/ch20-02-multithreaded.md new file mode 100644 index 0000000..4598efe --- /dev/null +++ b/src/ch20-02-multithreaded.md @@ -0,0 +1,921 @@ +## 将单线程 server 变为多线程 server + +> [ch20-02-multithreaded.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-02-multithreaded.md) +>
+> commit 1f0136399ba2f5540ecc301fab04bd36492e5554 + + + + +目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。 + +### 在当前 server 实现中模拟慢请求 + +让我们看看一个慢请求如何影响当前 server 实现中的其他请求。示例 20-10 通过模拟慢响应实现了 `/sleep` 请求处理,它会使 server 在响应之前休眠五秒。 + +文件名: src/main.rs + +```rust +use std::thread; +use std::time::Duration; +# use std::io::prelude::*; +# use std::net::TcpStream; +# use std::fs::File; +// --snip-- + +fn handle_connection(mut stream: TcpStream) { +# let mut buffer = [0; 512]; +# stream.read(&mut buffer).unwrap(); + // --snip-- + + let get = b"GET / HTTP/1.1\r\n"; + let sleep = b"GET /sleep HTTP/1.1\r\n"; + + let (status_line, filename) = if buffer.starts_with(get) { + ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") + } else if buffer.starts_with(sleep) { + thread::sleep(Duration::from_secs(5)); + ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") + } else { + ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") + }; + + // --snip-- +} +``` + +示例 20-10: 通过识别 `/sleep` 并休眠五秒来模拟慢请求 + +这段代码有些凌乱,不过对于模拟的目的来说已经足够!这里创建了第二个请求 `sleep`,我们会识别其数据。在 `if` 块之后增加了一个 `else if` 来检查 `/sleep` 请求,当接收到这个请求时,在渲染成功 HTML 页面之前会先休眠五秒。 + +现在就可以真切的看出我们的 server 有多么的原始;真实的库将会以更简洁的方式处理多请求识别问题。 + +使用 `cargo run` 启动 server,并接着打开两个浏览器窗口:一个请求 `http://localhost:7878/` 而另一个请求 `http://localhost:7878/sleep`。如果像之前一样多次请求 `/`,会发现响应的比较快速。不过如果请求`/sleep` 之后在请求 `/`,就会看到 `/` 会等待直到 `sleep` 休眠完五秒之后才出现。 + +这里有多种办法来改变我们的 web server 使其避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。 + +### 使用线程池改善吞吐量 + + + + +**线程池**(*thread pool*)是一组预先分配的等待或准备处理任务的线程。当程序收到一个新任务,线程池中的一个线程会被分配任务,这个线程会离开并处理任务。其余的线程则可用于处理在第一个线程处理任务的同时处理其他接收到的任务。当第一个线程处理完任务时,它会返回空闲线程池中等待处理新任务。线程池允许我们并发处理连接,增加 server 的吞吐量。 + +我们会将池中线程限制为较少的数量,以防拒绝服务(Denial of Service, DoS)攻击;如果程序为每一个接收的请求都新建一个线程,某人向 server 发起千万级的请求请求时会耗尽服务器的资源并导致所有请求的处理都被终止。 + +不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向对队列索取另一个请求。通过这种设计,则可以并发处理 `N` 个请求,其中 `N` 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。 + +这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和单线程异步 I/O 模型。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试用 Rust 实现他们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。 + +在开始之前,让我们讨论一下线程池应用看起来怎样。当尝试设计代码时,首先编写客户端接口确实有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。 + +类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发(Compiler Driven Development)。我们将编写调用所期望的函数的代码,接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。 + +#### 为每一个请求分配线程的代码结构 + +首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个开始。示例 20-11 展示了 `main` 的改变,它在 `for` 循环中为每一个流分配了一个新线程进行处理: + +文件名: src/main.rs + +```rust,no_run +# use std::thread; +# use std::io::prelude::*; +# use std::net::TcpListener; +# use std::net::TcpStream; +# +fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + + for stream in listener.incoming() { + let stream = stream.unwrap(); + + thread::spawn(|| { + handle_connection(stream); + }); + } +} +# fn handle_connection(mut stream: TcpStream) {} +``` + +示例 20-11: 为每一个流新建一个线程 + +正如第十六章讲到的,`thread::spawn` 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 `/sleep`,接着在另两个浏览器标签页中加载 `/`,确实会发现 `/` 请求不必等待 `/sleep` 结束。不过正如之前提到的,这最终会使系统崩溃因为我们无限制的创建新线程。 + +#### 为有限数量的线程创建一个类似的接口 + +我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出较大的修改。示例 20-12 展示我们希望用来替换 `thread::spawn` 的 `ThreadPool` 结构体的假想接口: + +文件名: src/main.rs + +```rust,no_run +# use std::thread; +# use std::io::prelude::*; +# use std::net::TcpListener; +# use std::net::TcpStream; +# struct ThreadPool; +# impl ThreadPool { +# fn new(size: u32) -> ThreadPool { ThreadPool } +# fn execute(&self, f: F) +# where F: FnOnce() + Send + 'static {} +# } +# +fn main() { + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); + + for stream in listener.incoming() { + let stream = stream.unwrap(); + + pool.execute(|| { + handle_connection(stream); + }); + } +} +# fn handle_connection(mut stream: TcpStream) {} +``` + +示例 20-12: 假想的 `ThreadPool` 接口 + +这里使用 `ThreadPool::new` 来创建一个新的线程池,它有一个可配置的线程数的参数,在这里是四。这样在 `for` 循环中,`pool.execute` 有着类似 `thread::spawn` 的接口,它获取一个线程池运行于每一个流的闭包。`pool.execute` 需要实现为获取闭包并传递给池中的线程运行。这段代码还不能编译,不过通过尝试编译器会指导我们如何修复它。 + + + + +#### 采用编译器驱动构建 `ThreadPool` 结构体 + +继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误: + +```text +$ cargo check + Compiling hello v0.1.0 (file:///projects/hello) +error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool` + --> src\main.rs:10:16 + | +10 | let pool = ThreadPool::new(4); + | ^^^^^^^^^^^^^^^ Use of undeclared type or module + `ThreadPool` + +error: aborting due to previous error +``` + +好的,这告诉我们需要一个 `ThreadPool` 类型或模块,所以我们将构建一个。`ThreadPool` 的实现会与 web server 的特定工作相独立,所以让我们从 `hello` crate 切换到存放 `ThreadPool` 实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库,而不仅仅是处理网络请求。 + +创建 *src/lib.rs* 文件,它包含了目前可用的最简单的 `ThreadPool` 定义: + +文件名: src/lib.rs + +```rust +pub struct ThreadPool; +``` + +接着创建一个新目录,*src/bin*,并将二进制 crate 根文件从 *src/main.rs* 移动到 *src/bin/main.rs*。这使得库 crate 成为 *hello* 目录的主要 crate;不过仍然可以使用 `cargo run` 运行 *src/bin/main.rs* 二进制文件。移动了 *main.rs* 文件之后,修改 *src/bin/main.rs* 文件开头加入如下代码来引入库 crate 并将 `ThreadPool` 引入作用域: + +文件名: src/bin/main.rs + +```rust,ignore +extern crate hello; +use hello::ThreadPool; +``` + +这仍然不能工作,再次尝试运行来得到下一个需要解决的错误: + +```text +$ cargo check + Compiling hello v0.1.0 (file:///projects/hello) +error[E0599]: no function or associated item named `new` found for type +`hello::ThreadPool` in the current scope + --> src/bin/main.rs:13:16 + | +13 | let pool = ThreadPool::new(4); + | ^^^^^^^^^^^^^^^ function or associated item not found in + `hello::ThreadPool` +``` + +好的,这告诉我们下一步是为 `ThreadPool` 创建一个叫做 `new` 的关联函数。我们还知道 `new` 需要有一个参数可以接受 `4`,而且 `new` 应该返回 `ThreadPool` 实例。让我们实现拥有此特征的最小化 `new` 函数: + +文件夹: src/lib.rs + +```rust +pub struct ThreadPool; + +impl ThreadPool { + pub fn new(size: usize) -> ThreadPool { + ThreadPool + } +} +``` + +这里选择 `usize` 作为 `size` 参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 `usize` 类型的原因,如第三章 “整数类型” 部分所讲。 + +再次编译检查这段代码: + +```text +$ cargo check + Compiling hello v0.1.0 (file:///projects/hello) +warning: unused variable: `size` + --> src/lib.rs:4:16 + | +4 | pub fn new(size: usize) -> ThreadPool { + | ^^^^ + | + = note: #[warn(unused_variables)] on by default + = note: to avoid this warning, consider using `_size` instead + +error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope + --> src/bin/main.rs:18:14 + | +18 | pool.execute(|| { + | ^^^^^^^ +``` + + + + +现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 “为有限数量的线程创建一个类似的接口” 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。 + +我们会在 `ThreadPool` 上定义 `execute` 函数来获取一个闭包参数。回忆第十三章的 “使用带有泛型和 `Fn` trait 的闭包” 部分,闭包作为参数时可以使用三个不同的 trait:`Fn`、`FnMut` 和 `FnOnce`。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 `thread::spawn`,所以我们可以观察 `thread::spawn` 的签名在其参数中使用了何种 bound。查看文档会发现: + +```rust,ignore +pub fn spawn(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static +``` + +`F` 是这里我们关心的参数;`T` 与返回值有关所以我们并不关心。考虑到 `spawn` 使用 `FnOnce` 作为 `F` 的 trait bound,这可能也是我们需要的,因为最终会将传递给 `execute` 的参数传给 `spawn`。因为处理请求的线程只会执行闭包一次,这也进一步确认了 `FnOnce` 是我们需要的 trait,这里符合 `FnOnce` 中 `Once` 的意思。 + + + + +`F` 还有 trait bound `Send` 和生命周期绑定 `'static`,这对我们的情况也是有意义的:需要 `Send` 来将闭包从一个线程转移到另一个线程,而 `'static` 是因为并不知道线程会执行多久。让我们编写一个使用带有这些 bound 的泛型参数 `F` 的 `ThreadPool` 的 `execute` 方法: + +文件名: src/lib.rs + +```rust +# pub struct ThreadPool; +impl ThreadPool { + // --snip-- + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static + { + + } +} +``` + +`FnOnce` trait 仍然需要之后的 `()`,因为这里的 `FnOnce` 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。 + +这里再一次增加了 `execute` 方法的最小化实现,它没有做任何工作。再次进行检查: + +```text +$ cargo check + Compiling hello v0.1.0 (file:///projects/hello) +warning: unused variable: `size` + --> src/lib.rs:4:16 + | +4 | pub fn new(size: usize) -> ThreadPool { + | ^^^^ + | + = note: #[warn(unused_variables)] on by default + = note: to avoid this warning, consider using `_size` instead + +warning: unused variable: `f` + --> src/lib.rs:8:30 + | +8 | pub fn execute(&self, f: F) + | ^ + | + = note: to avoid this warning, consider using `_f` instead +``` + +现在就只有警告了!这意味着能够编译了!注意如果尝试 `cargo run` 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 `execute` 的闭包! + +> 一个你可能听说过的关于像 Haskell 和 Rust 这样有严格编译器的语言的说法是 “如果代码能够编译,它就能工作”。这是一个提醒大家的好时机,实际上这并不是普适的。我们的项目可以编译,不过它完全没有做任何工作!如果构建一个真实且功能完整的项目,则需花费大量的时间来开始编写单元测试来检查代码能否编译 **并且** 拥有期望的行为。 + +#### 在 `new` 中验证池中线程数量 + +这里仍然存在警告是因为其并没有对 `new` 和 `execute` 的参数做任何操作。让我们用期望的行为来实现这些函数。以考虑 `new` 作为开始。 + +之前选择使用无符号类型作为 `size` 参数的类型,因为线程数为负的线程池没有意义。然而,线程数为零的线程池同样没有意义,不过零是一个完全有效的 `u32` 值。让我们增加在返回 `ThreadPool` 实例之前检查 `size` 是否大于零的代码,并使用 `assert!` 宏在得到零时 panic,如示例 20-13 所示: + + +在返回 `ThreadPool` 之前检查 `size` 是否大于零,并使用 `assert!` 宏在得到零时 panic,如列表 20-13 所示: + +文件名: src/lib.rs + +```rust +# pub struct ThreadPool; +impl ThreadPool { + /// Create a new ThreadPool. + /// + /// The size is the number of threads in the pool. + /// + /// # Panics + /// + /// The `new` function will panic if the size is zero. + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + ThreadPool + } + + // --snip-- +} +``` + +示例 20-13: 实现 `ThreadPool::new` 在 `size` 为零时 panic + +趁着这个机会我们用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何! + +相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中示例 12-9 中 `Config::new` 那样返回一个 `Result`,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的 `new` 版本来感受一下两者的区别: + +```rust,ignore +fn new(size: usize) -> Result { +``` + +#### 分配空间以储存线程 + +现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回之前将他们储存在 `ThreadPool` 结构体中。 + +这引出了另一个问题:如何 “储存” 一个线程?让我们再看看 `thread::spawn` 的签名: + +```rust,ignore +pub fn spawn(f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static +``` + +`spawn` 返回 `JoinHandle`,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()`。 + +示例 20-14 中的代码可以编译,不过实际上还并没有创建任何线程。我们改变了 `ThreadPool` 的定义来存放一个 `thread::JoinHandle<()>` 的 vector 实例,使用 `size` 容量来初始化,并设置一个 `for` 循环了来运行创建线程的代码,并返回包含这些线程的 `ThreadPool` 实例: + +文件名: src/lib.rs + +```rust,ignore +use std::thread; + +pub struct ThreadPool { + threads: Vec>, +} + +impl ThreadPool { + // --snip-- + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let mut threads = Vec::with_capacity(size); + + for _ in 0..size { + // create some threads and store them in the vector + } + + ThreadPool { + threads + } + } + + // --snip-- +} +``` + +示例 20-14: 为 `ThreadPool` 创建一个 vector 来存放线程 + +这里将 `std::thread` 引入库 crate 的作用域,因为使用了 `thread::JoinHandle` 作为 `ThreadPool` 中 vector 元素的类型。 + +在得到了有效的数量之后,`ThreadPool` 新建一个存放 `size` 个元素的 vector。本书还未使用过 `with_capacity`,它与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。 + +如果再次运行 `cargo check`,会看到一些警告,不过应该可以编译成功。 + +#### `Worker` 结构体负责从 `ThreadPool` 中将代码传递给线程 + + + + + + +示例 20-14 的 `for` 循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。标准库提供的创建线程的方法,`thread::spawn`,它期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须自己实现。 + + + + +我们将要实现的行为是创建线程并稍后发送代码,这会在 `ThreadPool` 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 `Worker`:这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。 + + + + +不同于在线程池中储存一个 `JoinHandle<()>` 实例的 vector,我们会储存 `Worker` 结构体的实例。每一个 `Worker` 会储存一个单独的 `JoinHandle<()>` 实例。接着会在 +`Worker` 上实现一个方法,它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker `id`,这样就可以在日志和调试中区别线程池中的不同 worker。 + +首先,让我们做出如此创建 `ThreadPool` 时所需的修改。在通过如下方式设置完 `Worker` 之后,我们会实现向线程发送闭包的代码: + +1. 定义 `Worker` 结构体存放 `id` 和 `JoinHandle<()>` +2. 修改 `ThreadPool` 存放一个 `Worker` 实例的 vector +3. 定义 `Worker::new` 函数,它获取一个 `id` 数字并返回一个带有 `id` 和用空闭包分配的线程的 `Worker` 实例 +4. 在 `ThreadPool::new` 中,使用 `for` 循环计数生成 `id`,使用这个 `id` 新建 `Worker`,并储存进 vector 中 + +如果你渴望挑战,在查示例 20-15 中的代码之前尝试自己实现这些修改。 + +准备好了吗?示例 20-15 就是一个做出了这些修改的例子: + +文件名: src/lib.rs + +```rust +use std::thread; + +pub struct ThreadPool { + workers: Vec, +} + +impl ThreadPool { + // --snip-- + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id)); + } + + ThreadPool { + workers + } + } + // --snip-- +} + +struct Worker { + id: usize, + thread: thread::JoinHandle<()>, +} + +impl Worker { + fn new(id: usize) -> Worker { + let thread = thread::spawn(|| {}); + + Worker { + id, + thread, + } + } +} +``` + +示例 20-15: 修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程 + +这里将 `ThreadPool` 中字段名从 `threads` 改为 `workers`,因为它现在储存 `Worker` 而不是 `JoinHandle<()>`。使用 `for` 循环中的计数作为 `Worker::new` 的参数,并将每一个新建的 `Worker` 储存在叫做 `workers` 的 vector 中。 + +`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/bin/main.rs* 中的 server)并不需要知道关于 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>`。 + +这段代码能够编译并用指定给 `ThreadPool::new` 的参数创建储存了一系列的 `Worker` 实例,不过 **仍然** 没有处理 `execute` 中得到的闭包。让我们聊聊接下来怎么做。 + +#### 使用通道向线程发送请求 + +下一个需要解决的问题是传递给 `thread::spawn` 的闭包完全没有做任何工作。目前,我们在 `execute` 方法中获得期望执行的闭包,不过在创建 `ThreadPool` 的过程中创建每一个 `Worker` 时需要向 `thread::spawn` 传递一个闭包。 + +我们希望刚创建的 `Worker` 结构体能够从 `ThreadPool` 的队列中获取需要执行的代码,并发送到线程中执行他们。 + +在第十六章,我们学习了 **通道** —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的。这里通道将充当任务队列的作用,`execute` 将通过 `ThreadPool` 向其中线程正在寻找工作的 `Worker` 实例发送任务。如下是这个计划: + +1. `ThreadPool` 会创建一个通道并充当发送端。 +2. 每个 `Worker` 将会充当通道的接收端。 +3. 新建一个 `Job` 结构体来存放用于向通道中发送的闭包。 +4. `execute` 方法会在通道发送端发出期望执行的任务。 +5. 在线程中,`Worker` 会遍历通道的接收端并执行任何接收到的任务。 + +让我们以在 `ThreadPool::new` 中创建通道并让 `ThreadPool` 实例充当发送端开始,如示例 20-16 所示。`Job` 是将在通道中发出的类型;目前它是一个没有任何内容的结构体: + +文件名: src/lib.rs + +```rust +# use std::thread; +// --snip-- +use std::sync::mpsc; + +pub struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +struct Job; + +impl ThreadPool { + // --snip-- + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id)); + } + + ThreadPool { + workers, + sender, + } + } + // --snip-- +} +# +# struct Worker { +# id: usize, +# thread: thread::JoinHandle<()>, +# } +# +# impl Worker { +# fn new(id: usize) -> Worker { +# let thread = thread::spawn(|| {}); +# +# Worker { +# id, +# thread, +# } +# } +# } +``` + +示例 20-16: 修改 `ThreadPool` 来储存一个发送 `Job` 实例的通道发送端 + +在 `ThreadPool::new` 中,新建了一个通道,并接着让线程池在接收端等待。这段代码能够编译,不过仍有警告。 + +让我们尝试在线程池创建每个 worker 时将通道的接收端传递给他们。须知我们希望在 worker 所分配的线程中使用通道的接收端,所以将在闭包中引用 `receiver` 参数。示例 20-17 中展示的代码还不能编译: + +文件名: src/lib.rs + +```rust,ignore +impl ThreadPool { + // --snip-- + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, receiver)); + } + + ThreadPool { + workers, + sender, + } + } + // --snip-- +} + +// --snip-- + +impl Worker { + fn new(id: usize, receiver: mpsc::Receiver) -> Worker { + let thread = thread::spawn(|| { + receiver; + }); + + Worker { + id, + thread, + } + } +} +``` + +示例 20-17: 将通道的接收端传递给 worker + +这是一些小而直观的修改:将通道的接收端传递进了 `Worker::new`,并接着在闭包中使用它。 + +如果尝试 check 代码,会得到这个错误: + +```text +$ cargo check + Compiling hello v0.1.0 (file:///projects/hello) +error[E0382]: use of moved value: `receiver` + --> src/lib.rs:27:42 + | +27 | workers.push(Worker::new(id, receiver)); + | ^^^^^^^^ value moved here in + previous iteration of loop + | + = note: move occurs because `receiver` has type + `std::sync::mpsc::Receiver`, which does not implement the `Copy` trait +``` + +这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的,回忆第十六章:Rust 所提供的通道实现是多 **生产者**,单 **消费者** 的。这意味着不能简单的克隆通道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 `receiver`,在线程间分发任务。 + + + + +另外,从通道队列中取出任务涉及到修改 `receiver`,所以这些线程需要一个能安全的共享和修改 `receiver` 的方式,否则可能导致竞争状态(参考第十六章)。 + +回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 `Arc>`。`Arc` 使得多个 worker 拥有接收端,而 `Mutex` 则确保一次只有一个 worker 能从接收端得到任务。示例 20-18 展示了所需的修改: + +文件名: src/lib.rs + +```rust +# use std::thread; +# use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; + +// --snip-- + +# pub struct ThreadPool { +# workers: Vec, +# sender: mpsc::Sender, +# } +# struct Job; +# +impl ThreadPool { + // --snip-- + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, Arc::clone(&receiver))); + } + + ThreadPool { + workers, + sender, + } + } + + // --snip-- +} + +# struct Worker { +# id: usize, +# thread: thread::JoinHandle<()>, +# } +# +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + // --snip-- +# let thread = thread::spawn(|| { +# receiver; +# }); +# +# Worker { +# id, +# thread, +# } + } +} +``` + +示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享通道的接收端 + +在 `ThreadPool::new` 中,将通道的接收端放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker,克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收端的所有权了。 + +通过这些修改,代码可以编译了!我们做到了! + +#### 实现 `execute` 方法 + +最后让我们实现 `ThreadPool` 上的 `execute` 方法。同时也要修改 `Job` 结构体:它将不再是结构体,`Job` 将是一个有着 `execute` 接收到的闭包类型的 trait 对象的类型别名。第十九章 “类型别名用来创建类型同义词” 部分提到过,类型别名允许将长的类型变短。观察示例 20-19: + +文件名: src/lib.rs + +```rust +// --snip-- +# pub struct ThreadPool { +# workers: Vec, +# sender: mpsc::Sender, +# } +# use std::sync::mpsc; +# struct Worker {} + +type Job = Box; + +impl ThreadPool { + // --snip-- + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static + { + let job = Box::new(f); + + self.sender.send(job).unwrap(); + } +} + +// --snip-- +``` + +示例 20-19: 为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在通道中发出任务 + +在使用 `execute` 得到的闭包新建 `Job` 实例之后,将这些任务从通道的发送端发出。这里调用 `send` 上的 `unwrap`,因为发送可能会失败,这可能发生于例如停止了所有线程执行的情况,这意味着接收端停止接收新消息了。不过目前我们无法停止线程执行;只要线程池存在他们就会一直执行。使用 `unwrap` 是因为我们知道失败不可能发生,即便编译器不这么认为。 + +不过到此事情还没有结束!在 worker 中,传递给 `thread::spawn` 的闭包仍然还只是 **引用** 了通道的接收端。相反我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们。如示例 20-20 对 `Worker::new` 做出修改: + +文件名: src/lib.rs + +```rust,ignore +// --snip-- + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + loop { + let job = receiver.lock().unwrap().recv().unwrap(); + + println!("Worker {} got a job; executing.", id); + + (*job)(); + } + }); + + Worker { + id, + thread, + } + } +} +``` + +示例 20-20: 在 worker 线程中接收并执行任务 + +这里,首先在 `receiver` 上调用了 `lock` 来获取互斥器,接着 `unwrap` 在出现任何错误时 panic。如果互斥器处于一种叫做 **被污染**(*poisoned*)的状态时获取锁可能会失败,这可能发生于其他线程在持有锁时 panic 了且没有释放锁。在这种情况下,调用 `unwrap` 使其 panic 是正确的行为。请随意将 `unwrap` 改为包含有意义错误信息的 `expect`。 + +如果锁定了互斥器,接着调用 `recv` 从通道中接收 `Job`。最后的 `unwrap` 也绕过了一些错误,这可能发生于持有通道发送端的线程停止的情况,类似于如果接收端关闭时 `send` 方法如何返回 `Err` 一样。 + +调用 `recv` 会 **阻塞** 当前线程,所以如果还没有任务,其会等待直到有可用的任务。`Mutex` 确保一次只有一个 `Worker` 线程尝试请求任务。 + +理论上这段代码应该能够编译。不幸的是,Rust 编译器仍不够完美,会给出如下错误: + +```text +error[E0161]: cannot move a value of type std::ops::FnOnce() + +std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be +statically determined + --> src/lib.rs:63:17 + | +63 | (*job)(); + | ^^^^^^ +``` + +这个错误非常的神秘,因为这个问题本身就很神秘。为了调用储存在 `Box` (这正是 `Job` 别名的类型)中的 `FnOnce` 闭包,该闭包需要能将自己移动 **出** `Box`,因为当调用这个闭包时,它获取 `self` 的所有权。通常来说,将值移动出 `Box` 是不被允许的,因为 Rust 不知道 `Box` 中的值将会有多大;回忆第十五章能够正常使用 `Box` 是因为我们将未知大小的值储存进 `Box` 从而得到已知大小的值。 + +第十七章曾见过,示例 17-15 中有使用了 `self: Box` 语法的方法,它允许方法获取储存在 `Box` 中的 `Self` 值的所有权。这正是我们希望做的,然而不幸的是 Rust 不允许我们这么做:Rust 当闭包被调用时行为的那部分并没有使用 `self: Box` 实现。所以这里 Rust 也不知道它可以使用 `self: Box` 来获取闭包的所有权并将闭包移动出 `Box`。 + +Rust 仍在努力改进提升编译器的过程中,不过将来示例 20-20 中的代码应该能够正常工作。有很多像你一样的人正在修复这个以及其他问题!当你结束了本书的阅读,我们希望看到你也成为他们中的一员。 + +不过目前让我们通过一个小技巧来绕过这个问题。可以显式的告诉 Rust 在这里我们可以使用 `self: Box` 来获取 `Box` 中值的所有权,而一旦获取了闭包的所有权就可以调用它了。这涉及到定义一个新 trait,它带有一个在签名中使用 `self: Box` 的方法 `call_box`,为任何实现了 `FnOnce()` 的类型定义这个 trait,修改类型别名来使用这个新 trait,并修改 `Worker` 使用 `call_box` 方法。这些修改如示例 20-21 所示: + +文件名: src/lib.rs + +```rust,ignore +trait FnBox { + fn call_box(self: Box); +} + +impl FnBox for F { + fn call_box(self: Box) { + (*self)() + } +} + +type Job = Box; + +// --snip-- + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + loop { + let job = receiver.lock().unwrap().recv().unwrap(); + + println!("Worker {} got a job; executing.", id); + + job.call_box(); + } + }); + + Worker { + id, + thread, + } + } +} +``` + +示例 20-21: 新增一个 trait `FnBox` 来绕过当前 `Box` 的限制 + +首先,新建了一个叫做 `FnBox` 的 trait。这个 trait 有一个方法 `call_box`,它类似于其他 `Fn*` trait 中的 `call` 方法,除了它获取 `self: Box` 以便获取 `self` 的所有权并将值从 `Box` 中移动出来。 + +接下来,为任何实现了 `FnOnce()` trait 的类型 `F` 实现 `FnBox` trait。这实际上意味着任何 `FnOnce()` 闭包都可以使用 `call_box` 方法。`call_box` 的实现使用 `(*self)()` 将闭包移动出 `Box` 并调用此闭包。 + +现在我们需要 `Job` 类型别名是任何实现了新 trait `FnBox` 的 `Box`。这允许我们在得到 `Job` 值时使用 `Worker` 中的 `call_box`。为任何 `FnOnce()` 闭包都实现了 `FnBox` trait 意味着无需对实际在通道中发出的值做任何修改。 + +最后,对于 `Worker::new` 的线程中所运行的闭包,调用 `call_box` 而不是直接执行闭包。现在 Rust 就能够理解我们的行为是正确的了。 + +这是非常狡猾且复杂的手段。无需过分担心他们并不是非常有道理;总有一天,这一切将是毫无必要的。 + +通过这个技巧,线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求: + +```text +$ cargo run + Compiling hello v0.1.0 (file:///projects/hello) +warning: field is never used: `workers` + --> src/lib.rs:7:5 + | +7 | workers: Vec, + | ^^^^^^^^^^^^^^^^^^^^ + | + = note: #[warn(dead_code)] on by default + +warning: field is never used: `id` + --> src/lib.rs:61:5 + | +61 | id: usize, + | ^^^^^^^^^ + | + = note: #[warn(dead_code)] on by default + +warning: field is never used: `thread` + --> src/lib.rs:62:5 + | +62 | thread: thread::JoinHandle<()>, + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + | + = note: #[warn(dead_code)] on by default + + Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs + Running `target/debug/hello` + Worker 0 got a job; executing. +Worker 2 got a job; executing. +Worker 1 got a job; executing. +Worker 3 got a job; executing. +Worker 0 got a job; executing. +Worker 2 got a job; executing. +Worker 1 got a job; executing. +Worker 3 got a job; executing. +Worker 0 got a job; executing. +Worker 2 got a job; executing. +``` + +成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当 server 收到大量请求时系统也不会负担过重。如果请求 `/sleep`,server 也能够通过另外一个线程处理其他请求。 + +在学习了第十八章的 `while let` 循环之后,你可能会好奇为何不能如此编写 worker 线程: + +文件名: src/lib.rs + +```rust,ignore +// --snip-- + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + while let Ok(job) = receiver.lock().unwrap().recv() { + println!("Worker {} got a job; executing.", id); + + job.call_box(); + } + }); + + Worker { + id, + thread, + } + } +} +``` + +示例 20-22: 一个使用 `while let` 的 `Worker::new` 替代实现 + +这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。如此的原因有些微妙:`Mutex` 结构体没有公有 `unlock` 方法,因为锁的所有权依赖 `lock` 方法返回的 `LockResult>` 中 `MutexGuard` 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 `Mutex` 守护的资源,不过如果没有认真的思考 `MutexGuard` 的生命周期的话,也可能会导致比预期更久的持有锁。因为 `while` 表达式中的值在整个块一直处于作用域中,`job.call_box()` 调用的过程中其仍然持有锁,这意味着其他 worker 不能接收任务。 + +相反通过使用 `loop` 并在循环块之内而不是之外获取锁和任务,`lock` 方法返回的 `MutexGuard` 在 `let job` 语句结束之后立刻就被丢弃了。这确保了 `recv` 调用过程中持有锁,而在 `job.call_box()` 调用前锁就被释放了,这就允许并发处理多个请求了。 \ No newline at end of file diff --git a/src/ch20-06-graceful-shutdown-and-cleanup.md b/src/ch20-03-graceful-shutdown-and-cleanup.md similarity index 61% rename from src/ch20-06-graceful-shutdown-and-cleanup.md rename to src/ch20-03-graceful-shutdown-and-cleanup.md index 2e5cb3c..c82f159 100644 --- a/src/ch20-06-graceful-shutdown-and-cleanup.md +++ b/src/ch20-03-graceful-shutdown-and-cleanup.md @@ -1,18 +1,20 @@ -## Graceful Shutdown 与清理 +## 优雅停机与清理 -> [ch20-06-graceful-shutdown-and-cleanup.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-06-graceful-shutdown-and-cleanup.md) +> [ch20-03-graceful-shutdown-and-cleanup.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-03-graceful-shutdown-and-cleanup.md) >
-> commit 2e269ff82193fd65df8a87c06561d74b51ac02f7 +> commit 1f0136399ba2f5540ecc301fab04bd36492e5554 -列表 20-21 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说存在一些字段并没有直接被使用,这提醒了我们并没有清理任何内容。当使用 ctrl-C 终止主线程,所有其他线程也会立刻停止,即便他们正在处理一个请求。 +示例 20-21 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说 `workers`、`id` 和 `thread` 字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 ctrl-C 终止主线程时,所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。 -现在我们要为 `ThreadPool` 实现 `Drop` trait 对线程池中的每一个线程调用 `join`,这样这些线程将会执行完他们的请求。接着会为 `ThreadPool` 实现一个方法来告诉线程他们应该停止接收新请求并结束。为了实践这些代码,修改 server 在 graceful Shutdown 之前只接受两个请求。 +现在我们要为 `ThreadPool` 实现 `Drop` trait 对线程池中的每一个线程调用 `join`,这样这些线程将会执行完他们的请求。接着会为 `ThreadPool` 实现一个告诉线程他们应该停止接收新请求并结束的方式。为了实践这些代码,修改 server 在优雅停机(graceful shutdown)之前只接受两个请求。 -现在开始为线程池实现 `Drop`。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。列表 20-22 展示了 `Drop` 实现的第一次尝试;这些代码还不能够编译: +### 为 `ThreadPool` 实现 `Drop` Trait + +现在开始为线程池实现 `Drop`。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。示例 20-23 展示了 `Drop` 实现的第一次尝试;这些代码还不能够编译: 文件名: src/lib.rs -```rust +```rust,ignore impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { @@ -24,13 +26,13 @@ impl Drop for ThreadPool { } ``` -列表 20-22:当线程池离开作用域时 join 每个线程 +示例 20-23: 当线程池离开作用域时 join 每个线程 -这里遍历线程池中的每个 `workers`,这里使用了 `&mut` 因为 `self` 本身是一个可变引用而且也需要能够修改 `worker`。当特定 worker 关闭时会打印出说明信息,接着在对应 worker 上调用 `join`。如果 `join` 失败了,通过 `unwrap` 将错误变为 panic 从而无法进行 graceful Shutdown。 +这里首先遍历线程池中的每个 `workers`。这里使用了 `&mut` 因为 `self` 本身是一个可变引用而且也需要能够修改 `worker`。对于每一个线程,会打印出说明信息表明此特定 worker 正在关闭,接着在 worker 线程上调用 `join`。如果 `join` 调用失败,通过 `unwrap` 使得 panic 并进行不优雅的关闭。 如下是尝试编译代码时得到的错误: -``` +```text error[E0507]: cannot move out of borrowed content --> src/lib.rs:65:13 | @@ -38,9 +40,9 @@ error[E0507]: cannot move out of borrowed content | ^^^^^^ cannot move out of borrowed content ``` -因为我们只有每个 `worker` 的可变借用,并不能调用 `join`:`join` 获取其参数的所有权。为了解决这个问题,需要一个方法将 `thread` 移动出拥有其所有权的 `Worker` 实例以便 `join` 可以消费这个线程。列表 17-15 中我们曾见过这么做的方法:如果 `Worker` 存放的是 `Option`,就可以在 `Option` 上调用 `take` 方法将值从 `Some` 成员中移动出来而对 `None` 成员不做处理。换句话说,正在运行的 `Worker` 的 `thread` 将是 `Some` 成员值,而当需要清理 worker 时,将 `Some` 替换为 `None`,这样 worker 就没有可以运行的线程了。 +这告诉我们并不能调用 `join`,因为只有每一个 `worker` 的可变借用,而 `join` 获取其参数的所有权。为了解决这个问题,需要一个方法将 `thread` 移动出拥有其所有权的 `Worker` 实例以便 `join` 可以消费这个线程。示例 17-15 中我们曾见过这么做的方法:如果 `Worker` 存放的是 `Option`,就可以在 `Option` 上调用 `take` 方法将值从 `Some` 成员中移动出来而对 `None` 成员不做处理。换句话说,正在运行的 `Worker` 的 `thread` 将是 `Some` 成员值,而当需要清理 worker 时,将 `Some` 替换为 `None`,这样 worker 就没有可以运行的线程了。 -所以我们知道了需要更新 `Worker` 的定义为如下: +为此需要更新 `Worker` 的定义为如下: 文件名: src/lib.rs @@ -52,10 +54,10 @@ struct Worker { } ``` -现在依靠编译器来找出其他需要修改的地方。我们会得到两个错误: +现在依靠编译器来找出其他需要修改的地方。check 代码会得到两个错误: -``` -error: no method named `join` found for type +```text +error[E0599]: no method named `join` found for type `std::option::Option>` in the current scope --> src/lib.rs:65:27 | @@ -63,24 +65,27 @@ error: no method named `join` found for type | ^^^^ error[E0308]: mismatched types - --> src/lib.rs:89:21 + --> src/lib.rs:89:13 | 89 | thread, - | ^^^^^^ expected enum `std::option::Option`, found - struct `std::thread::JoinHandle` + | ^^^^^^ + | | + | expected enum `std::option::Option`, found struct + `std::thread::JoinHandle` + | help: try using a variant of the expected type: `Some(thread)` | = note: expected type `std::option::Option>` found type `std::thread::JoinHandle<_>` ``` -第二个错误指向 `Worker::new` 结尾的代码;当新建 `Worker` 时需要将 `thread` 值封装进 `Some`: +让我们修复第二个错误,它指向 `Worker::new` 结尾的代码;当新建 `Worker` 时需要将 `thread` 值封装进 `Some`。做出如下改变以修复问题: 文件名: src/lib.rs -```rust +```rust,ignore impl Worker { fn new(id: usize, receiver: Arc>>) -> Worker { - // ...snip... + // --snip-- Worker { id, @@ -90,11 +95,11 @@ impl Worker { } ``` -第一个错误有关 `Drop` 实现,而且我们提到过要调用 `Option` 上的 `take` 将 `thread` 移动出 `worker`。如下是代码: +第一个错误位于 `Drop` 实现中。之前提到过要调用 `Option` 上的 `take` 将 `thread` 移动出 `worker`。如下改变会修复问题: 文件名: src/lib.rs -```rust +```rust,ignore impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { @@ -108,11 +113,13 @@ impl Drop for ThreadPool { } ``` -如第十七章我们见过的,`Option` 上的 `take` 方法会取出 `Some` 而留下 `None`。使用 `if let` 解构 `Some` 并得到线程,接着在线程上调用 `join`。如果 worker 的线程已然是 `None`,就知道此时这个 worker 已经清理了其线程且无需做任何操作。 +如第十七章我们见过的,`Option` 上的 `take` 方法会取出 `Some` 而留下 `None`。使用 `if let` 解构 `Some` 并得到线程,接着在线程上调用 `join`。如果 worker 的线程已然是 `None`,就知道此时这个 worker 已经清理了其线程所以无需做任何操作。 + +### 向线程发送信号使其停止接收任务 有了这些修改,代码就能编译且没有任何警告。不过也有坏消息,这些代码还不能以我们期望的方式运行。问题的关键在于 `Worker` 中分配的线程所运行的闭包中的逻辑:调用 `join` 并不会关闭线程,因为他们一直 `loop` 来寻找任务。如果采用这个实现来尝试丢弃 `ThreadPool` ,则主线程会永远阻塞在等待第一个线程结束上。 -为了修复这个问题,修改线程既监听是否有 `Job` 运行也要监听应该停止监听并退出无限循环的信号。所以通道将发送这个枚举的两个成员之一而不再直接使用 `Job` 实例: +为了修复这个问题,修改线程既监听是否有 `Job` 运行也要监听一个应该停止监听并退出无限循环的信号。所以通道将发送这个枚举的两个成员之一而不是 `Job` 实例: 文件名: src/lib.rs @@ -126,27 +133,20 @@ enum Message { `Message` 枚举要么是存放了线程需要运行的 `Job` 的 `NewJob` 成员,要么是会导致线程退出循环并终止的 `Terminate` 成员。 -同时需要修改通道来使用 `Message` 类型值而不是 `Job`,如列表 20-23 所示: +同时需要修改通道来使用 `Message` 类型值而不是 `Job`,如示例 20-24 所示: 文件名: src/lib.rs -```rust +```rust,ignore pub struct ThreadPool { workers: Vec, sender: mpsc::Sender, } -// ...snip... +// --snip-- impl ThreadPool { - // ...snip... - pub fn new(size: usize) -> ThreadPool { - assert!(size > 0); - - let (sender, receiver) = mpsc::channel(); - - // ...snip... - } + // --snip-- pub fn execute(&self, f: F) where @@ -158,7 +158,7 @@ impl ThreadPool { } } -// ...snip... +// --snip-- impl Worker { fn new(id: usize, receiver: Arc>>) -> @@ -191,11 +191,11 @@ impl Worker { } ``` -列表 20-23:收发 `Message` 值并在 `Worker` 收到 `Message::Terminate` 时退出循环 +示例 20-24: 收发 `Message` 值并在 `Worker` 收到 `Message::Terminate` 时退出循环 -需要将 `ThreadPool` 定义、创建通道的 `ThreadPool::new` 和 `Worker::new` 签名中的 `Job` 改为 `Message`。`ThreadPool` 的 `execute` 方法需要发送封装进 `Message::NewJob` 成员的任务,当获取到 `NewJob` 时会处理任务而收到 `Terminate` 成员时则会退出循环。 +为了适用 `Message` 枚举需要将两个地方的 `Job` 修改为 `Message`:`ThreadPool` 的定义和 `Worker::new` 的签名。`ThreadPool` 的 `execute` 方法需要发送封装进 `Message::NewJob` 成员的任务。然后,在 `Worker::new` 中当从通道接收 `Message` 时,当获取到 `NewJob`成员会处理任务而收到 `Terminate` 成员则会退出循环。 -通过这些修改,代码再次能够编译并按照期望的行为运行。不过还是会得到一个警告,因为并没有在任何消息中使用 `Terminate` 成员。如列表 20-14 所示那样修改 `Drop` 实现: +通过这些修改,代码再次能够编译并继续按照期望的行为运行。不过还是会得到一个警告,因为并没有创建任何 `Terminate` 成员的消息。如示例 20-25 所示修改 `Drop` 实现来修复此问题: 文件名: src/lib.rs @@ -221,51 +221,44 @@ impl Drop for ThreadPool { } ``` -列表 20-24:在对每个 worker 线程调用 `join` 之前向 worker 发送 `Message::Terminate` +示例 20-25:在对每个 worker 线程调用 `join` 之前向 worker 发送 `Message::Terminate` 现在遍历了 worker 两次,一次向每个 worker 发送一个 `Terminate` 消息,一个调用每个 worker 线程上的 `join`。如果尝试在同一循环中发送消息并立即 join 线程,则无法保证当前迭代的 worker 是从通道收到终止消息的 worker。 -为了更好的理解为什么需要两个分开的循环,想象一下只有两个 worker 的场景。如果在一个循环中遍历每个 worker,在第一次迭代中 `worker` 是第一个 worker,我们向通道发出终止消息并对第一个 worker 线程调用 `join`。如果第一个 worker 当时正忙于处理请求,则第二个 worker 会从通道接收这个终止消息并结束。而我们在等待第一个 worker 结束,不过它永远也不会结束因为第二个线程取走了终止消息。现在我们就阻塞在了等待第一个 worker 结束,而无法发出第二条终止消息。死锁! +为了更好的理解为什么需要两个分开的循环,想象一下只有两个 worker 的场景。如果在一个单独的循环中遍历每个 worker,在第一次迭代中向通道发出终止消息并对第一个 worker 线程调用 `join`。我们会一直等待第一个 worker 结束,不过它永远也不会结束因为第二个线程接收了终止消息。死锁! -为了避免此情况,首先从通道中取出所有的 `Terminate` 消息,接着 join 所有的线程。因为每个 worker 一旦收到终止消息即会停止从通道接收消息,我们就可以确保如果发送同 worker 数相同的终止消息,在 join 之前每个线程都会收到一个终止消息。 +为了避免此情况,首先在一个循环中向通道发出所有的 `Terminate` 消息,接着在另一个循环中 join 所有的线程。每个 worker 一旦收到终止消息即会停止从通道接收消息,意味着可以确保如果发送同 worker 数相同的终止消息,在 join 之前每个线程都会收到一个终止消息。 -为了实践这些代码,如列表 20-25 所示修改 `main` 在 graceful Shutdown server 之前只接受两个请求: +为了实践这些代码,如示例 20-26 所示修改 `main` 在优雅停机 server 之前只接受两个请求: 文件名: src/bin/main.rs ```rust,ignore fn main() { - let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); - let mut counter = 0; - - for stream in listener.incoming() { - if counter == 2 { - println!("Shutting down."); - break; - } - - counter += 1; - + for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } + + println!("Shutting down."); } ``` -列表 20-25:在处理两个请求之后通过退出循环来停止 server +示例 20-26: 在处理两个请求之后通过退出循环来停止 server -只处理两次请求并不是生产环境的 web server 所期望的行为,不过这可以让我们看清 graceful shutdown 和清理起作用了,因为不用再通过 ctrl-C 停止 server 了。 +你不会希望真实世界的 web server 只处理两次请求就停机了,这只是为了展示优雅停机和清理处于正常工作状态。 -这里还增加了一个 `counter` 变量在每次收到 TCP 流时递增。如果计数到达 2,会停止处理请求并退出 `for` 循环。`ThreadPool` 会在 `main` 的结尾离开作用域,而且还会看到 `drop` 实现的运行。 +`take` 方法定义于 `Iterator` trait,这里限制循环最多头 2 次。`ThreadPool` 会在 `main` 的结尾离开作用域,而且还会看到 `drop` 实现的运行。 使用 `cargo run` 启动 server,并发起三个请求。第三个请求应该会失败,而终端的输出应该看起来像这样: -``` +```text $ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs @@ -285,15 +278,15 @@ Shutting down worker 2 Shutting down worker 3 ``` -当然,你可能会看到不同顺序的输出。可以从信息中看到服务是如何运行的: worker 0 和 worker 3 获取了头两个请求,接着在第三个请求时,我们停止接收连接。当 `ThreadPool` 在 `main` 的结尾离开作用域时,其 `Drop` 实现开始工作,线程池通知所有线程终止。每个 worker 在收到终止消息时会打印出一个信息,接着线程池调用 `join` 来终止每一个 worker 线程。 +可能会出现不同顺序的 worker 和信息输出。可以从信息中看到服务是如何运行的: worker 0 和 worker 3 获取了头两个请求,接着在第三个请求时,我们停止接收连接。当 `ThreadPool` 在 `main` 的结尾离开作用域时,其 `Drop` 实现开始工作,线程池通知所有线程终止。每个 worker 在收到终止消息时会打印出一个信息,接着线程池调用 `join` 来终止每一个 worker 线程。 这个特定的运行过程中一个有趣的地方在于:注意我们向通道中发出终止消息,而在任何线程收到消息之前,就尝试 join worker 0 了。worker 0 还没有收到终止消息,所以主线程阻塞直到 worker 0 结束。与此同时,每一个线程都收到了终止消息。一旦 worker 0 结束,主线程就等待其他 worker 结束,此时他们都已经收到终止消息并能够停止了。 -恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web server。我们能对 server 执行 graceful shutdown,它会清理线程池中的所有线程。如下是完整的代码参考: +恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web server。我们能对 server 执行优雅停机,它会清理线程池中的所有线程。如下是完整的代码参考: -Filename: src/bin/main.rs +文件名: src/bin/main.rs -```rust +```rust,ignore extern crate hello; use hello::ThreadPool; @@ -305,25 +298,18 @@ use std::thread; use std::time::Duration; fn main() { - let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); + let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); - let mut counter = 0; - - for stream in listener.incoming() { - if counter == 2 { - println!("Shutting down."); - break; - } - - counter += 1; - + for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } + + println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { @@ -354,7 +340,7 @@ fn handle_connection(mut stream: TcpStream) { } ``` -Filename: src/lib.rs +文件名: src/lib.rs ```rust use std::thread; @@ -402,7 +388,7 @@ impl ThreadPool { let mut workers = Vec::with_capacity(size); for id in 0..size { - workers.push(Worker::new(id, receiver.clone())); + workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { @@ -487,4 +473,4 @@ impl Worker { ## 总结 -好极了!你结束了本书的学习!由衷感谢你与我们一道加入这次 Rust 之旅。现在你已经准备好出发并实现自己的 Rust 项目或帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。 \ No newline at end of file +好极了!你结束了本书的学习!由衷感谢你与我们一道加入这次 Rust 之旅。现在你已经准备好出发并实现自己的 Rust 项目并帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。 \ No newline at end of file