|
|
@ -1,10 +1,9 @@
|
|
|
|
# 线程间的消息传递
|
|
|
|
# 线程间的消息传递
|
|
|
|
|
|
|
|
|
|
|
|
在多线程间有多种方式可以共享、传递数据,最常用的方式就是通过消息传递或者将锁和`Arc`联合使用,而对于前者,在编程界还有一个大名鼎鼎的`Actor线程模型`为其背书,典型的有Erlang语言,还有Go语言中很经典的一句话:
|
|
|
|
在多线程间有多种方式可以共享、传递数据,最常用的方式就是通过消息传递或者将锁和`Arc`联合使用,而对于前者,在编程界还有一个大名鼎鼎的`Actor线程模型`为其背书,典型的有Erlang语言,还有Go语言中很经典的一句话:
|
|
|
|
|
|
|
|
|
|
|
|
> Do not communicate by sharing memory; instead, share memory by communicating
|
|
|
|
> Do not communicate by sharing memory; instead, share memory by communicating
|
|
|
|
|
|
|
|
|
|
|
|
而对于后者,则是老生常谈的解决方法:通过锁来实现在某一个时间点,只有一个线程能访问对应的资源,其它线程则需要等待该线程使用完后,才能申请使用, 具体内容我们将在下一节中讲述。
|
|
|
|
而对于后者,我们将在下一节中进行讲述。
|
|
|
|
|
|
|
|
|
|
|
|
## 消息通道
|
|
|
|
## 消息通道
|
|
|
|
与Go语言内置的`chan`不同,Rust是在标准库里提供了消息通道(`channel`),你可以将其想象成一场直播,多个主播联合起来在搞一场直播,最终内容通过通道传输给屏幕前的我们,其中主播被称之为**发送者**,观众被称之为**接收者**,显而易见的是:一个通道应该支持多个发送者和接收者。
|
|
|
|
与Go语言内置的`chan`不同,Rust是在标准库里提供了消息通道(`channel`),你可以将其想象成一场直播,多个主播联合起来在搞一场直播,最终内容通过通道传输给屏幕前的我们,其中主播被称之为**发送者**,观众被称之为**接收者**,显而易见的是:一个通道应该支持多个发送者和接收者。
|
|
|
@ -45,7 +44,7 @@ fn main() {
|
|
|
|
|
|
|
|
|
|
|
|
同样的,对于`recv`方法来说,当发送者关闭时,它也会接收到一个错误,用于说明不会再有任何值被发送过来。
|
|
|
|
同样的,对于`recv`方法来说,当发送者关闭时,它也会接收到一个错误,用于说明不会再有任何值被发送过来。
|
|
|
|
|
|
|
|
|
|
|
|
### 不阻塞的try_recv方法
|
|
|
|
## 不阻塞的try_recv方法
|
|
|
|
除了上述`recv`方法,还可以使用`try_recv`尝试接收一次消息,该方法并**不会阻塞线程**,当通道中没有消息时,它会立刻返回一个错误:
|
|
|
|
除了上述`recv`方法,还可以使用`try_recv`尝试接收一次消息,该方法并**不会阻塞线程**,当通道中没有消息时,它会立刻返回一个错误:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
use std::sync::mpsc;
|
|
|
|
use std::sync::mpsc;
|
|
|
@ -78,7 +77,7 @@ receive Err(Disconnected)
|
|
|
|
|
|
|
|
|
|
|
|
如上,当子线程创建成功且发送消息后,主线程会接收到`Ok(1)`的消息内容,紧接着子线程结束,发送者也随着被`drop`,此时接收者又会报错,但是这次错误原因有所不同:`Disconnected`代表发送者已经被关闭。
|
|
|
|
如上,当子线程创建成功且发送消息后,主线程会接收到`Ok(1)`的消息内容,紧接着子线程结束,发送者也随着被`drop`,此时接收者又会报错,但是这次错误原因有所不同:`Disconnected`代表发送者已经被关闭。
|
|
|
|
|
|
|
|
|
|
|
|
### 传输具有所有权的数据
|
|
|
|
## 传输具有所有权的数据
|
|
|
|
使用通道来传输数据,一样要遵循Rust的所有权规则:
|
|
|
|
使用通道来传输数据,一样要遵循Rust的所有权规则:
|
|
|
|
|
|
|
|
|
|
|
|
- 若值的类型实现了`Copy`特征,则直接复制一份该值,然后传输过去,例如之前的`i32`类型
|
|
|
|
- 若值的类型实现了`Copy`特征,则直接复制一份该值,然后传输过去,例如之前的`i32`类型
|
|
|
@ -118,7 +117,7 @@ error[E0382]: borrow of moved value: `s`
|
|
|
|
|
|
|
|
|
|
|
|
各种细节不禁令人感叹:Rust还是安全!假如没有所有权的保护,`String`字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患。
|
|
|
|
各种细节不禁令人感叹:Rust还是安全!假如没有所有权的保护,`String`字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患。
|
|
|
|
|
|
|
|
|
|
|
|
### 使用for进行循环接收
|
|
|
|
## 使用for进行循环接收
|
|
|
|
下面来看看如何连续接收通道中的值:
|
|
|
|
下面来看看如何连续接收通道中的值:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
use std::sync::mpsc;
|
|
|
|
use std::sync::mpsc;
|
|
|
@ -181,16 +180,16 @@ fn main() {
|
|
|
|
- 这里虽然用了`clone`但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
|
|
|
|
- 这里虽然用了`clone`但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
|
|
|
|
- 由于两个子线程谁创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定
|
|
|
|
- 由于两个子线程谁创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定
|
|
|
|
|
|
|
|
|
|
|
|
### 消息顺序
|
|
|
|
## 消息顺序
|
|
|
|
上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的线程是无序的,对于通道而言,消息的发送顺序和接收顺序是一直的,满足`FIFO`原则(先进先出)。
|
|
|
|
上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的线程是无序的,对于通道而言,消息的发送顺序和接收顺序是一直的,满足`FIFO`原则(先进先出)。
|
|
|
|
|
|
|
|
|
|
|
|
由于篇幅有限,具体的代码这里就不再给出,感兴趣的读者可以自己验证下。
|
|
|
|
由于篇幅有限,具体的代码这里就不再给出,感兴趣的读者可以自己验证下。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 同步和异步通道
|
|
|
|
## 同步和异步通道
|
|
|
|
Rust标准库的`mpsc`通道其实分为两种类型:同步和异步。
|
|
|
|
Rust标准库的`mpsc`通道其实分为两种类型:同步和异步。
|
|
|
|
|
|
|
|
|
|
|
|
##### 异步通道
|
|
|
|
#### 异步通道
|
|
|
|
之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞:
|
|
|
|
之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
use std::sync::mpsc;
|
|
|
|
use std::sync::mpsc;
|
|
|
@ -228,7 +227,7 @@ fn main() {
|
|
|
|
|
|
|
|
|
|
|
|
从输出还可以看出,`发送之前`和`发送之后`是连续输出的,没有受到接收端主线程的任何影响,因此通过`mpsc::channel`创建的通道是异步通道。
|
|
|
|
从输出还可以看出,`发送之前`和`发送之后`是连续输出的,没有受到接收端主线程的任何影响,因此通过`mpsc::channel`创建的通道是异步通道。
|
|
|
|
|
|
|
|
|
|
|
|
##### 同步通道
|
|
|
|
#### 同步通道
|
|
|
|
与异步通道相反,同步通道**发送消息是阻塞的,只有在消息被接收后才解除阻塞**例如:
|
|
|
|
与异步通道相反,同步通道**发送消息是阻塞的,只有在消息被接收后才解除阻塞**例如:
|
|
|
|
```rust
|
|
|
|
```rust
|
|
|
|
use std::sync::mpsc;
|
|
|
|
use std::sync::mpsc;
|
|
|
@ -265,7 +264,7 @@ fn main() {
|
|
|
|
可以看出,主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:**发送之后**的输出是在**收到值 1**之后,说明**只有接收消息彻底成功后,发送消息才算完成**。
|
|
|
|
可以看出,主线程由于睡眠被阻塞导致无法接收消息,因此子线程的发送也一直被阻塞,直到主线程结束睡眠并成功接收消息后,发送才成功:**发送之后**的输出是在**收到值 1**之后,说明**只有接收消息彻底成功后,发送消息才算完成**。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 消息缓存
|
|
|
|
#### 消息缓存
|
|
|
|
细心的读者可能已经发现在创建同步通道时,我们传递了一个参数`0`: `mpsc::sync_channel(0);`,这是什么意思呢?
|
|
|
|
细心的读者可能已经发现在创建同步通道时,我们传递了一个参数`0`: `mpsc::sync_channel(0);`,这是什么意思呢?
|
|
|
|
|
|
|
|
|
|
|
|
答案不急给出,先将`0`改成`1`,然后再运行试试:
|
|
|
|
答案不急给出,先将`0`改成`1`,然后再运行试试:
|
|
|
@ -307,9 +306,72 @@ Bingo,更奇怪的事出现了,第一条消息瞬间发送完成,没有阻
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 关闭通道
|
|
|
|
## 关闭通道
|
|
|
|
之前我们数次提到了通道关闭,并且提到了当通道关闭后,发送消息或接收消息将会报错。那么如何关闭通道呢? 很简单:**所有发送者被`drop`或者所有接收者被`drop`后,通道会自动关闭**。
|
|
|
|
之前我们数次提到了通道关闭,并且提到了当通道关闭后,发送消息或接收消息将会报错。那么如何关闭通道呢? 很简单:**所有发送者被`drop`或者所有接收者被`drop`后,通道会自动关闭**。
|
|
|
|
|
|
|
|
|
|
|
|
神奇的是,这件事是在编译期实现的,完全没有运行期性能损耗!只能说Rust的`Drop`特征YYDS!
|
|
|
|
神奇的是,这件事是在编译期实现的,完全没有运行期性能损耗!只能说Rust的`Drop`特征YYDS!
|
|
|
|
|
|
|
|
|
|
|
|
## 多发送者,多消费者
|
|
|
|
## 传输多种类型的数据
|
|
|
|
|
|
|
|
之前提到过,一个消息通道只能传输一种类型的数据,如果你想要传输多种类型的数据,可以为每个类型创建一个通道,你也可以使用枚举类型来实现:
|
|
|
|
|
|
|
|
```rust
|
|
|
|
|
|
|
|
se std::sync::mpsc::{self, Receiver, Sender};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
enum Fruit {
|
|
|
|
|
|
|
|
Apple(u8),
|
|
|
|
|
|
|
|
Orange(String)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
|
|
|
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tx.send(Fruit::Orange("sweet".to_string())).unwrap();
|
|
|
|
|
|
|
|
tx.send(Fruit::Apple(2)).unwrap();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for _ in 0..2 {
|
|
|
|
|
|
|
|
match rx.recv().unwrap() {
|
|
|
|
|
|
|
|
Fruit::Apple(count) => println!("received {} apples", count),
|
|
|
|
|
|
|
|
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
如上所示,枚举类型还能让我们带上想要传输的数据,但是有一点需要注意,Rust会按照枚举中占用内存最大的那个成员进行内存对齐,这意味着就算你传输的是枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同, 因此会造成内存上的浪费。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 新手容易遇到的坑
|
|
|
|
|
|
|
|
`mpsc`虽然相当简洁明了,但是在使用起来还是可能存在坑:
|
|
|
|
|
|
|
|
```rust
|
|
|
|
|
|
|
|
use std::sync::mpsc;
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let (send, recv) = mpsc::channel();
|
|
|
|
|
|
|
|
let num_threads = 3;
|
|
|
|
|
|
|
|
for i in 0..num_threads {
|
|
|
|
|
|
|
|
let thread_send = send.clone();
|
|
|
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
|
|
|
thread_send.send(i).unwrap();
|
|
|
|
|
|
|
|
println!("thread {:?} finished", i);
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 在这里drop send...
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for x in recv {
|
|
|
|
|
|
|
|
println!("Got: {}", x);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
println!("finished iterating");
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
以上代码看起来非常正常,但是运行后主线程会一直阻塞,最后一行打印输出也不会被执行,原因在于: 子线程拿走的是复制后的`send`的所有权,这些拷贝会在子线程结束后被`drop`,因此无需担心,但是`send`本身却直到`main`函数的结束才会被`drop`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
之前提到,通道关闭的两个条件:发送者全部`drop`或接收者被`drop`,要结束`for`循环显然是要求发送者全部`drop`,但是由于`send`自身没有被`drop`,会导致该循环永远无法结束,最终主线程会一直阻塞。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
解决办法很简单,`drop`掉`send`即可:在代码中的注释下面添加一行`drop(send);`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## mpmc、更好的性能
|
|
|
|
|
|
|
|
如果你需要mpmc(多发送者,多接收者)或者需要更高的性能,可以考虑第三方库:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- [**crossbeam-channel**](https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel), 老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了`crossbeam`主仓库中
|
|
|
|
|
|
|
|
- [**flume**](https://github.com/zesterer/flume), 官方给出的性能数据要比crossbeam更好些,但是貌似最近没怎么更新
|
|
|
|