|
|
|
@ -78,6 +78,110 @@ receive Err(Disconnected)
|
|
|
|
|
|
|
|
|
|
如上,当子线程创建成功且发送消息后,主线程会接收到`Ok(1)`的消息内容,紧接着子线程结束,发送者也随着被`drop`,此时接收者又会报错,但是这次错误原因有所不同:`Disconnected`代表发送者已经被关闭。
|
|
|
|
|
|
|
|
|
|
#### 传输具有所有权的数据
|
|
|
|
|
使用通道来传输数据,一样要遵循Rust的所有权规则:
|
|
|
|
|
|
|
|
|
|
- 若值的类型实现了`Copy`特征,则直接复制一份该值,然后传输过去,例如之前的`i32`类型
|
|
|
|
|
- 若值没有实现`Copy`,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
|
|
|
|
|
|
|
|
|
|
#### 关闭通道
|
|
|
|
|
一起来看看第二种情况:
|
|
|
|
|
```rust
|
|
|
|
|
use std::sync::mpsc;
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
|
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
let s = String::from("我,飞走咯!");
|
|
|
|
|
tx.send(s).unwrap();
|
|
|
|
|
println!("val is {}", s);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let received = rx.recv().unwrap();
|
|
|
|
|
println!("Got: {}", received);
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
以上代码中,`String`底层的字符串是存储在堆上,被没有实现`Copy`特征,当它被发送后,会将所有权从发送端的`s`转移给接收端的`received`,之后`s`将无法被使用:
|
|
|
|
|
```console
|
|
|
|
|
error[E0382]: borrow of moved value: `s`
|
|
|
|
|
--> src/main.rs:10:31
|
|
|
|
|
|
|
|
|
|
|
8 | let s = String::from("我,飞走咯!");
|
|
|
|
|
| - move occurs because `s` has type `String`, which does not implement the `Copy` trait // 所有权被转移,由于`String`没有实现`Copy`特征
|
|
|
|
|
9 | tx.send(s).unwrap();
|
|
|
|
|
| - value moved here // 所有权被转移走
|
|
|
|
|
10 | println!("val is {}", s);
|
|
|
|
|
| ^ value borrowed here after move // 所有权被转移后,依然对s进行了借用
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
各种细节不禁令人感叹:Rust还是安全!假如没有所有权的保护,`String`字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非你故意这么设计,否则这就是不安全的隐患。
|
|
|
|
|
|
|
|
|
|
#### 使用for进行循环接收
|
|
|
|
|
下面来看看如何连续接收通道中的值:
|
|
|
|
|
```rust
|
|
|
|
|
use std::sync::mpsc;
|
|
|
|
|
use std::thread;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
|
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
let vals = vec![
|
|
|
|
|
String::from("hi"),
|
|
|
|
|
String::from("from"),
|
|
|
|
|
String::from("the"),
|
|
|
|
|
String::from("thread"),
|
|
|
|
|
];
|
|
|
|
|
|
|
|
|
|
for val in vals {
|
|
|
|
|
tx.send(val).unwrap();
|
|
|
|
|
thread::sleep(Duration::from_secs(1));
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
for received in rx {
|
|
|
|
|
println!("Got: {}", received);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
在上面代码中,主线程和子线程是并发运行的,子线程在不停的**发送消息 -> 休眠1秒**,与此同时,主线程使用`for`循环**阻塞**的从`rx`**迭代器**中接收消息,当子线程运行完成时,发送者`tx`会随之被`drop`,此时`for`循环将被终止,最终`main`线程成功结束。
|
|
|
|
|
|
|
|
|
|
#### 使用多发送者
|
|
|
|
|
由于子线程会拿走发送者的所有权,因此我们必须对发送者进行克隆,然后让每个线程拿走它的一份拷贝:
|
|
|
|
|
```rust
|
|
|
|
|
use std::sync::mpsc;
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let (tx, rx) = mpsc::channel();
|
|
|
|
|
let tx1 = tx.clone();
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
tx.send(String::from("hi from raw tx")).unwrap();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
tx1.send(String::from("hi from cloned tx")).unwrap();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
for received in rx {
|
|
|
|
|
println!("Got: {}", received);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
代码并无太大区别,就多了一个对发送者的克隆`let tx1 = tx.clone();`,然后一个子线程拿走`tx`的所有权,另一个子线程拿走`tx1`的所有权,皆大欢喜。
|
|
|
|
|
|
|
|
|
|
但是有几点需要注意:
|
|
|
|
|
|
|
|
|
|
- 需要所有的发送者都被`drop`掉后,接收者`rx`才会收到错误,进而跳出`for`循环,最终结束主线程
|
|
|
|
|
- 这里虽然用了`clone`但是并不会影响性能,因为它并不在热点代码路径中,仅仅会被执行一次
|
|
|
|
|
- 由于两个子线程谁创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程的输出顺序也不确定
|
|
|
|
|
|
|
|
|
|
注意,上述第三点的消息顺序仅仅是因为线程创建引起的,并不代表通道中的线程是无序的,对于通道而言,先发送就先被接收,典型的`FIFO`(first in, first out)。
|
|
|
|
|
|
|
|
|
|
#### 消息顺序
|
|
|
|
|
### 关闭通道
|