@ -2,7 +2,7 @@
> [ch20-02-multithreaded.md ](https://github.com/rust-lang/book/blob/main/src/ch20-02-multithreaded.md )
> [ch20-02-multithreaded.md ](https://github.com/rust-lang/book/blob/main/src/ch20-02-multithreaded.md )
> < br >
> < br >
> commit 120e76a0cc77c9cde52643f847ed777f8f441817
> commit 95b5e7c86d33e98eec6f73b268d106621f3d24a1
目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
目前 server 会依次处理每一个请求,意味着它在完成第一个连接的处理之前不会处理第二个连接。如果 server 正接收越来越多的请求,这类串行操作会使性能越来越差。如果一个请求花费很长时间来处理,随后而来的请求则不得不等待这个长请求结束,即便这些新请求可以很快就处理完。我们需要修复这种情况,不过首先让我们实际尝试一下这个问题。
@ -12,33 +12,8 @@
< span class = "filename" > 文件名: src/main.rs< / span >
< span class = "filename" > 文件名: src/main.rs< / span >
```rust
```rust,no_run
use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-10/src/main.rs:here}}
use std::time::Duration;
# use std::io::prelude::*;
# use std::net::TcpStream;
# use std::fs::File;
// --snip--
fn handle_connection(mut stream: TcpStream) {
# let mut buffer = [0; 1024];
# stream.read(& mut buffer).unwrap();
// --snip--
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK", "hello.html")
} else if buffer.starts_with(sleep) {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND", "404.html")
};
// --snip--
}
```
```
< span class = "caption" > 示例 20-10: 通过识别 */sleep* 并休眠五秒来模拟慢请求</ span >
< span class = "caption" > 示例 20-10: 通过识别 */sleep* 并休眠五秒来模拟慢请求</ span >
@ -72,23 +47,7 @@ fn handle_connection(mut stream: TcpStream) {
< span class = "filename" > 文件名: src/main.rs< / span >
< span class = "filename" > 文件名: src/main.rs< / span >
```rust,no_run
```rust,no_run
# use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-11/src/main.rs:here}}
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
```
< span class = "caption" > 示例 20-11: 为每一个流新建一个线程< / span >
< span class = "caption" > 示例 20-11: 为每一个流新建一个线程< / span >
@ -101,31 +60,8 @@ fn main() {
< span class = "filename" > 文件名: src/main.rs< / span >
< span class = "filename" > 文件名: src/main.rs< / span >
```rust,no_run
```rust,ignore,does_not_compile
# use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-12/src/main.rs:here}}
# use std::io::prelude::*;
# use std::net::TcpListener;
# use std::net::TcpStream;
# struct ThreadPool;
# impl ThreadPool {
# fn new(size: u32) -> ThreadPool { ThreadPool }
# fn execute< F > (& self, f: F)
# where F: FnOnce() + Send + 'static {}
# }
#
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
# fn handle_connection(mut stream: TcpStream) {}
```
```
< span class = "caption" > 示例 20-12: 假想的 `ThreadPool` 接口</ span >
< span class = "caption" > 示例 20-12: 假想的 `ThreadPool` 接口</ span >
@ -136,17 +72,8 @@ fn main() {
继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误:
继续并对示例 20-12 中的 *src/main.rs* 做出修改,并利用来自 `cargo check` 的编译器错误来驱动开发。下面是我们得到的第一个错误:
```text
```console
$ cargo check
{{#include ../listings/ch20-web-server/listing-20-12/output.txt}}
Compiling hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve. Use of undeclared type or module `ThreadPool`
--> src\main.rs:10:16
|
10 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ Use of undeclared type or module
`ThreadPool`
error: aborting due to previous error
```
```
好的,这告诉我们需要一个 `ThreadPool` 类型或模块,所以我们将构建一个。`ThreadPool` 的实现会与 web server 的特定工作相独立,所以让我们从 `hello` crate 切换到存放 `ThreadPool` 实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库, 而不仅仅是处理网络请求。
好的,这告诉我们需要一个 `ThreadPool` 类型或模块,所以我们将构建一个。`ThreadPool` 的实现会与 web server 的特定工作相独立,所以让我们从 `hello` crate 切换到存放 `ThreadPool` 实现的新库 crate。这也意味着可以在任何工作中使用这个单独的线程池库, 而不仅仅是处理网络请求。
@ -155,8 +82,8 @@ error: aborting due to previous error
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
pub struct ThreadPool;
{{#rustdoc_include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/src/lib.rs}}
```
```
接着创建一个新目录,*src/bin*,并将二进制 crate 根文件从 *src/main.rs* 移动到 *src/bin/main.rs* 。这使得库 crate 成为 *hello* 目录的主要 crate; 不过仍然可以使用 `cargo run` 运行 *src/bin/main.rs* 二进制文件。移动了 *main.rs* 文件之后,修改 *src/bin/main.rs* 文件开头加入如下代码来引入库 crate 并将 `ThreadPool` 引入作用域:
接着创建一个新目录,*src/bin*,并将二进制 crate 根文件从 *src/main.rs* 移动到 *src/bin/main.rs* 。这使得库 crate 成为 *hello* 目录的主要 crate; 不过仍然可以使用 `cargo run` 运行 *src/bin/main.rs* 二进制文件。移动了 *main.rs* 文件之后,修改 *src/bin/main.rs* 文件开头加入如下代码来引入库 crate 并将 `ThreadPool` 引入作用域:
@ -164,58 +91,29 @@ pub struct ThreadPool;
< span class = "filename" > 文件名: src/bin/main.rs< / span >
< span class = "filename" > 文件名: src/bin/main.rs< / span >
```rust,ignore
```rust,ignore
use hello::ThreadPool;
{{#rustdoc_include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/src/bin/main.rs:here}}
```
```
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
这仍然不能工作,再次尝试运行来得到下一个需要解决的错误:
```text
```console
$ cargo check
{{#include ../listings/ch20-web-server/no-listing-01-define-threadpool-struct/output.txt}}
Compiling hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for type
`hello::ThreadPool` in the current scope
--> src/bin/main.rs:13:16
|
13 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^^^^^^ function or associated item not found in
`hello::ThreadPool`
```
```
这告诉我们下一步是为 `ThreadPool` 创建一个叫做 `new` 的关联函数。我们还知道 `new` 需要有一个参数可以接受 `4` ,而且 `new` 应该返回 `ThreadPool` 实例。让我们实现拥有此特征的最小化 `new` 函数:
这告诉我们下一步是为 `ThreadPool` 创建一个叫做 `new` 的关联函数。我们还知道 `new` 需要有一个参数可以接受 `4` ,而且 `new` 应该返回 `ThreadPool` 实例。让我们实现拥有此特征的最小化 `new` 函数:
< span class = "filename" > 文件夹: src/lib.rs< / span >
< span class = "filename" > 文件夹: src/lib.rs< / span >
```rust
```rust,noplayground
pub struct ThreadPool;
{{#rustdoc_include ../listings/ch20-web-server/no-listing-02-impl-threadpool-new/src/lib.rs}}
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
```
```
这里选择 `usize` 作为 `size` 参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 `usize` 类型的原因,如第三章 [“整数类 型”][integer-types] 部分所讲。
这里选择 `usize` 作为 `size` 参数的类型,因为我们知道为负的线程数没有意义。我们还知道将使用 4 作为线程集合的元素数量,这也就是使用 `usize` 类型的原因,如第三章 [“整型”][integer-types] 部分所讲。
再次编译检查这段代码:
再次编译检查这段代码:
```text
```console
$ cargo check
{{#include ../listings/ch20-web-server/no-listing-02-impl-threadpool-new/output.txt}}
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
error[E0599]: no method named `execute` found for type `hello::ThreadPool` in the current scope
--> src/bin/main.rs:18:14
|
18 | pool.execute(|| {
| ^^^^^^^
```
```
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 [“为有限数量的线程创建一个类似的接口” ](#creating-a-similar-interface-for-a-finite-number-of-threads ) 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
现在有了一个警告和一个错误。暂时先忽略警告,发生错误是因为并没有 `ThreadPool` 上的 `execute` 方法。回忆 [“为有限数量的线程创建一个类似的接口” ](#creating-a-similar-interface-for-a-finite-number-of-threads ) 部分我们决定线程池应该有与 `thread::spawn` 类似的接口,同时我们将实现 `execute` 函数来获取传递的闭包并将其传递给池中的空闲线程执行。
@ -225,8 +123,9 @@ error[E0599]: no method named `execute` found for type `hello::ThreadPool` in th
```rust,ignore
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
where
where
F: FnOnce() -> T + Send + 'static,
F: FnOnce() -> T,
T: Send + 'static
F: Send + 'static,
T: Send + 'static,
```
```
`F` 是这里我们关心的参数;`T` 与返回值有关所以我们并不关心。考虑到 `spawn` 使用 `FnOnce` 作为 `F` 的 trait bound, 这可能也是我们需要的, 因为最终会将传递给 `execute` 的参数传给 `spawn` 。因为处理请求的线程只会执行闭包一次,这也进一步确认了 `FnOnce` 是我们需要的 trait, 这里符合 `FnOnce` 中 `Once` 的意思。
`F` 是这里我们关心的参数;`T` 与返回值有关所以我们并不关心。考虑到 `spawn` 使用 `FnOnce` 作为 `F` 的 trait bound, 这可能也是我们需要的, 因为最终会将传递给 `execute` 的参数传给 `spawn` 。因为处理请求的线程只会执行闭包一次,这也进一步确认了 `FnOnce` 是我们需要的 trait, 这里符合 `FnOnce` 中 `Once` 的意思。
@ -235,43 +134,16 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
# pub struct ThreadPool;
{{#rustdoc_include ../listings/ch20-web-server/no-listing-03-define-execute/src/lib.rs:here}}
impl ThreadPool {
// --snip--
pub fn execute< F > (& self, f: F)
where
F: FnOnce() + Send + 'static
{
}
}
```
```
`FnOnce` trait 仍然需要之后的 `()` ,因为这里的 `FnOnce` 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。
`FnOnce` trait 仍然需要之后的 `()` ,因为这里的 `FnOnce` 代表一个没有参数也没有返回值的闭包。正如函数的定义,返回值类型可以从签名中省略,不过即便没有参数也需要括号。
这里再一次增加了 `execute` 方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:
这里再一次增加了 `execute` 方法的最小化实现:它没有做任何工作,只是尝试让代码能够编译。再次进行检查:
```text
```console
$ cargo check
{{#include ../listings/ch20-web-server/no-listing-03-define-execute/output.txt}}
Compiling hello v0.1.0 (file:///projects/hello)
warning: unused variable: `size`
--> src/lib.rs:4:16
|
4 | pub fn new(size: usize) -> ThreadPool {
| ^^^^
|
= note: #[warn(unused_variables)] on by default
= note: to avoid this warning, consider using `_size` instead
warning: unused variable: `f`
--> src/lib.rs:8:30
|
8 | pub fn execute< F > (& self, f: F)
| ^
|
= note: to avoid this warning, consider using `_f` instead
```
```
现在就只有警告了!这意味着能够编译了!注意如果尝试 `cargo run` 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 `execute` 的闭包!
现在就只有警告了!这意味着能够编译了!注意如果尝试 `cargo run` 运行程序并在浏览器中发起请求,仍会在浏览器中出现在本章开始时那样的错误。这个库实际上还没有调用传递给 `execute` 的闭包!
@ -284,24 +156,8 @@ warning: unused variable: `f`
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
# pub struct ThreadPool;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-13/src/lib.rs:here}}
impl ThreadPool {
/// 创建线程池。
///
/// 线程池中线程的数量。
///
/// # Panics
///
/// `new` 函数在 size 为 0 时会 panic。
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
```
```
< span class = "caption" > 示例 20-13: 实现 `ThreadPool::new` 在 `size` 为零时 panic</ span >
< span class = "caption" > 示例 20-13: 实现 `ThreadPool::new` 在 `size` 为零时 panic</ span >
@ -321,8 +177,9 @@ pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
```rust,ignore
```rust,ignore
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
pub fn spawn< F , T > (f: F) -> JoinHandle< T >
where
where
F: FnOnce() -> T + Send + 'static,
F: FnOnce() -> T,
T: Send + 'static
F: Send + 'static,
T: Send + 'static,
```
```
`spawn` 返回 `JoinHandle<T>` ,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()` 。
`spawn` 返回 `JoinHandle<T>` ,其中 `T` 是闭包返回的类型。尝试使用 `JoinHandle` 来看看会发生什么。在我们的情况中,传递给线程池的闭包会处理连接并不返回任何值,所以 `T` 将会是单元类型 `()` 。
@ -332,30 +189,7 @@ pub fn spawn<F, T>(f: F) -> JoinHandle<T>
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore,not_desired_behavior
```rust,ignore,not_desired_behavior
use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-14/src/lib.rs:here}}
pub struct ThreadPool {
threads: Vec< thread::JoinHandle < ( ) > >,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool {
threads
}
}
// --snip--
}
```
```
< span class = "caption" > 示例 20-14: 为 `ThreadPool` 创建一个 vector 来存放线程</ span >
< span class = "caption" > 示例 20-14: 为 `ThreadPool` 创建一个 vector 来存放线程</ span >
@ -387,46 +221,8 @@ impl ThreadPool {
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-15/src/lib.rs:here}}
pub struct ThreadPool {
workers: Vec< Worker > ,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers
}
}
// --snip--
}
struct Worker {
id: usize,
thread: thread::JoinHandle< ()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker {
id,
thread,
}
}
}
```
```
< span class = "caption" > 示例 20-15: 修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程</ span >
< span class = "caption" > 示例 20-15: 修改 `ThreadPool` 存放 `Worker` 实例而不是直接存放线程</ span >
@ -455,54 +251,8 @@ impl Worker {
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
# use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-16/src/lib.rs:here}}
// --snip--
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec< Worker > ,
sender: mpsc::Sender< Job > ,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
#
# struct Worker {
# id: usize,
# thread: thread::JoinHandle< ()>,
# }
#
# impl Worker {
# fn new(id: usize) -> Worker {
# let thread = thread::spawn(|| {});
#
# Worker {
# id,
# thread,
# }
# }
# }
```
```
< span class = "caption" > 示例 20-16: 修改 `ThreadPool` 来储存一个发送 `Job` 实例的通道发送端</ span >
< span class = "caption" > 示例 20-16: 修改 `ThreadPool` 来储存一个发送 `Job` 实例的通道发送端</ span >
@ -514,41 +264,7 @@ impl ThreadPool {
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore,does_not_compile
```rust,ignore,does_not_compile
impl ThreadPool {
{{#rustdoc_include ../listings/ch20-web-server/listing-20-17/src/lib.rs:here}}
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver< Job > ) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker {
id,
thread,
}
}
}
```
```
< span class = "caption" > 示例 20-17: 将通道的接收端传递给 worker< / span >
< span class = "caption" > 示例 20-17: 将通道的接收端传递给 worker< / span >
@ -557,18 +273,8 @@ impl Worker {
如果尝试 check 代码,会得到这个错误:
如果尝试 check 代码,会得到这个错误:
```text
```console
$ cargo check
{{#include ../listings/ch20-web-server/listing-20-17/output.txt}}
Compiling hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:27:42
|
27 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here in
previous iteration of loop
|
= note: move occurs because `receiver` has type
`std::sync::mpsc::Receiver<Job>` , which does not implement the `Copy` trait
```
```
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的, 回忆第十六章: Rust 所提供的通道实现是多 ** 生产者**,单 ** 消费者** 的。这意味着不能简单的克隆通道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 `receiver` ,在线程间分发任务。
这段代码尝试将 `receiver` 传递给多个 `Worker` 实例。这是不行的, 回忆第十六章: Rust 所提供的通道实现是多 ** 生产者**,单 ** 消费者** 的。这意味着不能简单的克隆通道的消费端来解决问题。即便可以,那也不是我们希望使用的技术;我们希望通过在所有的 worker 中共享单一 `receiver` ,在线程间分发任务。
@ -579,61 +285,8 @@ error[E0382]: use of moved value: `receiver`
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
# use std::thread;
{{#rustdoc_include ../listings/ch20-web-server/listing-20-18/src/lib.rs:here}}
# use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
// --snip--
# pub struct ThreadPool {
# workers: Vec< Worker > ,
# sender: mpsc::Sender< Job > ,
# }
# struct Job;
#
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender,
}
}
// --snip--
}
# struct Worker {
# id: usize,
# thread: thread::JoinHandle< ()>,
# }
#
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
// --snip--
# let thread = thread::spawn(|| {
# receiver;
# });
#
# Worker {
# id,
# thread,
# }
}
}
```
```
< span class = "caption" > 示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享通道的接收端</ span >
< span class = "caption" > 示例 20-18: 使用 `Arc` 和 `Mutex` 在 worker 间共享通道的接收端</ span >
@ -648,31 +301,8 @@ impl Worker {
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust
```rust,noplayground
// --snip--
{{#rustdoc_include ../listings/ch20-web-server/listing-20-19/src/lib.rs:here}}
# pub struct ThreadPool {
# workers: Vec< Worker > ,
# sender: mpsc::Sender< Job > ,
# }
# use std::sync::mpsc;
# struct Worker {}
type Job = Box< dyn FnOnce ( ) + Send + ' static > ;
impl ThreadPool {
// --snip--
pub fn execute< F > (& self, f: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
```
```
< span class = "caption" > 示例 20-19: 为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在通道中发出任务</ span >
< span class = "caption" > 示例 20-19: 为存放每一个闭包的 `Box` 创建一个 `Job` 类型别名,接着在通道中发出任务</ span >
@ -683,27 +313,8 @@ impl ThreadPool {
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore,does_not_compile
```rust,noplayground
// --snip--
{{#rustdoc_include ../listings/ch20-web-server/listing-20-20/src/lib.rs:here}}
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker {
id,
thread,
}
}
}
```
```
< span class = "caption" > 示例 20-20: 在 worker 线程中接收并执行任务< / span >
< span class = "caption" > 示例 20-20: 在 worker 线程中接收并执行任务< / span >
@ -714,37 +325,35 @@ impl Worker {
调用 `recv` 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。`Mutex< T > ` 确保一次只有一个 `Worker` 线程尝试请求任务。
调用 `recv` 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务。`Mutex< T > ` 确保一次只有一个 `Worker` 线程尝试请求任务。
通过这个技巧, 线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求:
现在 线程池处于可以运行的状态了!执行 `cargo run` 并发起一些请求:
```text
```console
$ cargo run
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never use d: `workers`
warning: field is never rea d: `workers`
--> src/lib.rs:7:5
--> src/lib.rs:7:5
|
|
7 | workers: Vec< Worker > ,
7 | workers: Vec< Worker > ,
| ^^^^^^^^^^^^^^^^^^^^
| ^^^^^^^^^^^^^^^^^^^^
|
|
= note: #[warn(dead_code)] on by default
= note: ` #[warn(dead_code)]` on by default
warning: field is never use d: `id`
warning: field is never rea d: `id`
--> src/lib.rs:61 :5
--> src/lib.rs:48 :5
|
|
61 | id: usize,
48 | id: usize,
| ^^^^^^^^^
| ^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
warning: field is never use d: `thread`
warning: field is never read: `thread`
--> src/lib.rs:62 :5
--> src/lib.rs:49 :5
|
|
62 | thread: thread::JoinHandle< ()>,
49 | thread: thread::JoinHandle< ()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 0.99 secs
warning: 3 warnings emitted
Running `target/debug/hello`
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 1 got a job; executing.
@ -766,34 +375,17 @@ Worker 2 got a job; executing.
< span class = "filename" > 文件名: src/lib.rs< / span >
< span class = "filename" > 文件名: src/lib.rs< / span >
```rust,ignore,not_desired_behavior
```rust,ignore,not_desired_behavior
// --snip--
{{#rustdoc_include ../listings/ch20-web-server/listing-20-21/src/lib.rs:here}}
impl Worker {
fn new(id: usize, receiver: Arc< Mutex < mpsc::Receiver < Job > >>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker {
id,
thread,
}
}
}
```
```
< span class = "caption" > 示例 20-21: 一个使用 `while let` 的 `Worker::new` 替代实现</ span >
< span class = "caption" > 示例 20-21: 一个使用 `while let` 的 `Worker::new` 替代实现</ span >
这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:`Mutex` 结构体没有公有 `unlock` 方法,因为锁的所有权依赖 `lock` 方法返回的 `LockResult<MutexGuard<T>>` 中 `MutexGuard<T>` 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 `Mutex` 守护的资源,不过如果没有认真的思考 `MutexGuard<T>` 的生命周期的话,也可能会导致比预期更久的持有锁。因为 `while` 表达式中的值在整个块一直处于作用域中,`job()` 调用的过程中其仍然持有锁,这意味着其他 worker 不能接收任务。
这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。其原因有些微妙:`Mutex` 结构体没有公有 `unlock` 方法,因为锁的所有权依赖 `lock` 方法返回的 `LockResult<MutexGuard<T>>` 中 `MutexGuard<T>` 的生命周期。这允许借用检查器在编译时确保绝不会在没有持有锁的情况下访问由 `Mutex` 守护的资源,不过如果没有认真的思考 `MutexGuard<T>` 的生命周期的话,也可能会导致比预期更久的持有锁。
相反通过使用 `loop` 并在循环块之内而不是之外获取锁和任务,`lock` 方法返回的 `MutexGuard` 在 `let job` 语句结束之后立刻就被丢弃了。这确保了 `recv` 调用过程中持有锁,而在 `job()` 调用前锁就被释放了,这就允许并发处理多个请求了 。
示例 20-20 中的代码使用的 `let job = receiver.lock().unwrap().recv().unwrap();` 之所以可以工作是因为对于 `let` 来说,当 `let` 语句结束时任何表达式中等号右侧使用的临时值都会立即被丢弃。然而 `while let` ( `if let` 和 `match` )直到相关的代码块结束都不会丢弃临时值。在示例 20-21 中,`job()` 调用期间锁一直持续,这也意味着其他的 worker 无法接受任务。
[creating-type-synonyms-with-type-aliases]:
[creating-type-synonyms-with-type-aliases]:
ch19-04-advanced-types.html#creating-type-synonyms-with-type-aliases
ch19-04-advanced-types.html#类型别名用来创建类型同义词
[integer-types]: ch03-02-data-types.html#integer-types
[integer-types]: ch03-02-data-types.html#整型
[storing-closures-using-generic-parameters-and-the-fn-traits]:
[storing-closures-using-generic-parameters-and-the-fn-traits]:
ch13-01-closures.html#storing-closures-using-generic-parameters-and-the-fn-traits
ch13-01-closures.html#使用带有泛型和-fn-trait-的闭包