@ -2,7 +2,7 @@
> [ch20-02-multithreaded.md ](https://github.com/rust-lang/book/blob/main/src/ch20-02-multithreaded.md )
> < br >
> commit 95b5e7c86d33e98eec6f73b268d106621f3d24a1
> commit 98c6225e5fb8255349ec0dc235433530ed3fb534
目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
@ -16,15 +16,18 @@
{{#rustdoc_include ../listings/ch20-web-server/listing-20-10/src/main.rs:here}}
```
< span class = "caption" >示例 20-10: 通过识别 */sleep* 并 休眠五秒来模拟慢请求< /span >
< span class = "caption" >示例 20-10: 通过休眠五秒来模拟慢请求< /span >
这段代码有些凌乱,不过对于模拟的目的来说已经足够。这里创建了第二个请求 `sleep` ,我们会识别其数据。在 `if` 块之后增加了一个 `else if` 来检查 */sleep* 请求,当接收到这个请求时,在渲染成功 HTML 页面之前会先休眠五秒。
从 `if` 切换到 `match` 后现在有三个分支了。我们需要显式匹配一个 slice 的 `request_line` 以匹配字符串字面值的模式。`match` 不会像相等方法那样自动引用和解引用。
第一个分支与示例 20-9 中的 `if` 代码块相同。第二个分支匹配一个 */sleep* 请求。当接收到这个请求时, server 在渲染成功 HTML 页面之前会先休眠五秒。第三个分支与示例 20-9 中的 `else` 代码块相同。
现在就可以真切的看出我们的 server 有多么的原始:真实的库将会以更简洁的方式处理多请求识别问题!
使用 `cargo run` 启动 server, 并接着打开两个浏览器窗口: 一个请求 *http://127.0.0.1:7878/* 而另一个请求 *http://127.0.0.1:7878/sleep* 。如果像之前一样多次请求 */* ,会发现响应的比较快速。不过如果请求 */sleep* 之后在请求 */* ,就会看到 */* 会等待直到 `sleep` 休眠完五秒之后才出现。
这里有多种办法来改变我们的 web server 使其避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。
有多种技术可以用来避免所有请求都排在慢请求之后;我们将要实现的一个便是线程池。
### 使用线程池改善吞吐量
@ -34,15 +37,15 @@
不同于分配无限的线程,线程池中将有固定数量的等待线程。当新进请求时,将请求发送到线程池中做处理。线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向队列索取另一个请求。通过这种设计,则可以并发处理 `N` 个请求,其中 `N` 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 fork/join 模型和单线程异步 I/O 模型 。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试用 Rust 实现他们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。
这个设计仅仅是多种改善 web server 吞吐量的方法之一。其他可供探索的方法有 **fork/join 模型** ( *fork/join model*)、**单线程异步 I/O 模型**( *single-threaded async I/O model*)或者**多线程异步 I/O 模型**( *multi-threaded async I/O model*) 。如果你对这个主题感兴趣,则可以阅读更多关于其他解决方案的内容并尝试实现他们;对于一个像 Rust 这样的底层语言,所有这些方法都是可能的。
在开始之前,让我们讨论一下线程池应用看起来怎样。当尝试设计代码时,首先编写客户端接口确实有助于指导代码设计。以期望的调用方式来构建 API 代码的结构,接着在这个结构之内实现功能,而不是先实现功能再设计公有 API。
类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发( compiler-driven development) 。我们将编写调用所期望的函数的代码, 接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。
类似于第十二章项目中使用的测试驱动开发。这里将要使用编译器驱动开发( compiler-driven development) 。我们将编写调用所期望的函数的代码, 接着观察编译器错误告诉我们接下来需要修改什么使得代码可以工作。不过在开始之前,我们将探索不会作为起点的技术。
#### 为每一个请求分配线程的代码结构
#### 为每一个请求分配线程
首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个开始 。示例 20-11 展示了 `main` 的改变,它在 `for` 循环中为每一个流分配了一个新线程进行处理:
首先,让我们探索一下为每一个连接都创建一个线程的代码看起来如何。这并不是最终方案,因为正如之前讲到的它会潜在的分配无限的线程,不过这是一个可用的多线程 server 的起点。接着我们会增加线程池作为改进,这样比较两个方案将会更容易 。示例 20-11 展示了 `main` 的改变,它在 `for` 循环中为每一个流分配了一个新线程进行处理:
< span class = "filename" > 文件名: src/main.rs< / span >
@ -54,7 +57,7 @@
正如第十六章讲到的,`thread::spawn` 会创建一个新线程并在其中运行闭包中的代码。如果运行这段代码并在在浏览器中加载 */sleep* ,接着在另两个浏览器标签页中加载 */* ,确实会发现 */* 请求不必等待 */sleep* 结束。不过正如之前提到的,这最终会使系统崩溃因为我们无限制的创建新线程。
#### 为有限数量的线程创建一个类似的接口
#### 创建有限数量的线程
我们期望线程池以类似且熟悉的方式工作,以便从线程切换到线程池并不会对使用该 API 的代码做出较大的修改。示例 20-12 展示我们希望用来替换 `thread::spawn` 的 `ThreadPool` 结构体的假想接口:
@ -68,7 +71,7 @@
这里使用 `ThreadPool::new` 来创建一个新的线程池,它有一个可配置的线程数的参数,在这里是四。这样在 `for` 循环中,`pool.execute` 有着类似 `thread::spawn` 的接口,它获取一个线程池运行于每一个流的闭包。`pool.execute` 需要实现为获取闭包并传递给池中的线程运行。这段代码还不能编译,不过通过尝试,编译器会指导我们如何修复它。
#### 采用编译器驱动构建 `ThreadPool` 结构体
#### 采用编译器驱动构建 `ThreadPool`
继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误:
@ -86,12 +89,12 @@
{{#rustdoc_include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/src/lib.rs}}
```
接着创建一个新目录,*src/bin*,并将二进制 crate 根文件从 *src/main.rs* 移动到 *src/bin/main.rs* 。这使得库 crate 成为 *hello* 目录的主要 crate; 不过仍然可以使用 `cargo run` 运行 *src/bin/main.rs* 二进制文件。移动了 *main.rs* 文件之后,修改 *src/bin/main.rs* 文件开头加入如下代码来引入库 crate 并将 `ThreadPool` 引入作用域:
接着编辑 *main.rs* 文件通过在 *src/main.rs* 的开头增加如下代码将 `ThreadPool` 从库 crate 引入作用域:
< span class = "filename" > 文件名: src/bin/main.rs< / span >
```rust,ignore
{{#rustdoc_include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/src/bin/ main.rs:here}}
{{#rustdoc_include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/src/main.rs:here}}
```
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
@ -116,9 +119,9 @@
{{#include ../listings/ch20-web-server/no-listing-02-impl-threadpool-new/output.txt}}
```
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 [“ 为有限数量的线程创建一个类似的接口”](#creating-a-similar-interface-for -a-finite-number-of-threads) 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 [“ 创建有限数量的线程”](#creating -a-finite-number-of-threads) 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
我们会在 `ThreadPool` 上定义 `execute` 函数来获取一个闭包参数。回忆第十三章的 [“使用带有泛型和 `Fn` trait 的闭包”][storing-closures-using-generic-parameters-and-the- fn-traits] 部分,闭包作为参数时可以使用三个不同的 trait: `Fn`、`FnMut` 和 `FnOnce` 。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 `thread::spawn` ,所以我们可以观察 `thread::spawn` 的签名在其参数中使用了何种 bound。查看文档会发现:
我们会在 `ThreadPool` 上定义 `execute` 函数来获取一个闭包参数。回忆第十三章的 [“将被捕获的值移出闭包和 `Fn` trait”][ fn-traits] 部分,闭包作为参数时可以使用三个不同的 trait: `Fn`、`FnMut` 和 `FnOnce` 。我们需要决定这里应该使用哪种闭包。最终需要实现的类似于标准库的 `thread::spawn` ,所以我们可以观察 `thread::spawn` 的签名在其参数中使用了何种 bound。查看文档会发现:
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
@ -162,17 +165,17 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
< span class = "caption" > 示例 20-13: 实现 `ThreadPool::new` 在 `size` 为零时 panic</ span >
这里用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何!
这里也 用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分来提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何!
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中示例 12-9 中 `Config:: new` 那样 返回一个 `Result` ,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的 `new` 版本来感受一下两者的区别 :
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中示例 12-9 中 `Config:: build` 那样将 `new` 更改为 `build` 并 返回一个 `Result` ,不过在这里我们选择创建一个没有任何线程的线程池应该是不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的名为 `build` 的函数来对比一下 `new` 函数 :
```rust,ignore
pub fn new (size: usize) -> Result< ThreadPool , PoolCreationError > {
pub fn build (size: usize) -> Result< ThreadPool , PoolCreationError > {
```
#### 分配空间以储存线程
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回之前将他们储存在 `ThreadPool` 结构体中。不过如何 “储存” 一个线程?让我们再看看 `thread::spawn` 的签名:
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回结构体 之前将他们储存在 `ThreadPool` 结构体中。不过如何 “储存” 一个线程?让我们再看看 `thread::spawn` 的签名:
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
@ -196,19 +199,19 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
这里将 `std::thread` 引入库 crate 的作用域,因为使用了 `thread::JoinHandle` 作为 `ThreadPool` 中 vector 元素的类型。
在得到了有效的数量之后,`ThreadPool` 新建一个存放 `size` 个元素的 vector。本书还未使用过 `with_capacity` ,它 与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。
在得到了有效的数量之后,`ThreadPool` 新建一个存放 `size` 个元素的 vector。`with_capacity` 函数 与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。
如果再次运行 `cargo check` , 会看到一些警告,不过应该可以编译 成功。
如果再次运行 `cargo check` , 它应该会 成功。
#### `Worker` 结构体负责从 `ThreadPool` 中将代码传递给线程
示例 20-14 的 `for` 循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。标准库提供的创建线程的方法,`thread::spawn`,它期望获取一些一旦创建线程就应该执行的代码。然而,我们希望开始线程并使其等待稍后传递的代码。标准库的线程实现并没有包含这么做的方法;我们必须自己实现。
我们将要实现的行为是创建线程并稍后发送代码,这会在 `ThreadPool` 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 `Worker` : 这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。
我们将要实现的行为是创建线程并稍后发送代码,这会在 `ThreadPool` 和线程间引入一个新数据类型来管理这种新行为。这个数据结构称为 *Worker* , 这是一个池实现中的常见概念。想象一下在餐馆厨房工作的员工:员工等待来自客户的订单,他们负责接受这些订单并完成它们。
不同于在线程池中储存一个 `JoinHandle<()>` 实例的 vector, 我们会储存 `Worker` 结构体的实例。每一个 `Worker` 会储存一个单独的 `JoinHandle<()>` 实例。接着会在 `Worker` 上实现一个方法,它会获取需要允许代码的闭包并将其发送给已经运行的线程执行。我们还会赋予每一个 worker `id` ,这样就可以在日志和调试中区别线程池中的不同 worker。
首先,让我们做出如此创建 `ThreadPool` 时所需的修改 。在通过如下方式设置完 `Worker` 之后,我们会实现向线程发送闭包的代码:
如下是创建 `ThreadPool` 时会发生的新过程 。在通过如下方式设置完 `Worker` 之后,我们会实现向线程发送闭包的代码:
1. 定义 `Worker` 结构体存放 `id` 和 `JoinHandle<()>`
2. 修改 `ThreadPool` 存放一个 `Worker` 实例的 vector
@ -229,7 +232,9 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
这里将 `ThreadPool` 中字段名从 `threads` 改为 `workers` ,因为它现在储存 `Worker` 而不是 `JoinHandle<()>` 。使用 `for` 循环中的计数作为 `Worker::new` 的参数,并将每一个新建的 `Worker` 储存在叫做 `workers` 的 vector 中。
`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/bin/main.rs* 中的 server) 并不需要知道关于 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>` 。
`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/main.rs* 中的 server) 并不需要知道关于 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>` 。
> 注意:如果操作系统因为没有足够的系统资源而无法创建线程时,`thread::spawn` 会 panic。这会导致整个 server panic, 即使一些线程可能创建成功了。出于简单的考虑, 这个行为是可行的, 不过在一个生产级别的线程池实现中, 你可能会希望使用 [`std::thread::Builder`][builder] 和其 [`spawn`][builder-spawn] 方法来返回一个 `Result` 。
这段代码能够编译并用指定给 `ThreadPool::new` 的参数创建储存了一系列的 `Worker` 实例,不过 ** 仍然** 没有处理 `execute` 中得到的闭包。让我们聊聊接下来怎么做。
@ -241,13 +246,13 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
在第十六章,我们学习了 ** 信道** —— 一个沟通两个线程的简单手段 —— 对于这个例子来说则是绝佳的。这里信道将充当任务队列的作用,`execute` 将通过 `ThreadPool` 向其中线程正在寻找工作的 `Worker` 实例发送任务。如下是这个计划:
1. `ThreadPool` 会创建一个信道并充当发送端 。
2. 每个 `Worker` 将会充当信道的接收端 。
1. `ThreadPool` 会创建一个信道并充当发送者 。
2. 每个 `Worker` 将会充当接收者 。
3. 新建一个 `Job` 结构体来存放用于向信道中发送的闭包。
4. `execute` 方法会在信道发送端 发出期望执行的任务。
5. 在线程中,`Worker` 会遍历信道的接收端 并执行任何接收到的任务。
4. `execute` 方法会在发送者 发出期望执行的任务。
5. 在线程中,`Worker` 会遍历接收者 并执行任何接收到的任务。
让我们以在 `ThreadPool::new` 中创建信道并让 `ThreadPool` 实例充当发送端 开始,如示例 20-16 所示。`Job` 是将在信道中发出的类型,目前它是一个没有任何内容的结构体:
让我们以在 `ThreadPool::new` 中创建信道并让 `ThreadPool` 实例充当发送者 开始,如示例 20-16 所示。`Job` 是将在信道中发出的类型,目前它是一个没有任何内容的结构体:
< span class = "filename" > 文件名: src/lib.rs< / span >
@ -255,11 +260,11 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
{{#rustdoc_include ../listings/ch20-web-server/listing-20-16/src/lib.rs:here}}
```
< span class = "caption" > 示例 20-16: 修改 `ThreadPool` 来储存一个发送 `Job` 实例的信道发送端 </ span >
< span class = "caption" > 示例 20-16: 修改 `ThreadPool` 来储存一个传输 `Job` 实例的发送者 </ span >
在 `ThreadPool::new` 中,新建了一个信道,并接着让线程池在接收端等待。这段代码能够编译,不过仍有警告 。
在 `ThreadPool::new` 中,新建了一个信道,并接着让线程池在接收端等待。这段代码能够成功 编译。
让我们尝试在线程池创建每个 worker 时将信道的接收端传递给他们。须知我们希望在 worker 所分配的线程中使用信道的接收端 ,所以将在闭包中引用 `receiver` 参数。示例 20-17 中展示的代码还不能编译:
让我们尝试在线程池创建每个 worker 时将接收者传递给他们。须知我们希望在 worker 所分配的线程中使用接收者 ,所以将在闭包中引用 `receiver` 参数。示例 20-17 中展示的代码还不能编译:
< span class = "filename" > 文件名: src/lib.rs< / span >
@ -269,7 +274,7 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
< span class = "caption" > 示例 20-17: 将信道的接收端传递给 worker< / span >
这是一些小而直观的修改:将信道的接收端 传递进了 `Worker::new` ,并接着在闭包中使用它。
这是一些小而直观的修改:将接收者 传递进了 `Worker::new` ,并接着在闭包中使用它。
如果尝试 check 代码,会得到这个错误:
@ -277,7 +282,7 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
{{#include ../listings/ch20-web-server/listing-20-17/output.txt}}
```
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的, 回忆第十六章: Rust 所提供的信道实现是多 ** 生产者**,单 ** 消费者** 的。这意味着不能简单的克隆信道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 `receiver` ,在线程间分发任务 。
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的, 回忆第十六章: Rust 所提供的信道实现是多 ** 生产者**,单 ** 消费者** 的。这意味着不能简单的克隆信道的消费端来解决问题。我们也不希望将一个消息向多个消费者发送多次;我们希望有一个消息列表和多个 worker 这样每个消息就只会处理一次 。
另外,从信道队列中取出任务涉及到修改 `receiver` ,所以这些线程需要一个能安全的共享和修改 `receiver` 的方式,否则可能导致竞争状态(参考第十六章)。
@ -289,9 +294,9 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
{{#rustdoc_include ../listings/ch20-web-server/listing-20-18/src/lib.rs:here}}
```
< span class = "caption" > 示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享信道的接收端 </ span >
< span class = "caption" > 示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享接收者 </ span >
在 `ThreadPool::new` 中,将信道的接收端 放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker, 克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收端 的所有权了。
在 `ThreadPool::new` 中,将接收者 放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker, 克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收者 的所有权了。
通过这些修改,代码可以编译了!我们做到了!
@ -350,10 +355,9 @@ warning: field is never read: `thread`
49 | thread: thread::JoinHandle< ()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: 3 warnings emitted
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/ main `
Running `target/debug/ hello `
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
@ -387,5 +391,6 @@ Worker 2 got a job; executing.
[creating-type-synonyms-with-type-aliases]:
ch19-04-advanced-types.html#类型别名用来创建类型同义词
[integer-types]: ch03-02-data-types.html#整型
[storing-closures-using-generic-parameters-and-the-fn-traits]:
ch13-01-closures.html#使用带有泛型和-fn-trait-的闭包
[fn-traits]: ch13-01-closures.html#将被捕获的值移出闭包和-fn-trait
[builder]: https://doc.rust-lang.org/std/thread/struct.Builder.html
[builder-spawn]: https://doc.rust-lang.org/std/thread/struct.Builder.html#method.spawn