|
|
|
@ -261,6 +261,219 @@ for handle in handles {
|
|
|
|
|
|
|
|
|
|
总之,多线程的开销往往是在锁、数据竞争、缓存失效上,这些限制了现代化软件系统随着CPU核心的增多性能也线性增加的野心。
|
|
|
|
|
|
|
|
|
|
## 线程屏障(Barrier)
|
|
|
|
|
在Rust中,可以使用`Barrier`让多个线程都执行到某个点后,才继续一起往后执行:
|
|
|
|
|
```rust
|
|
|
|
|
use std::sync::{Arc, Barrier};
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let mut handles = Vec::with_capacity(6);
|
|
|
|
|
let barrier = Arc::new(Barrier::new(6));
|
|
|
|
|
|
|
|
|
|
for _ in 0..6 {
|
|
|
|
|
let b = barrier.clone();
|
|
|
|
|
handles.push(thread::spawn(move|| {
|
|
|
|
|
println!("before wait");
|
|
|
|
|
b.wait();
|
|
|
|
|
println!("after wait");
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for handle in handles {
|
|
|
|
|
handle.join().unwrap();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
上面代码,我们在线程打印出`before wait`后增加了一个屏障,目的就是等所有的线程都打印出**before wait**后,各个线程再继续执行:
|
|
|
|
|
```console
|
|
|
|
|
before wait
|
|
|
|
|
before wait
|
|
|
|
|
before wait
|
|
|
|
|
before wait
|
|
|
|
|
before wait
|
|
|
|
|
before wait
|
|
|
|
|
after wait
|
|
|
|
|
after wait
|
|
|
|
|
after wait
|
|
|
|
|
after wait
|
|
|
|
|
after wait
|
|
|
|
|
after wait
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 线程局部变量(Thread Loval Variable)
|
|
|
|
|
对于多线程编程,线程局部变量在一些场景下非常有用,而Rust通过标准库和三方库对此进行了支持。
|
|
|
|
|
|
|
|
|
|
#### 标准库thread_local
|
|
|
|
|
使用`thread_local`宏可以初始化线程局部变量,然后在线程内部使用该变量的`with`方法获取变量值:
|
|
|
|
|
```rust
|
|
|
|
|
use std::cell::RefCell;
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
|
|
|
|
|
|
|
|
|
|
FOO.with(|f| {
|
|
|
|
|
assert_eq!(*f.borrow(), 1);
|
|
|
|
|
*f.borrow_mut() = 2;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
|
|
|
|
|
let t = thread::spawn(move|| {
|
|
|
|
|
FOO.with(|f| {
|
|
|
|
|
assert_eq!(*f.borrow(), 1);
|
|
|
|
|
*f.borrow_mut() = 3;
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// 等待线程完成
|
|
|
|
|
t.join().unwrap();
|
|
|
|
|
|
|
|
|
|
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
|
|
|
|
|
FOO.with(|f| {
|
|
|
|
|
assert_eq!(*f.borrow(), 2);
|
|
|
|
|
});
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
上面代码中,`FOO`即是我们创建的**线程局部变量**,每个新的线程访问它时,都会使用它的初始值作为开始,各个线程中的`FOO`值彼此互不干扰。
|
|
|
|
|
|
|
|
|
|
可以注意到,线程中对`FOO`的使用是通过借用的方式,但是若我们需要每个线程独自获取它的拷贝,最后进行汇总,就有些强人所难了。
|
|
|
|
|
|
|
|
|
|
你还可以在结构体中使用线程局部变量:
|
|
|
|
|
```rust
|
|
|
|
|
use std::cell::RefCell;
|
|
|
|
|
|
|
|
|
|
struct Foo;
|
|
|
|
|
impl Foo {
|
|
|
|
|
thread_local! {
|
|
|
|
|
static FOO: RefCell<usize> = RefCell::new(0);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
Foo::FOO.with(|x| println!("{:?}", x));
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
或者通过引用的方式使用它:
|
|
|
|
|
```rust
|
|
|
|
|
use std::cell::RefCell;
|
|
|
|
|
use std::thread::LocalKey;
|
|
|
|
|
|
|
|
|
|
thread_local! {
|
|
|
|
|
static FOO: RefCell<usize> = RefCell::new(0);
|
|
|
|
|
}
|
|
|
|
|
struct Bar {
|
|
|
|
|
foo: &'static LocalKey<RefCell<usize>>,
|
|
|
|
|
}
|
|
|
|
|
impl Bar {
|
|
|
|
|
fn constructor() -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
foo: &FOO,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
#### 三方库thread-local
|
|
|
|
|
除了标准库外,一位大神还开发了[thread-local](https://github.com/Amanieu/thread_local-rs)库,它允许每个线程持有值的独立拷贝:
|
|
|
|
|
```rust
|
|
|
|
|
use thread_local::ThreadLocal;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::cell::Cell;
|
|
|
|
|
use std::thread;
|
|
|
|
|
|
|
|
|
|
let tls = Arc::new(ThreadLocal::new());
|
|
|
|
|
|
|
|
|
|
// 创建多个线程
|
|
|
|
|
for _ in 0..5 {
|
|
|
|
|
let tls2 = tls.clone();
|
|
|
|
|
thread::spawn(move || {
|
|
|
|
|
// 将计数器加1
|
|
|
|
|
let cell = tls2.get_or(|| Cell::new(0));
|
|
|
|
|
cell.set(cell.get() + 1);
|
|
|
|
|
}).join().unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 一旦所有子线程结束,收集它们的线程局部变量中的计数器值,然后进行求和
|
|
|
|
|
let tls = Arc::try_unwrap(tls).unwrap();
|
|
|
|
|
let total = tls.into_iter().fold(0, |x, y| x + y.get());
|
|
|
|
|
|
|
|
|
|
// 和为5
|
|
|
|
|
assert_eq!(total, 5);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
该库不仅仅使用了值的拷贝,而且还能自动把多个拷贝汇总到一个迭代器中,最后进行求和,非常好用。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 用条件控制线程的挂起和执行
|
|
|
|
|
条件变量(Condition Variables)经常和`Mutex`一起使用,可以让线程挂起,直到某个条件发生后再继续执行:
|
|
|
|
|
```rust
|
|
|
|
|
use std::thread;
|
|
|
|
|
use std::sync::{Arc, Mutex, Condvar};
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
|
|
|
|
let pair2 = pair.clone();
|
|
|
|
|
|
|
|
|
|
thread::spawn(move|| {
|
|
|
|
|
let &(ref lock, ref cvar) = &*pair2;
|
|
|
|
|
let mut started = lock.lock().unwrap();
|
|
|
|
|
println!("changing started");
|
|
|
|
|
*started = true;
|
|
|
|
|
cvar.notify_one();
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let &(ref lock, ref cvar) = &*pair;
|
|
|
|
|
let mut started = lock.lock().unwrap();
|
|
|
|
|
while !*started {
|
|
|
|
|
started = cvar.wait(started).unwrap();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
println!("started changed");
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
上述代码流程如下:
|
|
|
|
|
|
|
|
|
|
1. `main`线程首先进入`while`循环,并释放了锁`started`,然后开始挂起等待子线程的通知
|
|
|
|
|
2. 子线程获取到锁,并将其修改为true, 然后调用条件的方法来通知主线程继续执行:`cvar.notify_one`
|
|
|
|
|
|
|
|
|
|
## 只被调用一次的函数
|
|
|
|
|
有时,我们会需要某个函数在多线程环境下只被调用一次,例如初始化全局变量,无论是哪个线程先调用函数来初始化,都会保证全局变量只会被初始化一次,随后的其它线程调用就会忽略该函数:
|
|
|
|
|
```rust
|
|
|
|
|
use std::thread;
|
|
|
|
|
use std::sync::{Once, ONCE_INIT};
|
|
|
|
|
|
|
|
|
|
static mut VAL: usize = 0;
|
|
|
|
|
static INIT: Once = ONCE_INIT;
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let handle1 = thread::spawn(move || {
|
|
|
|
|
INIT.call_once(|| {
|
|
|
|
|
unsafe {
|
|
|
|
|
VAL = 1;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let handle2 = thread::spawn(move || {
|
|
|
|
|
INIT.call_once(|| {
|
|
|
|
|
unsafe {
|
|
|
|
|
VAL = 2;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
handle1.join().unwrap();
|
|
|
|
|
handle2.join().unwrap();
|
|
|
|
|
|
|
|
|
|
println!("{}", unsafe { VAL });
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
代码运行的结果取决于哪个线程先调用`INIT.call_once`(虽然代码具有先后顺序,但是线程的初始化顺序并无法被保证!因为线程初始化是异步的,且耗时较久),若`handle1`先,则输出`1`,否则输出`2`。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 总结
|
|
|
|
|
[Rust的线程模型](./intro.md)是`1:1`模型,因为Rust要保持尽量小的运行时。
|
|
|
|
|