update ch20-06

pull/67/head
KaiserY 8 years ago
parent a81e21fcc4
commit 2877ecf00d

@ -117,4 +117,13 @@
- [设计线程池接口](ch20-03-designing-the-interface.md)
- [创建线程池并储存线程](ch20-04-storing-threads.md)
- [使用通道向线程发送请求](ch20-05-sending-requests-via-channels.md)
- [Graceful Shutdown 与清理](ch20-06-graceful-shutdown-and-cleanup.md)
- [Graceful Shutdown 与清理](ch20-06-graceful-shutdown-and-cleanup.md)
- [附录](appendix-00.md)
- [A - 关键字](appendix-01-keywords.md)
- [B - 运算符](appendix-02-operators.md)
- [C - 可导出的 trait]()
- [D - Rust 开发版]()
- [E - 宏]()
- [F - 本书翻译]()
- [G - 最新功能](appendix-07-newest-features.md)

@ -36,4 +36,150 @@ impl ThreadPool {
趁着这个机会我们用文档注释为 `ThreadPool` 增加了一些文档。注意这里遵循了良好的文档实践并增加了一个部分提示函数会 panic 的情况,正如第十四章所讨论的。尝试运行 `cargo doc --open` 并点击 `ThreadPool` 结构体来查看生成的 `new` 的文档看起来如何!
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中列表 12-9 中 `Config::new` 那样返回一个 `Result`
相比像这里使用 `assert!` 宏,也可以让 `new` 像之前 I/O 项目中列表 12-9 中 `Config::new` 那样返回一个 `Result`,不过在这里我们选择创建一个没有任何线程的线程池应该是要给不可恢复的错误。如果你想做的更好,尝试编写一个采用如下签名的 `new` 版本来感受一下两者的区别:
```rust
fn new(size: u32) -> Result<ThreadPool, PoolCreationError> {
```
### 在线程池中储存线程
现在有了一个有效的线程池线程数,就可以实际创建这些线程并在返回之前将他们储存在 `ThreadPool` 结构体中。
这引出了另一个问题:如何“储存”一个线程?让我们再看看 `thread::spawn` 的签名:
```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static
```
`spawn` 返回 `JoinHandle<T>`,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()`
这还不能编译,不过考虑一下列表 20-14 所示的代码。我们改变了 `ThreadPool` 的定义来存放一个 `thread::JoinHandle<()>` 的 vector 实例,使用 `size` 容量来初始化,并设置一个 `for` 循环了来运行创建线程的代码,并返回包含这些线程的 `ThreadPool` 实例:
<span class="filename">文件名: src/lib.rs</span>
```rust,ignore
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// ...snip...
pub fn new(size: u32) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
// ...snip...
}
```
<span class="caption">列表 20-14`ThreadPool` 创建一个 vector 来存放线程</span>
这里将 `std::thread` 引入库 crate 的作用域,因为使用了 `thread::JoinHandle` 作为 `ThreadPool` 中 vector 元素的类型。
在得到了有效的数量之后,就可以新建一个存放 `size` 个元素的 vector。本书还未使用过 `with_capacity`;它与 `Vec::new` 做了同样的工作,不过有一个重要的区别:它为 vector 预先分配空间。因为已经知道了 vector 中需要 `size` 个元素,预先进行分配比仅仅 `Vec::new` 要稍微有效率一些,因为 `Vec::new` 随着插入元素而重新改变大小。因为一开始就用所需的确定大小来创建 vector为其增减元素时不会改变底层 vector 的大小。
如果代码能够工作就应是如此效果,不过他们还不能工作!如果检查他们,会得到一个错误:
```
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0308]: mismatched types
--> src\main.rs:70:46
|
70 | let mut threads = Vec::with_capacity(size);
| ^^^^ expected usize, found u32
error: aborting due to previous error
```
`size``u32`,不过 `Vec::with_capacity` 需要一个 `usize`。这里有两个选择:可以改变函数签名,或者可以将 `u32` 转换为 `usize`。如果你还记得定义 `new` 时,并没有仔细考虑有意义的数值类型,只是随便选了一个。现在来进行一些思考吧。考虑到 `size` 是 vector 的长度,`usize` 就很有道理了。甚至他们的名字都很类似!改变 `new` 的签名,这会使列表 20-14 的代码能够编译:
```rust
fn new(size: usize) -> ThreadPool {
```
如果再次运行 `cargo check`,会得到一些警告,不过应该能成功编译。
列表 20-14 的 `for` 循环中留下了一个关于创建线程的注释。如何实际创建线程呢?这是一个难题。这些线程应该做什么呢?这里并不知道他们需要做什么,因为 `execute` 方法获取闭包并传递给了线程池。
让我们稍微重构一下:不再储存一个 `JoinHandle<()>` 实例的 vector将创建一下新的结构体来代表 *worker* 的概念。worker 会接收 `execute` 方法,并会处理实际的闭包调用。另外储存固定 `size` 数量的还不知道将要执行什么闭包的 `Worker` 实例,也可以为每一个 worker 设置一个 `id`,这样就可以在日志和调试中区别线程池中的不同 worker。
让我们做出如下修改:
1. 定义 `Worker` 结构体存放 `id``JoinHandle<()>`
2. 修改 `ThreadPool` 存放一个 `Worker` 实例的 vector
3. 定义 `Worker::new` 函数,它获取一个 `id` 数字并返回一个带有 `id` 和用空闭包分配的线程的 `Worker` 实例,之后会修复这些
4. 在 `ThreadPool::new` 中,使用 `for` 循环来计数生成 `id`,使用这个 `id` 新建 `Worker`,并储存进 vector 中
如果你渴望挑战,在查看列表 20-15 中的代码之前尝试自己实现这些修改。
准备好了吗?列表 20-15 就是一个做出了这些修改的例子:
<span class="filename">文件名: src/lib.rs</span>
```rust
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// ...snip...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers
}
}
// ...snip...
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
```
<span class="caption">列表 20-15修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程</span>
这里选择将 `ThreadPool` 中字段名从 `threads` 改为 `workers`,因为我们改变了存放内容为 `Worker` 而不是 `JoinHandle<()>`。使用 `for` 循环中的计数作为 `Worker::new` 的参数,并将每一个新建的 `Worker` 储存在叫做 `workers` 的 vector 中。
`Worker` 结构体和其 `new` 函数是私有的,因为外部代码(比如 *src/bin/main.rs* 中的 server并不需要 `ThreadPool` 中使用 `Worker` 结构体的实现细节。`Worker::new` 函数使用 `id` 参数并储存了使用一个空闭包创建的 `JoinHandle<()>`
这段代码能够编译并用指定给 `ThreadPool::new` 的参数创建储存了一系列的 `Worker` 实例,不过**仍然**没有处理 `execute` 中得到的闭包。让我们聊聊接下来怎么做。

@ -0,0 +1,394 @@
## 使用通道向线程发送请求
> [ch20-05-sending-requests-via-channels.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-05-sending-requests-via-channels.md)
> <br>
> commit 2e269ff82193fd65df8a87c06561d74b51ac02f7
下一个需要解决的问题是(线程中的)闭包完全没有做任何工作。我们一直在绕过获取 `execute` 方法中实际期望执行的闭包的问题,不过看起来在创建 `ThreadPool` 时就需要知道实际的闭包。
不过考虑一下真正需要做的:我们希望刚创建的 `Worker` 结构体能够从 `ThreadPool` 的队列中获取任务,并在线程中执行他们。
在第十六章中,我们学习了通道。通道是一个沟通两个线程的良好手段,对于这个例子来说则是绝佳的。通道将充当任务队列的作用,`execute` 将通过 `ThreadPool` 向其中线程正在寻找工作的 `Worker` 实例发送任务。如下是这个计划:
1. `ThreadPool` 会创建一个通道并充当发送端。
2. 每个 `Worker` 将会充当通道的接收端。
3. 新建一个 `Job` 结构体来存放用于向通道中发送的闭包。
4. `ThreadPool``execute` 方法会在发送端发出期望执行的任务。
5. 在线程中,`Worker` 会遍历通道的接收端并执行任何接收到的任务。
让我们以在 `ThreadPool::new` 中创建通道并让 `ThreadPool` 实例充当发送端开始,如列表 20-16 所示。`Job` 是将在通道中发出的类型;目前它是一个没有任何内容的结构体:
<span class="filename">文件名: src/lib.rs</span>
```rust
# use std::thread;
// ...snip...
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// ...snip...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers,
sender,
}
}
// ...snip...
}
#
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
# impl Worker {
# fn new(id: usize) -> Worker {
# let thread = thread::spawn(|| {});
#
# Worker {
# id,
# thread,
# }
# }
# }
```
<span class="caption">列表 20-16修改 `ThreadPool` 来储存一个发送 `Job` 实例的通道发送端</span>
`ThreadPool::new` 中,新建了一个通道,并接着让线程池在接收端等待。这段代码能够编译,不过仍有警告。
在线程池创建每个 worker 时将通道的接收端传递给他们。须知我们希望在 worker 所分配的线程中使用通道的接收端,所以将在闭包中引用 `receiver` 参数。列表 20-17 中展示的代码还不能编译:
<span class="filename">文件名: src/lib.rs</span>
```rust,ignore
impl ThreadPool {
// ...snip...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// ...snip...
}
// ...snip...
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
```
<span class="caption">列表 20-17将通道的接收端传递给 worker</span>
这是一些小而直观的修改:将通道的接收端传递进了 `Worker::new`,并接着在闭包中使用他们。
如果尝试检查代码,会得到这个错误:
```
$ cargo check
Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here in
previous iteration of loop
|
= note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
```
这些代码还不能编译的原因如上因为它尝试将 `receiver` 传递给多个 `Worker` 实例。回忆第十六章Rust 所提供的通道实现是多**生产者**,单**消费者**的,所以不能简单的克隆通道的消费端来解决问题。即便可以我们也不希望克隆消费端;在所有的 worker 中共享单一 `receiver` 才是我们希望的在线程间分发任务的机制。
另外,从通道队列中取出任务涉及到修改 `receiver`,所以这些线程需要一个能安全的共享和修改 `receiver` 的方式。如果修改不是线程安全的,则可能遇到竞争状态,例如两个线程因同时在队列中取出相同的任务并执行了相同的工作。
所以回忆一下第十六章讨论的线程安全智能指针,为了在多个线程间共享所有权并允许线程修改其值,需要使用 `Arc<Mutex<T>>`。`Arc` 使得多个 worker 拥有接收端,而 `Mutex` 则确保一次只有一个 worker 能从接收端得到任务。列表 20-18 展示了所做的修改:
<span class="filename">文件名: src/lib.rs</span>
```rust
# use std::thread;
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// ...snip...
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# struct Job;
#
impl ThreadPool {
// ...snip...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
// ...snip...
}
# struct Worker {
# id: usize,
# thread: thread::JoinHandle<()>,
# }
#
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// ...snip...
# let thread = thread::spawn(|| {
# receiver;
# });
#
# Worker {
# id,
# thread,
# }
}
}
```
<span class="caption">列表 20-18使用 `Arc``Mutex` 在 worker 间共享通道的接收端</span>
`ThreadPool::new` 中,将通道的接收端放入一个 `Arc` 和一个 `Mutex` 中。对于每一个新 worker则克隆 `Arc` 来增加引用计数,如此这些 worker 就可以共享接收端的所有权了。
通过这些修改,代码可以编译了!我们做到了!
最好让我们实现 `ThreadPool` 上的 `execute` 方法。同时也要修改 `Job` 结构体:它将不再是结构体,`Job` 将是一个有着 `execute` 接收到的闭包类型的 trait 对象的类型别名。我们讨论过类型别名如何将长的类型变短,现在就这种情况!看一看列表 20-19
<span class="filename">文件名: src/lib.rs</span>
```rust
// ...snip...
# pub struct ThreadPool {
# workers: Vec<Worker>,
# sender: mpsc::Sender<Job>,
# }
# use std::sync::mpsc;
# struct Worker {}
type Job = Box<FnOnce() + Send + 'static>;
impl ThreadPool {
// ...snip...
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// ...snip...
```
<span class="caption">列表 20-19为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在通道中发出</span>
在使用 `execute` 得到的闭包新建 `Job` 实例之后,将这些任务从通道的发送端发出。这里调用 `send` 上的 `unwrap`,因为如果接收端停止接收新消息则发送可能会失败,这可能发生于我们停止了所有的执行线程。不过目前这是不可能的,因为只要线程池存在他们就会一直执行。使用 `unwrap` 是因为我们知道失败不可能发生,即便编译器不这么认为,正如第九章讨论的这是 `unwrap` 的一个恰当用法。
那我们结束了吗?不完全是!在 worker 中,传递给 `thread::spawn` 的闭包仍然还只是**引用**了通道的接收端。但是我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们。如列表 20-20 对 `Worker::new` 做出修改:
<span class="filename">文件名: src/lib.rs</span>
```rust
// ...snip...
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
(*job)();
}
});
Worker {
id,
thread,
}
}
}
```
<span class="caption">列表 20-20 在 worker 线程中接收并执行任务</span>
这里,首先在 `receiver` 上调用了 `lock` 来获取互斥器,接着 `unwrap` 在出现任何错误时 panic。如果互斥器处于一种叫做**被污染***poisoned*)的状态时获取锁肯能会失败,这可能发生于其他线程在持有锁时 panic 了并没有释放锁。如果当前线程因为这个原因不能得到所,调用 `unwrap` 使其 panic 也是正确的行为。如果你觉得有意义的话请随意将 `unwrap` 改为带有错误信息的 `expect`
如果锁定了互斥器,接着调用 `recv` 从通道中接收 `Job`。最后的 `unwrap` 也绕过了一些错误,`recv` 在通道的发送端关闭时会返回 `Err`,类似于 `send` 在接收端关闭时返回 `Err` 一样。
调用 `recv` 的代码块;也就是说,它还没有任务,这个线程会等待直到有可用的任务。`Mutex<T>` 确保一次只有一个 `Worker` 线程尝试请求任务。
理论上这段代码应该能够编译。不幸的是Rust 编译器仍不够完美,会给出如下错误:
```
error[E0161]: cannot move a value of type std::ops::FnOnce() +
std::marker::Send: the size of std::ops::FnOnce() + std::marker::Send cannot be
statically determined
--> src/lib.rs:63:17
|
63 | (*job)();
| ^^^^^^
```
这个错误非常的神秘,因为这个问题本身就很神秘。为了调用储存在 `Box<T>` (这正是 `Job` 别名的类型)中的 `FnOnce` 闭包,该闭包需要能将自己移动出 `Box<T>`,因为当调用这个闭包时,它获取 `self` 的所有权。通常来说,将值移动出 `Box<T>` 是不被允许的,因为 Rust 不知道 `Box<T>` 中的值将会有多大;回忆第十五章能够正常使用 `Box<T>` 是因为我们将未知大小的值储存进 `Box<T>` 从而得到已知大小的值。
第十七章曾见过,列表 17-15 中有使用了 `self: Box<Self>` 语法的方法,它获取了储存在 `Box<T>` 中的 `Self` 值的所有权。这正是我们希望做的,然而不幸的是 Rust 调用闭包的那部分实现并没有使用 `self: Box<Self>`。所以这里 Rust 也不知道它可以使用 `self: Box<Self>` 来获取闭包的所有权并将闭包移动出 `Box<T>`
将来列表 20-20 中的代码应该能够正常工作。Rust 仍在努力改进提升编译器。有很多像你一样的人正在修复这个以及其他问题!当你结束了本书的阅读,我们希望看到你也成为他们中的一员。
不过目前让我们绕过这个问题。所幸有一个技巧可以显式的告诉 Rust 我们处于可以获取使用 `self: Box<Self>``Box<T>` 中值的所有权的状态,而一旦获取了闭包的所有权就可以调用它了。这涉及到定义一个新 trait它带有一个在签名中使用 `self: Box<Self>` 的方法 `call_box`,为任何实现了 `FnOnce()` 的类型定义这个 trait修改类型别名来使用这个新 trait并修改 `Worker` 使用 `call_box` 方法。这些修改如列表 20-21 所示:
<span class="filename">文件名: src/lib.rs</span>
```rust
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<FnBox + Send + 'static>;
// ...snip...
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job.call_box();
}
});
Worker {
id,
thread,
}
}
}
```
<span class="caption">列表 20-21新增一个 trait `FnBox` 来绕过当前 `Box<FnOnce()>` 的限制</span>
首先,新建了一个叫做 `FnBox` 的 trait。这个 trait 有一个方法 `call_box`,它类似于其他 `Fn*` trait 中的 `call` 方法,除了它获取 `self: Box<Self>` 以便获取 `self` 的所有权并将值从 `Box<T>` 中移动出来。
现在我们希望 `Job` 类型别名是任何实现了新 trait `FnBox``Box`,而不是 `FnOnce()`。这允许我们在得到 `Job` 值时使用 `Worker` 中的 `call_box`。因为我们为任何 `FnOnce()` 闭包都实现了 `FnBox` trait无需对实际在通道中发出的值做任何修改。
最后,对于 `Worker::new` 的线程中所运行的闭包,调用 `call_box` 而不是直接执行闭包。现在 Rust 就能够理解我们的行为是正确的了。
这是非常狡猾且复杂的手段。无需过分担心他们并不是非常有道理;总有一天,这一切将是毫无必要的。
通过这些技巧,线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求:
```
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never used: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `id`
--> src/lib.rs:61:5
|
61 | id: usize,
| ^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never used: `thread`
--> src/lib.rs:62:5
|
62 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
```
成功了!现在我们有了一个可以异步执行连接的线程池!它绝不会创建超过四个线程,所以当 server 收到大量请求时系统也不会负担过重。如果请求 `/sleep`server 也能够通过另外一个线程处理其他请求。
那么这些警告怎么办呢?难道我们没有使用 `workers`、`id` 和 `thread` 字段吗?好吧,目前我们用了所有这些字段存放了一些数据,不过当设置线程池并开始执行代码在通道中向线程发送任务时,我们并没有对数据**进行**任何实际的操作。但是如果不存放这些值,他们将会离开作用域:比如,如果不将 `Vec<Worker>` 值作为 `ThreadPool` 的一部分返回,这个 vector 在 `ThreadPool::new` 的结尾就会被清理。
那么这些警告有错吗?从某种角度上讲是的,这些警告是错误的,因为我们使用这些字段储存一直需要的数据。从另一种角度来说也不对:使用过后我们也没有做任何操作清理线程池,仅仅通过 <span class="keystroke">ctrl-C</span> 来停止程序并让操作系统为我们清理。下面让我们实现 graceful shutdown 来清理所创建的一切。

@ -0,0 +1,490 @@
## Graceful Shutdown 与清理
> [ch20-06-graceful-shutdown-and-cleanup.md](https://github.com/rust-lang/book/blob/master/second-edition/src/ch20-06-graceful-shutdown-and-cleanup.md)
> <br>
> commit 2e269ff82193fd65df8a87c06561d74b51ac02f7
列表 20-21 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说存在一些字段并没有直接被使用,这提醒了我们并没有清理任何内容。当使用 <span class="keystroke">ctrl-C</span> 终止主线程,所有其他线程也会立刻停止,即便他们正在处理一个请求。
现在我们要为 `ThreadPool` 实现 `Drop` trait 对线程池中的每一个线程调用 `join`,这样这些线程将会执行完他们的请求。接着会为 `ThreadPool` 实现一个方法来告诉线程他们应该停止接收新请求并结束。为了实践这些代码,修改 server 在 graceful Shutdown 之前只接受两个请求。
现在开始为线程池实现 `Drop`。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。列表 20-22 展示了 `Drop` 实现的第一次尝试;这些代码还不能够编译:
<span class="filename">文件名: src/lib.rs</span>
```rust
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
```
<span class="caption">列表 20-22当线程池离开作用域时 join 每个线程</span>
这里遍历线程池中的每个 `workers`,这里使用了 `&mut` 因为 `self` 本身是一个可变引用而且也需要能够修改 `worker`。当特定 worker 关闭时会打印出说明信息,接着在对应 worker 上调用 `join`。如果 `join` 失败了,通过 `unwrap` 将错误变为 panic 从而无法进行 graceful Shutdown。
如下是尝试编译代码时得到的错误:
```
error[E0507]: cannot move out of borrowed content
--> src/lib.rs:65:13
|
65 | worker.thread.join().unwrap();
| ^^^^^^ cannot move out of borrowed content
```
因为我们只有每个 `worker` 的可变借用,并不能调用 `join``join` 获取其参数的所有权。为了解决这个问题,需要一个方法将 `thread` 移动出拥有其所有权的 `Worker` 实例以便 `join` 可以消费这个线程。列表 17-15 中我们曾见过这么做的方法:如果 `Worker` 存放的是 `Option<thread::JoinHandle<()>`,就可以在 `Option` 上调用 `take` 方法将值从 `Some` 成员中移动出来而对 `None` 成员不做处理。换句话说,正在运行的 `Worker``thread` 将是 `Some` 成员值,而当需要清理 worker 时,将 `Some` 替换为 `None`,这样 worker 就没有可以运行的线程了。
所以我们知道了需要更新 `Worker` 的定义为如下:
<span class="filename">文件名: src/lib.rs</span>
```rust
# use std::thread;
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
```
现在依靠编译器来找出其他需要修改的地方。我们会得到两个错误:
```
error: no method named `join` found for type
`std::option::Option<std::thread::JoinHandle<()>>` in the current scope
--> src/lib.rs:65:27
|
65 | worker.thread.join().unwrap();
| ^^^^
error[E0308]: mismatched types
--> src/lib.rs:89:21
|
89 | thread,
| ^^^^^^ expected enum `std::option::Option`, found
struct `std::thread::JoinHandle`
|
= note: expected type `std::option::Option<std::thread::JoinHandle<()>>`
found type `std::thread::JoinHandle<_>`
```
第二个错误指向 `Worker::new` 结尾的代码;当新建 `Worker` 时需要将 `thread` 值封装进 `Some`
<span class="filename">文件名: src/lib.rs</span>
```rust
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// ...snip...
Worker {
id,
thread: Some(thread),
}
}
}
```
第一个错误有关 `Drop` 实现,而且我们提到过要调用 `Option` 上的 `take``thread` 移动出 `worker`。如下是代码:
<span class="filename">文件名: src/lib.rs</span>
```rust
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
```
如第十七章我们见过的,`Option` 上的 `take` 方法会取出 `Some` 而留下 `None`。使用 `if let` 解构 `Some` 并得到线程,接着在线程上调用 `join`。如果 worker 的线程已然是 `None`,就知道此时这个 worker 已经清理了其线程且无需做任何操作。
有了这些修改,代码就能编译且没有任何警告。不过也有坏消息,这些代码还不能以我们期望的方式运行。问题的关键在于 `Worker` 中分配的线程所运行的闭包中的逻辑:调用 `join` 并不会关闭线程,因为他们一直 `loop` 来寻找任务。如果采用这个实现来尝试丢弃 `ThreadPool` ,则主线程会永远阻塞在等待第一个线程结束上。
为了修复这个问题,修改线程既监听是否有 `Job` 运行也要监听应该停止监听并退出无限循环的信号。所以通道将发送这个枚举的两个成员之一而不再直接使用 `Job` 实例:
<span class="filename">文件名: src/lib.rs</span>
```rust
# struct Job;
enum Message {
NewJob(Job),
Terminate,
}
```
`Message` 枚举要么是存放了线程需要运行的 `Job``NewJob` 成员,要么是会导致线程退出循环并终止的 `Terminate` 成员。
同时需要修改通道来使用 `Message` 类型值而不是 `Job`,如列表 20-23 所示:
<span class="filename">文件名: src/lib.rs</span>
```rust
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
// ...snip...
impl ThreadPool {
// ...snip...
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
// ...snip...
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
// ...snip...
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
```
<span class="caption">列表 20-23收发 `Message` 值并在 `Worker` 收到 `Message::Terminate` 时退出循环</span>
需要将 `ThreadPool` 定义、创建通道的 `ThreadPool::new``Worker::new` 签名中的 `Job` 改为 `Message`。`ThreadPool` 的 `execute` 方法需要发送封装进 `Message::NewJob` 成员的任务,当获取到 `NewJob` 时会处理任务而收到 `Terminate` 成员时则会退出循环。
通过这些修改,代码再次能够编译并按照期望的行为运行。不过还是会得到一个警告,因为并没有在任何消息中使用 `Terminate` 成员。如列表 20-14 所示那样修改 `Drop` 实现:
<span class="filename">文件名: src/lib.rs</span>
```rust,ignore
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
```
<span class="caption">列表 20-24在对每个 worker 线程调用 `join` 之前向 worker 发送 `Message::Terminate`</span>
现在遍历了 worker 两次,一次向每个 worker 发送一个 `Terminate` 消息,一个调用每个 worker 线程上的 `join`。如果尝试在同一循环中发送消息并立即 join 线程,则无法保证当前迭代的 worker 是从通道收到终止消息的 worker。
为了更好的理解为什么需要两个分开的循环,想象一下只有两个 worker 的场景。如果在一个循环中遍历每个 worker在第一次迭代中 `worker` 是第一个 worker我们向通道发出终止消息并对第一个 worker 线程调用 `join`。如果第一个 worker 当时正忙于处理请求,则第二个 worker 会从通道接收这个终止消息并结束。而我们在等待第一个 worker 结束,不过它永远也不会结束因为第二个线程取走了终止消息。现在我们就阻塞在了等待第一个 worker 结束,而无法发出第二条终止消息。死锁!
为了避免此情况,首先从通道中取出所有的 `Terminate` 消息,接着 join 所有的线程。因为每个 worker 一旦收到终止消息即会停止从通道接收消息,我们就可以确保如果发送同 worker 数相同的终止消息,在 join 之前每个线程都会收到一个终止消息。
为了实践这些代码,如列表 20-25 所示修改 `main` 在 graceful Shutdown server 之前只接受两个请求:
<span class="filename">文件名: src/bin/main.rs</span>
```rust,ignore
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let pool = ThreadPool::new(4);
let mut counter = 0;
for stream in listener.incoming() {
if counter == 2 {
println!("Shutting down.");
break;
}
counter += 1;
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
```
<span class="caption">列表 20-25在处理两个请求之后通过退出循环来停止 server</span>
只处理两次请求并不是生产环境的 web server 所期望的行为,不过这可以让我们看清 graceful shutdown 和清理起作用了,因为不用再通过 <span class="keystroke">ctrl-C</span> 停止 server 了。
这里还增加了一个 `counter` 变量在每次收到 TCP 流时递增。如果计数到达 2会停止处理请求并退出 `for` 循环。`ThreadPool` 会在 `main` 的结尾离开作用域,而且还会看到 `drop` 实现的运行。
使用 `cargo run` 启动 server并发起三个请求。第三个请求应该会失败而终端的输出应该看起来像这样
```
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3
```
当然,你可能会看到不同顺序的输出。可以从信息中看到服务是如何运行的: worker 0 和 worker 3 获取了头两个请求,接着在第三个请求时,我们停止接收连接。当 `ThreadPool``main` 的结尾离开作用域时,其 `Drop` 实现开始工作,线程池通知所有线程终止。每个 worker 在收到终止消息时会打印出一个信息,接着线程池调用 `join` 来终止每一个 worker 线程。
这个特定的运行过程中一个有趣的地方在于:注意我们向通道中发出终止消息,而在任何线程收到消息之前,就尝试 join worker 0 了。worker 0 还没有收到终止消息,所以主线程阻塞直到 worker 0 结束。与此同时,每一个线程都收到了终止消息。一旦 worker 0 结束,主线程就等待其他 worker 结束,此时他们都已经收到终止消息并能够停止了。
恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web server。我们能对 server 执行 graceful shutdown它会清理线程池中的所有线程。如下是完整的代码参考
<span class="filename">Filename: src/bin/main.rs</span>
```rust
extern crate hello;
use hello::ThreadPool;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::fs::File;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
let pool = ThreadPool::new(4);
let mut counter = 0;
for stream in listener.incoming() {
if counter == 2 {
println!("Shutting down.");
break;
}
counter += 1;
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 512];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let mut file = File::open(filename).unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).unwrap();
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
```
<span class="filename">Filename: src/lib.rs</span>
```rust
use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
enum Message {
NewJob(Job),
Terminate,
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
type Job = Box<FnBox + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
sender,
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Sending terminate message to all workers.");
for _ in &mut self.workers {
self.sender.send(Message::Terminate).unwrap();
}
println!("Shutting down all workers.");
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
Worker {
let thread = thread::spawn(move ||{
loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job.call_box();
},
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
},
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
```
这里还有很多可以做的事!如果你希望继续增强这个项目,如下是一些点子:
- 为 `ThreadPool` 和其公有方法增加更多文档
- 为库的功能增加测试
- 将 `unwrap` 调用改为更健壮的错误处理
- 使用 `ThreadPool` 进行其他不同于处理网络请求的任务
- 在 crates.io 寻找一个线程池 crate 并使用它实现一个类似的 web server将其 API 和鲁棒性与我们的实现做对比
## 总结
好极了!你结束了本书的学习!由衷感谢你与我们一道加入这次 Rust 之旅。现在你已经准备好出发并实现自己的 Rust 项目或帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。
Loading…
Cancel
Save