From 5995b27e7adb70da6d3f08822c1d2f958ed33d3e Mon Sep 17 00:00:00 2001 From: sunface Date: Wed, 12 Jan 2022 15:33:51 +0800 Subject: [PATCH] =?UTF-8?q?update=E5=B9=B6=E5=8F=91=E5=8E=9F=E8=AF=AD?= =?UTF-8?q?=E4=B8=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- book/contents/SUMMARY.md | 3 +- .../concurrency-with-threads/primitives.md | 237 +++++++++++++++++- .../concurrency-with-threads/primitives1.md | 126 ++++++++++ .../concurrency-with-threads/thread.md | 4 +- book/contents/advance/global-variable.md | 61 ++++- 5 files changed, 415 insertions(+), 16 deletions(-) create mode 100644 book/contents/advance/concurrency-with-threads/primitives1.md diff --git a/book/contents/SUMMARY.md b/book/contents/SUMMARY.md index 5e21229f..07a0c6d5 100644 --- a/book/contents/SUMMARY.md +++ b/book/contents/SUMMARY.md @@ -77,7 +77,8 @@ - [并发和并行](advance/concurrency-with-threads/concurrency-parallelism.md) - [使用多线程](advance/concurrency-with-threads/thread.md) - [线程间的消息传递](advance/concurrency-with-threads/message-passing.md) - - [线程同步:并发原语和共享内存](advance/concurrency-with-threads/primitives.md) + - [线程同步:并发原语和共享内存(上)](advance/concurrency-with-threads/primitives.md) + - [线程同步:并发原语和共享内存(下)](advance/concurrency-with-threads/primitives1.md) - [Send、Sync todo](advance/multi-threads/send-sync.md) - [一个综合例子 todo](advance/multi-threads/example.md) - [async/await并发编程 todo](advance/async/intro.md) diff --git a/book/contents/advance/concurrency-with-threads/primitives.md b/book/contents/advance/concurrency-with-threads/primitives.md index e5701ed2..dc1315ce 100644 --- a/book/contents/advance/concurrency-with-threads/primitives.md +++ b/book/contents/advance/concurrency-with-threads/primitives.md @@ -1,4 +1,4 @@ -# 并发原语和共享内存 +# 并发原语和共享内存(上) 在多线程编程中,同步性极其的重要,当你需要同时访问一个资源、控制不同线程的执行次序时,都需要使用到同步性。 在Rust中有多种方式可以实现同步性。在上一节中讲到的消息传递就是同步性的一种实现方式,我们可以通过消息传递来控制不同线程间的执行次序。还可以使用共享内存来实现同步性,例如通过锁和原子操作等并发原语来实现多个线程同时且安全地去访问一个资源。 @@ -164,19 +164,234 @@ Result: 10 简单总结下:`Rc/RefCell`用于单线程可变性, `Arc/Mutext`用于多线程可变性。 -#### Mutex的局限性 -Mutexes have a reputation for being difficult to use because you have to remember two rules: +#### 需要小心的Mutex +如果有其它语言的编程经验,就知道互斥锁这家伙不好对付,如果要正确使用,你得牢记在心: -You must attempt to acquire the lock before using the data. -When you’re done with the data that the mutex guards, you must unlock the data so other threads can acquire the lock. +- 在使用数据前必须先获取锁 +- 在数据使用完成后,必须**及时**的释放锁,比如文章开头的例子,使用内部语句块的目的就是为了及时的释放锁 -Management of mutexes can be incredibly tricky to get right, which is why so many people are enthusiastic about channels. However, thanks to Rust’s type system and ownership rules, you can’t get locking and unlocking wrong. +这两点看起来不起眼,但是如果要正确的使用,其实是相当不简单的,对于其它语言,忘记释放锁是经常发生的,虽然Rust通过智能指针的`drop`机制帮助我们避免了这一点,但是由于不及时释放锁导致的性能问题也是常见的。 -Another detail to note is that Rust can’t protect you from all kinds of logic errors when you use Mutex. Recall in Chapter 15 that using Rc came with the risk of creating reference cycles, where two Rc values refer to each other, causing memory leaks. Similarly, Mutex comes with the risk of creating deadlocks. These occur when an operation needs to lock two resources and two threads have each acquired one of the locks, causing them to wait for each other forever. If you’re interested in deadlocks, try creating a Rust program that has a deadlock; then research deadlock mitigation strategies for mutexes in any language and have a go at implementing them in Rust. The standard library API documentation for Mutex and MutexGuard offers useful information. -#### Mutex和Arc +正因为这种困难性,导致很多用户都热衷于使用消息传递的方式来实现同步,例如Go语言直接把`channel`内置在语言特性中,甚至还有无锁的语言,例如`erlang`,完全使用`Actor`模型,依赖消息传递来完成共享和同步。好在Rust的类型系统、所有权机制、智能指针等可以很好的帮助我们减轻使用锁时的负担。 -## RwLock +另一个值的注意的是在使用`Mutex`时,Rust无法保证我们避免所有的逻辑错误,例如在之前章节,我们提到过使用`Rc`可能会导致[循环引用的问题](../circle-self-ref/circle-reference.md)。类似的,`Mutex`也存在使用上的风险,例如创建死锁(deadlock):当一个操作试图锁住两个资源,然后两个线程各自获取其中一个锁,并试图获取另一个锁时,就会造成死锁。 -## Atomic +## 死锁 +在Rust中有多种方式可以创建死锁,了解这些方式有助于你提前规避可能的风险,一起来看看。 -## Condvar \ No newline at end of file +#### 单线程死锁 +这种死锁比较容易规避,但是当代码复杂后还是有可能遇到: +```rust +use std::sync::Mutex; + +fn main() { + let data = Mutex::new(0); + let d1 = data.lock(); + let d2 = data.lock(); // cannot lock, since d1 is still active +} +``` + +非常简单,只要你在另一个锁还未被释放时去申请新的锁,就会触发,当代码复杂后,这种情况可能就没有那么显眼。 + +#### 多线程死锁 +当有两个锁,然后两个线程各自使用了其中一个锁,并且试图去访问另一个锁时,就可能发生死锁: +```rust +use std::{sync::{Mutex, MutexGuard}, thread}; +use std::thread::sleep; +use std::time::Duration; + +use lazy_static::lazy_static; +lazy_static! { + static ref MUTEX1: Mutex = Mutex::new(0); + static ref MUTEX2: Mutex = Mutex::new(0); +} + +fn main() { + // 存放子线程的句柄 + let mut children = vec![]; + for i_thread in 0..2 { + children.push(thread::spawn(move || { + for _ in 0..1 { + // 线程1 + if i_thread % 2 == 0 { + // 锁住mutex1 + let guard: MutexGuard = MUTEX1.lock().unwrap(); + + println!("线程 {} 锁住了mutex1,接着准备去锁mutex2 !", i_thread); + + // 当前线程睡眠一小会儿,等待线程2锁住mutex2 + sleep(Duration::from_millis(10)); + + // 去锁mutex2 + let guard = MUTEX2.lock().unwrap(); + // 线程2 + } else { + // 锁住mutex2 + let _guard = MUTEX2.lock().unwrap(); + + println!("线程 {} 锁住了mutex2, 准备去锁mutex1", i_thread); + + let _guard = MUTEX1.lock().unwrap(); + } + } + })); + } + + // 等子线程完成 + for child in children { + let _ = child.join(); + } + + println!("死锁没有发生"); +} +``` + +在上面的描述中,我们用了可能发生死锁,是因为死锁在这段代码中不是必然发生的,总有一次运行你能看到最后一行打印输出。这是由于子线程的初始化顺序和执行速度并不确定,我们无法确定哪个线程的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。 + +但是,可以简单的说明下死锁发生的必然条件:线程1锁住了`mutex1`并且线程`2`锁住了`mutex2`,然后线程1试图去访问`mutex2`,同时线程`2`试图去访问`mutex1`,就会锁住。 因为线程2需要等待线程1释放`mutex1`后,才会释放`mutex2`,而与此同时,线程1需要等待线程2释放`mutex2`后才能释放`mutex1`,这种情况造成了两个线程都无法释放对方需要的锁,最终锁死。 + +为何某些时候,死锁不会发生?。原因很简单,线程2在线程1锁`mutex1`之前,就已经全部执行完了,随之线程2的`mutex2`和`mutex1`被全部释放,线程1对锁的获取将不再有竞争者。 同理,线程1若全部被执行完,那线程2也不会被锁,因此我们在线程1中间加一个睡眠,增加死锁发生的概率。如果你在线程2中同样的位置也增加一个睡眠,那死锁将必然发生! + +#### try_lock +与`lock`方法不同,`try_lock`会尝试去获取一次锁,**如果无法获取会返回一个错误,因此不会发生阻塞**: +```rust +use std::{sync::{Mutex, MutexGuard}, thread}; +use std::thread::sleep; +use std::time::Duration; + +use lazy_static::lazy_static; +lazy_static! { + static ref MUTEX1: Mutex = Mutex::new(0); + static ref MUTEX2: Mutex = Mutex::new(0); +} + +fn main() { + // 存放子线程的句柄 + let mut children = vec![]; + for i_thread in 0..2 { + children.push(thread::spawn(move || { + for _ in 0..1 { + // 线程1 + if i_thread % 2 == 0 { + // 锁住mutex1 + let guard: MutexGuard = MUTEX1.lock().unwrap(); + + println!("线程 {} 锁住了mutex1,接着准备去锁mutex2 !", i_thread); + + // 当前线程睡眠一小会儿,等待线程2锁住mutex2 + sleep(Duration::from_millis(10)); + + // 去锁mutex2 + let guard = MUTEX2.try_lock(); + println!("线程1获取mutex2锁的结果: {:?}",guard); + // 线程2 + } else { + // 锁住mutex2 + let _guard = MUTEX2.lock().unwrap(); + + println!("线程 {} 锁住了mutex2, 准备去锁mutex1", i_thread); + sleep(Duration::from_millis(10)); + let guard = MUTEX1.try_lock(); + println!("线程2获取mutex1锁的结果: {:?}",guard); + } + } + })); + } + + // 等子线程完成 + for child in children { + let _ = child.join(); + } + + println!("死锁没有发生"); +} +``` + +为了演示`try_lock`的作用,我们特定使用了之前必定会死锁的代码,然后将`lock`替换程`try_lock`,而此时,这段代码将不会再有死锁发生: +```console +线程 0 锁住了mutex1,接着准备去锁mutex2 ! +线程 1 锁住了mutex2, 准备去锁mutex1 +线程2获取mutex1锁的结果: Err("WouldBlock") +线程1获取mutex2锁的结果: Ok(0) +死锁没有发生 +``` + +如上所示,当`try_lock`失败时,会报出一个错误:`Err("WouldBlock")`,然后线程其余代码会继续执行,不再被阻塞。 + +> 一个有趣的命名规则:在Rust标准库中,使用`try_xxx`都会尝试进行一次操作,如果无法完成,就立即返回,不会发生阻塞。例如消息传递章节中的`try_recv`以及本章节中的`try_lock` + + +## 读写锁RwLock +`Mutex`有一个问题,无论是读还是写都会同时只有一个线程能访问,因此读写都会被锁住。在某些时候,我们需要大量的并发读,此时就可以使用`RwLock`: +```rust +use std::sync::RwLock; + +fn main() { + let lock = RwLock::new(5); + + // 同一时间允许多个读 + { + let r1 = lock.read().unwrap(); + let r2 = lock.read().unwrap(); + assert_eq!(*r1, 5); + assert_eq!(*r2, 5); + } // 读锁在此处被drop + + // 同一时间只允许一个写 + { + let mut w = lock.write().unwrap(); + *w += 1; + assert_eq!(*w, 6); + + // 以下代码会panic,因为读和写不允许同时存在 + // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中 + // let r1 = lock.read(); + // println!("{:?}",r1); + }// 写锁在此处被drop +} +``` + +`RwLock`在使用上和`Mutex`区别不大,就是还额外提供了一个`read`方法,需要注意的是,当读写同时发生时,程序会直接`panic`(本例是单线程,实际上多个线程中也是如此),因为会发生死锁: +```console +thread 'main' panicked at 'rwlock read lock would result in deadlock', /rustc/efec545293b9263be9edfb283a7aa66350b3acbf/library/std/src/sys/unix/rwlock.rs:49:13 +note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace +``` + +好在我们可以使用`try_write`和`try_read`来尝试进行一次写/读,若失败则返回错误: +```console +Err("WouldBlock") +``` + +简单总结下`RwLock`: + +1. 同时允许多个读,但最多只能有一个写 +2. 读和写不能同时存在 +3. 读可以使用`read`、`try_read`,写`write`、`try_write`, 在实际项目中,`try_xxx`会安全的多 + +## Mutex还是RwLock +首先简单性上`Mutex`完胜,因为使用`RwLock`你得操心几个问题: + +- 读和写不能同时发生,如果使用`try_xxx`解决,就必须做大量的错误处理和失败重试机制 +- 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败([writer starvation](https://stackoverflow.com/questions/2190090/how-to-prevent-writer-starvation-in-a-read-write-lock-in-pthreads)) +- RwLock其实是操作系统提供的实现,原理要比`Mutex`复杂的多,锁的自身性能而言,是比不上原生实现的`Mutex` + +因此我们可以简单总结下两者的使用场景: + +- 追求高并发读取时,使用`RwLock`,因为`Mutex`一次只允许一个线程去读取 +- 如果要保证写操作的成功性,使用`Mutex` +- 不知道哪个适合,统一使用`Mutex` + +需要注意的是,`RwLock`虽然看上去好像提供了高并发读取的能力,但这个不能说明它的性能比`Mutex`高,事实上`Mutex`性能要好不少,后者**唯一的问题也仅仅在于不能并发读取**。 + +一个常见的错误使用`RwLock`的场景就是使用`HashMap`进行简单读写时,因为`HashMap`的读和写都非常快,`RwLock`的复杂实现和相对低的性能反而会导致整体性能的降低,因此一般来说更适合使用`Mutex`。 + +总之,如果你要使用`RwLock`要确保满足以下两个条件:并发读,且需要对读到的资源进行"长时间"的操作,`HashMap`也许满足了并发读的需求,但是往往并不能满足后者:"长时间"的操作。 + +> benchmark永远是你在迷茫时最好的朋友! + +## 三方库提供的锁实现 +标准库在设计时总会存在取舍,因为往往性能并不是最好的,如果你追求性能,可以使用三方库提供的并发原语: +- [parking_lot](https://crates.io/crates/parking_lot), 功能更完善、稳定,社区较为活跃,star较多,更新较为活跃 +- [spin](https://crates.io/crates/spin), 在多数场景中性能比`parking_lot`高一点,最近没怎么更新 + +如果不是追求特别极致的性能,建议选择前者。 \ No newline at end of file diff --git a/book/contents/advance/concurrency-with-threads/primitives1.md b/book/contents/advance/concurrency-with-threads/primitives1.md new file mode 100644 index 00000000..af17c83e --- /dev/null +++ b/book/contents/advance/concurrency-with-threads/primitives1.md @@ -0,0 +1,126 @@ +# 线程同步:并发原语和共享内存(下) + + + +## 原子类型atomic +`Mutex`用起来简单,但是无法并发读,`RwLock`可以并发读,但是复杂且性能不够,那么有没有一种全能性选手呢? 欢迎我们的`Atomic`闪亮登场。 + +从Rust1.34版本后,就正式支持原子类型。原子指的是一系列不可被CPU上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核CPU下,都某个CPU核心开始运行原子操作时,会先暂停其它CPU内核对内存的操作,以保证原子操作不会被其它CPU内核所干扰。 + +由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。 + +可以看出原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了`CAS`循环,当大量的冲突发生时,该等待还是得[等待](./thread.md#多线程的开销)!但是总归比锁要好。 + +#### 使用原子类型作为全局变量 +原子类型的一个常用场景,就是作为全局变量来使用: +```rust +use std::thread::{self, JoinHandle}; +use std::sync::atomic::{Ordering, AtomicU64}; + +const N_TIMES: u64 = 100000; +const N_THREADS: usize = 10; + +static R: AtomicU64 = AtomicU64::new(0); + +fn reset() { + R.store(0, Ordering::Relaxed); +} + +fn add_n_times(n: u64) -> JoinHandle<()> { + thread::spawn(move || { + for _ in 0..n { + R.fetch_add(1, Ordering::Relaxed); + } + }) +} + +fn main() { + loop { + reset(); + + let mut threads = Vec::with_capacity(N_THREADS); + + for _ in 0..N_THREADS { + threads.push(add_n_times(N_TIMES)); + } + + for thread in threads { + thread.join().unwrap(); + } + + assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed)); + } +} +``` + +以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加1操作, 最后将它与线程数 * 加1操作数进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了100%的并发安全性。 + +在多线程环境中要使用`Atomic`需要配合`Arc`: +```rust +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{hint, thread}; + +fn main() { + let spinlock = Arc::new(AtomicUsize::new(1)); + + let spinlock_clone = Arc::clone(&spinlock); + let thread = thread::spawn(move|| { + spinlock_clone.store(0, Ordering::SeqCst); + }); + + // 等待其它线程释放锁 + while spinlock.load(Ordering::SeqCst) != 0 { + hint::spin_loop(); + } + + if let Err(panic) = thread.join() { + println!("Thread had an error: {:?}", panic); + } +} +``` + + +#### 能替代锁吗 +那么原子类型既然这么全能,它可以替代锁吗?答案是不行: + +- `std::sync::atomic`包中仅提供了数值类型的原子操作:`AtomicBool`, `AtomicIsize`, `AtomicUsize`, `AtomicI8`, `AtomicU16`等,而锁可以应用于各种类型 +- 在有些情况下,必须使用锁来配合,例如下面的`Condvar` + +## 用条件(Condvar)控制线程的同步 +条件变量(Condition Variables)经常和`Mutex`一起使用,可以让线程挂起,直到某个条件发生后再继续执行,其实`Condvar`我们在之前的多线程章节就已经见到过,现在再来看一个不同的例子: +```rust +use std::sync::{Arc, Mutex, Condvar}; +use std::thread; + +fn main() { + + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = pair.clone(); + + // 创建一个新线程 + thread::spawn(move|| { + let &(ref lock, ref cvar) = &*pair2; + let mut started = lock.lock().unwrap(); + *started = true; + cvar.notify_one(); + println!("notify main thread"); + }); + + // 等待新线程先运行 + let &(ref lock, ref cvar) = &*pair; + let mut started = lock.lock().unwrap(); + while !*started { + println!("before wait"); + started = cvar.wait(started).unwrap(); + println!("after wait"); + } +} +``` + +上述代码流程如下: + +1. `main`线程首先进入`while`循环,并释放了锁`started`,然后开始挂起等待子线程的通知 +2. 子线程获取到锁,并将其修改为true, 然后调用条件的方法来通知主线程继续执行:`cvar.notify_one` + + diff --git a/book/contents/advance/concurrency-with-threads/thread.md b/book/contents/advance/concurrency-with-threads/thread.md index 8ff0f1b5..6763beef 100644 --- a/book/contents/advance/concurrency-with-threads/thread.md +++ b/book/contents/advance/concurrency-with-threads/thread.md @@ -221,15 +221,13 @@ fn main() { 但是当你的任务大部分时间都处于阻塞状态时,就可以考虑增多线程数量,这样当某个线程处于阻塞状态时,会被切走,进而运行其它的线程,典型就是网络IO操作,我们可以为每一个进来的用户连接创建一个线程去处理,该连接绝大部分时间都是处于IO读取阻塞状态,因此有限的CPU核心完全可以处理成百上千的用户连接线程,但是事实上,对于这种网络IO情况,一般都不再使用多线程的方式了,毕竟操作系统的线程数是有限的,意味着并发数也很容易达到上限,使用async/await的`M:N`并发模型,就没有这个烦恼。 #### 多线程的开销 -下面的代码是一个无锁实现的hashmap在多线程下的使用: +下面的代码是一个无锁实现(CAS)的hashmap在多线程下的使用: ```rust for i in 0..num_threads { - //clone the shared data structure let ht = Arc::clone(&ht); let handle = thread::spawn(move || { for j in 0..adds_per_thread { - //randomly generate and add a (key, value) let key = thread_rng().gen::(); let value = thread_rng().gen::(); ht.set_item(key, value); diff --git a/book/contents/advance/global-variable.md b/book/contents/advance/global-variable.md index 84cd5b36..9c9f9a6b 100644 --- a/book/contents/advance/global-variable.md +++ b/book/contents/advance/global-variable.md @@ -39,4 +39,63 @@ impl Factory{ ``` ## 从函数中返回全局变量 -https://www.reddit.com/r/learnrust/comments/rqn74g/cant_a_function_return_a_reference_to_some_global/ \ No newline at end of file +https://www.reddit.com/r/learnrust/comments/rqn74g/cant_a_function_return_a_reference_to_some_global/ + + +## lazy_static +```rust +use std::{sync::{Mutex, MutexGuard}, thread}; +use std::thread::sleep; +use std::time::Duration; + +use lazy_static::lazy_static; +lazy_static! { + static ref MUTEX1: Mutex = Mutex::new(0); + static ref MUTEX2: Mutex = Mutex::new(0); +} + +fn main() { + // Spawn thread and store handles + let mut children = vec![]; + for i_thread in 0..2 { + children.push(thread::spawn(move || { + for _ in 0..1 { + // Thread 1 + if i_thread % 2 == 0 { + // Lock mutex1 + // No need to specify type but yes create a dummy variable to prevent rust + // compiler from being lazy + let _guard: MutexGuard = MUTEX1.lock().unwrap(); + + // Just log + println!("Thread {} locked mutex1 and will try to lock the mutex2, after a nap !", i_thread); + + // Here I sleep to let Thread 2 lock mutex2 + sleep(Duration::from_millis(10)); + + // Lock mutex 2 + let _guard = MUTEX2.lock().unwrap(); + // Thread 2 + } else { + // Lock mutex 1 + let _guard = MUTEX2.lock().unwrap(); + + println!("Thread {} locked mutex2 and will try to lock the mutex1", i_thread); + + // Here I freeze ! + let _guard = MUTEX1.lock().unwrap(); + } + } + })); + } + + // Wait + for child in children { + let _ = child.join(); + } + + println!("This is not printed"); +} +``` + +## box::leak \ No newline at end of file