diff --git a/src/advance/concurrency-with-threads/sync1.md b/src/advance/concurrency-with-threads/sync1.md index 90daff8e..2abf46c5 100644 --- a/src/advance/concurrency-with-threads/sync1.md +++ b/src/advance/concurrency-with-threads/sync1.md @@ -231,126 +231,111 @@ fn main() { 当我们拥有两个锁,且两个线程各自使用了其中一个锁,然后试图去访问另一个锁时,就可能发生死锁: ```rust -use std::{sync::{Mutex, MutexGuard}, thread}; -use std::thread::sleep; -use std::time::Duration; +use std::{sync::Mutex, thread}; use lazy_static::lazy_static; lazy_static! { - static ref MUTEX1: Mutex = Mutex::new(0); - static ref MUTEX2: Mutex = Mutex::new(0); + static ref COUNTER1: Mutex = Mutex::new(0); + static ref COUNTER2: Mutex = Mutex::new(0); } fn main() { - // 存放子线程的句柄 - let mut children = vec![]; - for i_thread in 0..2 { - children.push(thread::spawn(move || { - for _ in 0..1 { - // 线程1 - if i_thread % 2 == 0 { - // 锁住MUTEX1 - let guard: MutexGuard = MUTEX1.lock().unwrap(); - - println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread); - - // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2 - sleep(Duration::from_millis(10)); - - // 去锁MUTEX2 - let guard = MUTEX2.lock().unwrap(); - // 线程2 - } else { - // 锁住MUTEX2 - let _guard = MUTEX2.lock().unwrap(); - - println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread); - - let _guard = MUTEX1.lock().unwrap(); - } - } - })); - } + let handle1 = thread::spawn(move || { + let mut num1 = COUNTER1.lock().unwrap(); + *num1 += 1; - // 等子线程完成 - for child in children { - let _ = child.join(); - } + println!("第一个线程获取了锁1"); + + println!("第一个线程开始等待了锁2"); + let mut num2 = COUNTER2.lock().unwrap(); + *num2 += 1; + + println!("第一个线程获取了锁2"); + }); + + let handle2 = thread::spawn(move || { + let mut num2 = COUNTER2.lock().unwrap(); + *num2 += 1; + + println!("第二个线程获取了锁2"); + + println!("第二个线程开始等待了锁1"); + let mut num1 = COUNTER1.lock().unwrap(); + *num1 += 1; + + println!("第二个线程获取了锁1"); + }); + + // 主线程等待这两个线程执行完在结束 + let _ = handle1.join(); + let _ = handle2.join(); println!("死锁没有发生"); } ``` +需要知道的是,这段代码并`不会100%发生死锁`,因为我们并`不知道线程什么时候执行`,有可能线程1完全执行完毕后线程2才开始执行,这种情况下不会发生死锁 -在上面的描述中,我们用了"可能"二字,原因在于死锁在这段代码中不是必然发生的,总有一次运行你能看到最后一行打印输出。这是由于子线程的初始化顺序和执行速度并不确定,我们无法确定哪个线程中的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。 - -但是,可以简单的说明下死锁发生的必然条件:线程 1 锁住了`MUTEX1`并且线程`2`锁住了`MUTEX2`,然后线程 1 试图去访问`MUTEX2`,同时线程`2`试图去访问`MUTEX1`,就会死锁。 因为线程 2 需要等待线程 1 释放`MUTEX1`后,才会释放`MUTEX2`,而与此同时,线程 1 需要等待线程 2 释放`MUTEX2`后才能释放`MUTEX1`,这种情况造成了两个线程都无法释放对方需要的锁,最终死锁。 - -那么为何某些时候,死锁不会发生?原因很简单,线程 2 在线程 1 锁`MUTEX1`之前,就已经全部执行完了,随之线程 2 的`MUTEX2`和`MUTEX1`被全部释放,线程 1 对锁的获取将不再有竞争者。 同理,线程 1 若全部被执行完,那线程 2 也不会被锁,因此我们在线程 1 中间加一个睡眠,增加死锁发生的概率。如果你在线程 2 中同样的位置也增加一个睡眠,那死锁将必然发生! +运行这段代码,如果你看到了类似如下的输出,则证明发生了死锁 +``` +第一个线程获取了锁1 +第二个线程获取了锁2 +第二个线程开始等待了锁1 +第一个线程开始等待了锁2 +``` #### try_lock 与`lock`方法不同,`try_lock`会**尝试**去获取一次锁,如果无法获取会返回一个错误,因此**不会发生阻塞**: ```rust -use std::{sync::{Mutex, MutexGuard}, thread}; -use std::thread::sleep; -use std::time::Duration; +use std::{sync::Mutex, thread}; use lazy_static::lazy_static; lazy_static! { - static ref MUTEX1: Mutex = Mutex::new(0); - static ref MUTEX2: Mutex = Mutex::new(0); + static ref COUNTER1: Mutex = Mutex::new(0); + static ref COUNTER2: Mutex = Mutex::new(0); } fn main() { - // 存放子线程的句柄 - let mut children = vec![]; - for i_thread in 0..2 { - children.push(thread::spawn(move || { - for _ in 0..1 { - // 线程1 - if i_thread % 2 == 0 { - // 锁住MUTEX1 - let guard: MutexGuard = MUTEX1.lock().unwrap(); - - println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread); - - // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2 - sleep(Duration::from_millis(10)); - - // 去锁MUTEX2 - let guard = MUTEX2.try_lock(); - println!("线程1获取MUTEX2锁的结果: {:?}",guard); - // 线程2 - } else { - // 锁住MUTEX2 - let _guard = MUTEX2.lock().unwrap(); - - println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread); - sleep(Duration::from_millis(10)); - let guard = MUTEX1.try_lock(); - println!("线程2获取MUTEX1锁的结果: {:?}",guard); - } - } - })); - } + let handle1 = thread::spawn(move || { + let mut num1 = COUNTER1.lock().unwrap(); + *num1 += 1; - // 等子线程完成 - for child in children { - let _ = child.join(); - } + println!("第一个线程获取了锁1"); + + println!("第一个线程开始等待了锁2"); + let num2 = COUNTER2.try_lock(); // lock 换成 try_lock + println!("第一个线程获取锁2的结果是: {:?}", num2); + }); + + let handle2 = thread::spawn(move || { + let mut num2 = COUNTER2.lock().unwrap(); + *num2 += 1; + + println!("第二个线程获取了锁2"); + + println!("第二个线程开始等待了锁1"); + let num1 = COUNTER1.try_lock(); // lock 换成 try_lock + println!("第二个线程获取锁1的结果是: {:?}", num1); + }); + + // 主线程等待这两个线程执行完在结束 + let _ = handle1.join(); + let _ = handle2.join(); println!("死锁没有发生"); } ``` -为了演示`try_lock`的作用,我们特定使用了之前必定会死锁的代码,并且将`lock`替换成`try_lock`,与之前的结果不同,这段代码将不会再有死锁发生: +为了演示`try_lock`的作用,我们特定使用了之前必定会死锁的代码,并且将`lock`替换成`try_lock`,与之前的结果不同,这段代码无论如何将不会再有死锁发生: ```console -线程 0 锁住了MUTEX1,接着准备去锁MUTEX2 ! -线程 1 锁住了MUTEX2, 准备去锁MUTEX1 -线程2获取MUTEX1锁的结果: Err("WouldBlock") -线程1获取MUTEX2锁的结果: Ok(0) +第一个线程获取了锁1 +第一个线程开始等待了锁2 +第一个线程获取锁2的结果是: Err("WouldBlock") +第二个线程获取了锁2 +第二个线程开始等待了锁1 +第二个线程获取锁1的结果是: Ok(1) 死锁没有发生 ``` @@ -502,6 +487,109 @@ inner counter: 3 Mutex { data: true, poisoned: false, .. } ``` +例子2,`生产者消费者`模型 + +我们有一个生产者线程(铁匠),他生产(打造)一件装备后,通知消费者线程(玩家)消费(购买) +消费者线程(玩家)消费(购买)后,通知生产者线程(铁匠)继续生产(打造),直到生产者线程退出 + +```rust +use std::{ + sync::{Arc, Condvar, Mutex}, + thread, +}; + +fn main() { + // 条件变量,它可以让一个线程进入等待(锁),直至被其他线程唤醒 + let cond = Arc::new(Condvar::new()); + let cond_clone = cond.clone(); + + // 互斥锁和条件变量组合使用时,一般用于条件判断依据 + let mutex_lock = Arc::new(Mutex::new(false)); + let mutex_lock_clone = mutex_lock.clone(); + + let mut total_count = 10; + + thread::spawn(move || loop { + // 这里利用作用域获取了锁(MutexGuard)的值后直接让它释放 + let mut lock = { *mutex_lock_clone.lock().unwrap() }; + + // 如果 lock == false 表示铁匠(生产者)还没有打造出装备,因此进入等待,等待铁匠(生产者)打造装备 + while lock == false { + // 这里进入等待前一定要释放 lock,否则你拿着锁进入等待,其他线程就无法获得锁了 + // 这就是为什么上面要用作用域释放 lock 的原因 + lock = *cond_clone.wait(mutex_lock_clone.lock().unwrap()).unwrap(); // 进入等待 + } + + // 如果 lock == true 表示铁匠(生产者)已经打造出了一件装备,因此我们就可以购买(消费)了 + println!("玩家(消费者),购买了一件装备"); + + // 这里用用作用域包起来的原因,也是为了释放 lock + // 因为如果你拿着锁去唤醒铁匠(生产者)线程,它也肯定无法获得锁 + { + // 消费完后,把 lock 的值改成 false + *mutex_lock_clone.lock().unwrap() = false; + } + + // 唤醒生产者线程(其实这里是唤醒一个等待中的线程,只不过此时只有一个线程在等待,就是我们的生产者线程) + cond_clone.notify_one(); + }); + + while total_count > 0 { + // 这里利用作用域获取了锁(MutexGuard)的值后直接让它释放 + let mut lock = { *mutex_lock.lock().unwrap() }; + + // 如果 lock == true 表示已经打造了一件装备,因此暂停生产,等待玩家(消费者)购买(消费) + while lock == true { + // 这里进入等待前一定要释放 lock,否则你拿着锁进入等待,其他线程就无法获得锁了 + // 这就是为什么上面要用作用域释放 lock 的原因 + lock = *cond.wait(mutex_lock.lock().unwrap()).unwrap(); // 进入等待 + } + + // 否则如果 lock == false 表示还没有打造装备,因此需要打造一件装备 + total_count -= 1; + println!( + "铁匠(生产者),打造了一件装备,剩余材料总数: {}", + total_count + ); + + // 这里用用作用域包起来的原因,也是为了释放 lock + // 因为如果你拿着锁去唤醒玩家(消费者)线程,它也肯定无法获得锁 + { + // 打造完后,把 lock 的值改成 true + *mutex_lock.lock().unwrap() = true; + } + + // 唤醒消费者线程(其实这里是唤醒一个等待中的线程,只不过此时只有一个线程在等待,就是我们的消费者线程) + cond.notify_one(); + } +} +``` + +运行这段代码 + +```console +铁匠(生产者),打造了一件装备,剩余材料总数: 9 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 8 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 7 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 6 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 5 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 4 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 3 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 2 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 1 +玩家(消费者),购买了一件装备 +铁匠(生产者),打造了一件装备,剩余材料总数: 0 +玩家(消费者),购买了一件装备 +``` + ## 信号量 Semaphore 在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。想象一下,当一个新游戏刚开服时(有些较火的老游戏也会,比如`wow`),往往会控制游戏内玩家的同时在线数,一旦超过某个临界值,就开始进行排队进服。而在实际使用中,也有很多时候,我们需要通过信号量来控制最大并发数,防止服务器资源被撑爆。 diff --git a/src/advance/concurrency-with-threads/sync2.md b/src/advance/concurrency-with-threads/sync2.md index b1822971..1d75f2fa 100644 --- a/src/advance/concurrency-with-threads/sync2.md +++ b/src/advance/concurrency-with-threads/sync2.md @@ -12,33 +12,76 @@ ## 使用 Atomic 作为全局变量 -原子类型的一个常用场景,就是作为全局变量来使用: +我们先看看下面的代码 + +```rust +use std::{sync::Mutex, thread, time::Instant, ops::Sub}; + +use lazy_static::lazy_static; +lazy_static! { + static ref R: Mutex = Mutex::new(0); +} + +const N_TIMES: u64 = 10000000; +const N_THREADS: usize = 10; + +fn main() { + let s = Instant::now(); // 用于记录开始时间 + + let mut threads = Vec::with_capacity(N_THREADS); // 创建一个容量为 N_THREADS(10) 长度的 Vector + + // 循环创建线程,并添加到threads中 + for _ in 0..N_THREADS { + let handle = thread::spawn(move || { + let mut r = R.lock().unwrap(); + // 对共享变量R + 1 N_TIMES次(10000000) + for _ in 0..N_TIMES { + *r += 1; + } + }); + threads.push(handle); + } + + // 等待所有线程结束 + for thread in threads { + thread.join().unwrap(); + } + + let r = R.lock().unwrap(); + + // 断言R的结果是否是 N_TIMES * N_THREADS + assert_eq!(N_TIMES * N_THREADS as u64, *r); + + // 打印从开始到结束消耗的时间 + println!("{:?}", Instant::now().sub(s)); +} +``` +以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加 1 操作, 最后将它与`线程数 * 加1次数`进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了 100%的并发安全性。 + +现在让我们来看看用`Atomic`来实现这个程序 ```rust use std::ops::Sub; use std::sync::atomic::{AtomicU64, Ordering}; -use std::thread::{self, JoinHandle}; +use std::thread; use std::time::Instant; const N_TIMES: u64 = 10000000; const N_THREADS: usize = 10; -static R: AtomicU64 = AtomicU64::new(0); - -fn add_n_times(n: u64) -> JoinHandle<()> { - thread::spawn(move || { - for _ in 0..n { - R.fetch_add(1, Ordering::Relaxed); - } - }) -} +static R: AtomicU64 = AtomicU64::new(0); // u64类型的原子类型 fn main() { let s = Instant::now(); let mut threads = Vec::with_capacity(N_THREADS); for _ in 0..N_THREADS { - threads.push(add_n_times(N_TIMES)); + let handle = thread::spawn(move || { + for _ in 0..N_TIMES { + R.fetch_add(1, Ordering::Relaxed); // +1 操作 + } + }); + threads.push(handle); } for thread in threads { @@ -47,14 +90,11 @@ fn main() { assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed)); - println!("{:?}",Instant::now().sub(s)); + println!("{:?}", Instant::now().sub(s)); } ``` -以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加 1 操作, 最后将它与`线程数 * 加1次数`进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了 100%的并发安全性。 - -当然以上代码的功能其实也可以通过`Mutex`来实现,但是后者的强大功能是建立在额外的性能损耗基础上的,因此性能会逊色不少: - +运行这两段代码 ```console Atomic实现:673ms Mutex实现: 1136ms @@ -62,6 +102,8 @@ Mutex实现: 1136ms 可以看到`Atomic`实现会比`Mutex`快**41%**,实际上在复杂场景下还能更快(甚至达到 4 倍的性能差距)! +因此,可以看出,虽然`Mutex`也能轻松实现上诉功能,但它的强大功能是建立在额外的性能损耗基础上的,因此性能会逊色不少 + 还有一点值得注意: **和`Mutex`一样,`Atomic`的值具有内部可变性**,你无需将其声明为`mut`: ```rust @@ -229,6 +271,10 @@ fn main() { } ``` +线程`producer`和`consumer`如果不设置`内存屏障`,那么`DATA`的值可能由于`CPU 缓存导致内存顺序的改变` +举个例子,假如`reset`中将`DATA = 0`,此时,`producer`线程中将`DATA = 100`,但由于`CPU 缓存`的原因,`DATA = 100`还没有被同步到其它`CPU 缓存`中, +此时`consumer`线程中开始读取`DATA`,结果读到了值`0`,这也就造成了`断言失败`,因此这种情况下`内存屏障`是很有必要的 + 原则上,`Acquire`用于读取,而`Release`用于写入。但是由于有些原子操作同时拥有读取和写入的功能,此时就需要使用`AcqRel`来设置内存顺序了。在内存屏障中被写入的数据,都可以被其它线程读取到,不会有 CPU 缓存的问题。 **内存顺序的选择**