diff --git a/book/contents/SUMMARY.md b/book/contents/SUMMARY.md index 87e0c857..7c6f1618 100644 --- a/book/contents/SUMMARY.md +++ b/book/contents/SUMMARY.md @@ -76,7 +76,7 @@ - [多线程并发编程 doing](advance/concurrency-with-threads/intro.md) - [并发和并行](advance/concurrency-with-threads/concurrency-parallelism.md) - [使用多线程](advance/concurrency-with-threads/thread.md) - - [消息与锁](advance/concurrency-with-threads/message-passing.md) + - [线程间的消息传递](advance/concurrency-with-threads/message-passing.md) - [Send、Sync todo](advance/multi-threads/send-sync.md) - [一个综合例子](advance/multi-threads/example.md) - [async/await并发编程 todo](advance/async/intro.md) diff --git a/book/contents/advance/concurrency-with-threads/message-passing.md b/book/contents/advance/concurrency-with-threads/message-passing.md index 61a1c2c2..1999b12a 100644 --- a/book/contents/advance/concurrency-with-threads/message-passing.md +++ b/book/contents/advance/concurrency-with-threads/message-passing.md @@ -1,17 +1,17 @@ -# 消息与锁 +# 线程间的消息传递 在多线程间有多种方式可以共享、传递数据,最常用的方式就是通过消息传递或者将锁和`Arc`联合使用,而对于前者,在编程界还有一个大名鼎鼎的`Actor线程模型`为其背书,典型的有Erlang语言,还有Go语言中很经典的一句话: > Do not communicate by sharing memory; instead, share memory by communicating -而对于后者,则是老生常谈的解决方法:通过锁来实现在某一个时间点,只有一个线程能访问对应的资源,其它线程则需要等待该线程使用完后,才能申请使用。 +而对于后者,则是老生常谈的解决方法:通过锁来实现在某一个时间点,只有一个线程能访问对应的资源,其它线程则需要等待该线程使用完后,才能申请使用, 具体内容我们将在下一节中讲述。 ## 消息通道 与Go语言内置的`chan`不同,Rust是在标准库里提供了消息通道(`channel`),你可以将其想象成一场直播,多个主播联合起来在搞一场直播,最终内容通过通道传输给屏幕前的我们,其中主播被称之为**发送者**,观众被称之为**接收者**,显而易见的是:一个通道应该支持多个发送者和接收者。 但是,在实际使用中,我们需要使用不同的库来满足诸如:**多发送者 -> 单接收者,多发送者 -> 多接收者**等场景形式,此时一个标准库显然就不够了,不过别急,让我们先从标准库讲起。 -### 多发送者,单接收者 +## 多发送者,单接收者 标准库提供了通道`std::sync::mpsc`,其中`mpsc`是*multiple producer, single consumer*的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。 当然,支持多个发送者也意味着支持单个发送者,我们先来看看单发送者、单接收者的简单例子: ```rust use std::sync::mpsc; @@ -45,7 +45,7 @@ fn main() { 同样的,对于`recv`方法来说,当发送者关闭时,它也会接收到一个错误,用于说明不会再有任何值被发送过来。 -#### 不阻塞的try_recv方法 +### 不阻塞的try_recv方法 除了上述`recv`方法,还可以使用`try_recv`尝试接收一次消息,该方法并**不会阻塞线程**,当通道中没有消息时,它会立刻返回一个错误: ```rust use std::sync::mpsc; @@ -78,7 +78,7 @@ receive Err(Disconnected) 如上,当子线程创建成功且发送消息后,主线程会接收到`Ok(1)`的消息内容,紧接着子线程结束,发送者也随着被`drop`,此时接收者又会报错,但是这次错误原因有所不同:`Disconnected`代表发送者已经被关闭。 -#### 传输具有所有权的数据 +### 传输具有所有权的数据 使用通道来传输数据,一样要遵循Rust的所有权规则: - 若值的类型实现了`Copy`特征,则直接复制一份该值,然后传输过去,例如之前的`i32`类型 @@ -118,7 +118,7 @@ error[E0382]: borrow of moved value: `s` 各种细节不禁令人感叹:Rust还是安全!假如没有所有权的保护,`String`字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患。 -#### 使用for进行循环接收 +### 使用for进行循环接收 下面来看看如何连续接收通道中的值: ```rust use std::sync::mpsc; @@ -150,7 +150,7 @@ fn main() { 在上面代码中,主线程和子线程是并发运行的,子线程在不停的**发送消息 -> 休眠1秒**,与此同时,主线程使用`for`循环**阻塞**的从`rx`**迭代器**中接收消息,当子线程运行完成时,发送者`tx`会随之被`drop`,此时`for`循环将被终止,最终`main`线程成功结束。 -#### 使用多发送者 +### 使用多发送者 由于子线程会拿走发送者的所有权,因此我们必须对发送者进行克隆,然后让每个线程拿走它的一份拷贝: ```rust use std::sync::mpsc; @@ -181,7 +181,135 @@ fn main() { - 这里虽然用了`clone`但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次 - 由于两个子线程谁创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定 -注意,上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的线程是无序的,对于通道而言,先发送就先被接收,典型的`FIFO`(first in, first out)。 +### 消息顺序 +上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的线程是无序的,对于通道而言,消息的发送顺序和接收顺序是一直的,满足`FIFO`原则(先进先出)。 -#### 消息顺序 -### 关闭通道 \ No newline at end of file +由于篇幅有限,具体的代码这里就不再给出,感兴趣的读者可以自己验证下。 + + +### 同步和异步通道 +Rust标准库的`mpsc`通道其实分为两种类型:同步和异步。 + +##### 异步通道 +之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞: +```rust +use std::sync::mpsc; +use std::thread; +use std::time::Duration; +fn main() { + let (tx, rx)= mpsc::channel(); + + let handle = thread::spawn(move || { + println!("发送之前"); + tx.send(1).unwrap(); + println!("发送之后"); + }); + + println!("睡眠之前"); + thread::sleep(Duration::from_secs(3)); + println!("睡眠之后"); + + println!("收到值 {}", rx.recv().unwrap()); + handle.join().unwrap(); +} +``` + +运行后输出如下: +```console +睡眠之前 +发送之前 +发送之后 +//···睡眠3秒 +睡眠之后 +收到值 1 +``` + +主线程因为睡眠阻塞了3秒,因此并没有进行消息接收,而子线程却在此期间轻松完成了消息的发送。等主线程睡眠结束后,才姗姗来迟的从通道中接收了子线程老早之前发送的消息。 + +从输出还可以看出,`发送之前`和`发送之后`是连续输出的,没有受到接收端主线程的任何影响,因此通过`mpsc::channel`创建的通道是异步通道。 + +##### 同步通道 +与异步通道相反,同步通道**发送消息是阻塞的,只有在消息被接收后才解除阻塞**例如: +```rust +use std::sync::mpsc; +use std::thread; +use std::time::Duration; +fn main() { + let (tx, rx)= mpsc::sync_channel(0); + + let handle = thread::spawn(move || { + println!("发送之前"); + tx.send(1).unwrap(); + println!("发送之后"); + }); + + println!("睡眠之前"); + thread::sleep(Duration::from_secs(3)); + println!("睡眠之后"); + + println!("receive {}", rx.recv().unwrap()); + handle.join().unwrap(); +} +``` + +运行后输出如下: +```console +睡眠之前 +发送之前 +//···睡眠3秒 +睡眠之后 +收到值 1 +发送之后 +``` + +可以看出,主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:**发送之后**的输出是在**收到值 1**之后,说明**只有接收消息彻底成功后,发送消息才算完成**。 + + +##### 消息缓存 +细心的读者可能已经发现在创建同步通道时,我们传递了一个参数`0`: `mpsc::sync_channel(0);`,这是什么意思呢? + +答案不急给出,先将`0`改成`1`,然后再运行试试: +```console +睡眠之前 +发送之前 +发送之后 +睡眠之后 +receive 1 +``` + +纳尼。。竟然得到了和异步通道一样的效果:根本没有等待主线程的接收开始,消息发送就立即完成了! 难道同步通道变成了异步通道? 别急,将子线程中的代码修改下试试: +```rust +println!("首次发送之前"); +tx.send(1).unwrap(); +println!("首次发送之后"); +tx.send(1).unwrap(); +println!("再次发送之后"); +``` + +在子线程中,我们又多发了一条消息,此时输出如下: +```console +睡眠之前 +首次发送之前 +首次发送之后 +//···睡眠3秒 +睡眠之后 +receive 1 +再次发送之后 +``` + +Bingo,更奇怪的事出现了,第一条消息瞬间发送完成,没有阻塞,而发送第二条消息时却符合同步通道的特点:阻塞了,直到主线程接收后,才发送完成。 + +其实,一切的关键就在于`1`上,该值可以用来指定同步通道的消息缓存条数,当你设定为`N`时,发送者就可以无阻塞的往通道中发送`N`条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第`N+1`条消息就将触发发送阻塞)。 + +问题又来了,异步通道创建时完全没有这个缓冲值参数`mpsc::channel()`,它的缓冲值怎么设置呢? 额。。。都异步了,都可以无限发送了,都有摩托车了,还要自行车做啥子哦?事实上异步通道的缓冲上限取决于你的内存大小,不要撑爆就行。 + +因此,使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。 + + + +### 关闭通道 +之前我们数次提到了通道关闭,并且提到了当通道关闭后,发送消息或接收消息将会报错。那么如何关闭通道呢? 很简单:**所有发送者被`drop`或者所有接收者被`drop`后,通道会自动关闭**。 + +神奇的是,这件事是在编译期实现的,完全没有运行期性能损耗!只能说Rust的`Drop`特征YYDS! + +## 多发送者,多消费者 \ No newline at end of file