随着多核处理器的爆炸式增长,多线程同步访问共享内存的性能也成了计算机系统发展的关键。在《互斥锁与自旋锁》这篇文章中我们提到了互斥锁与自旋锁之间的区别以及各自的优点和适用场景。
普通自旋锁的实现
我们适用Java代码来实现一个简单的自旋锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import java.util.concurrent.atomic.AtomicBoolean;
public class Spinlock { private AtomicBoolean available = new AtomicBoolean(false);
public void lock() { while (!tryLock()) Thread.yield(); }
public boolean tryLock() { return available.compareAndSet(false, true); }
public void unlock() { if (!available.compareAndSet(true, false)) { throw new RuntimeException("释放锁失败"); } } }
|
这种简单的自旋锁有一个问题:无法保证多线程竞争的公平性。对于上面的Spinlock,当多个线程想要获取锁时,谁最先将available
设为false
谁就能最先获得锁,这可能会造成某些线程一直都未获取到锁造成“线程饥饿”。就像我们下课后蜂拥的跑向食堂,下班后蜂拥地挤向地铁,通常我们会采取排队的方式解决这样的问题,类似地,我们把这种锁叫“排队自旋锁(QueuedSpinlock)”。计算机科学家们使用了各种方式来实现排队自旋锁,如TicketLock,MCSLock,CLHLock。接下来我们使用源码对这几种锁的基本原理进行分析。
第一种方法就是按先来后到的顺序为每个线程编个号,按编号顺序来分配锁。这就类似于银行挂号或者医院挂号一样,按照先来后到的顺序为每个问诊者排个号,医生按号依次为问诊者服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import java.util.concurrent.atomic.AtomicInteger;
public class TicketSpinlock { private AtomicInteger currService = new AtomicInteger(); private ThreadLocal<Integer> LOCAL = new ThreadLocal<>();
private AtomicInteger ticketNumber = new AtomicInteger();
public void lock() { int myTicket = ticketNumber.getAndIncrement(); LOCAL.set(myTicket); while (myTicket != currService.get()) Thread.yield(); }
public void unlock() { int myTicket = LOCAL.get(); currService.compareAndSet(myTicket, myTicket + 1); } }
|
缺点
Ticket Lock 虽然解决了公平性的问题,但是多处理器系统上,每个进程/线程占用的处理器都在读写同一个变量serviceNum ,每次读写操作都必须在多个处理器缓存之间进行缓存同步,这会导致繁重的系统总线和内存的流量,大大降低系统整体的性能。
下面介绍的CLH锁和MCS锁都是为了解决这个问题的:
MCS 来自于其发明人名字的首字母: John Mellor Crummey和Michael Scott。
CLH的发明人是:Craig,Landin and Hagersten。
SMP和NUMA处理器架构
1、 SMP(Symmetric Multi-Processor)
对称多处理器结构,指服务器中多个CPU对称工作,每个CPU访问内存地址所需时间相同。其主要特征是共享,包含对CPU,内存,I/O等进行共享。
SMP架构系统
SMP能够保证内存一致性,但这些共享的资源很可能成为性能瓶颈,随着CPU数量的增加,每个CPU都要访问相同的内存资源,内存访问冲突随之增加,可能会导致CPU资源的浪费。常用的PC机就属于这种。
2、 NUMA(Non-Uniform Memory Access)
非一致存储访问,将CPU分为CPU模块,每个CPU模块由多个CPU组成,并且具有独立的本地内存、I/O槽口等,模块之间可以通过互联模块相互访问,访问本地内存的速度将远远高于访问远地内存(系统内其它节点的内存)的速度,这也是非一致存储访问的由来。
NUMA
NUMA较好地解决SMP的扩展问题,当CPU数量增加时,因为访问远地内存的延时远远超过本地内存,系统性能无法线性增加。
可以将NUMA视为紧密联系的集群计算形式。
CLHLock
检测前驱结点的locked
状态:如果为false
说明该线程是队列中的第一个线程;如果为true
说明为前面已经有线程获取了锁,当前线程需要自旋阻塞。
CLH自旋锁原理图
实现代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.util.concurrent.atomic.AtomicReference;
public class CLHSpinlock { private final AtomicReference<Node> tail; private final ThreadLocal<Node> myPred; private final ThreadLocal<Node> myNode;
public CLHSpinlock() { tail = new AtomicReference<Node>(new Node()); myNode = new ThreadLocal<Node>() { protected Node initialValue() { return new Node(); } };
myPred = new ThreadLocal<Node>() { protected Node initialValue() { return null; } }; }
public void lock() { final Node node = myNode.get(); node.locked = true; Node pred = this.tail.getAndSet(node); this.myPred.set(pred); while (pred.locked) Thread.yield(); }
public void unlock() { final Node node = myNode.get(); node.locked = false; myNode.set(this.myPred.get()); }
private static class Node { volatile boolean locked; } }
|
CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个
myNode,L个锁有L个tail),CLH的一种变体被应用在了JAVA并发框架中(AbstractQueuedSynchronizer.Node)。
CLH在SMP系统结构下法是非常有效的。但在NUMA系统结构下,每个线程有自己的内存,如果前趋结点的内存位置比较远,自旋判断前趋结点的locked域,性能大打折扣,一种解决NUMA系统结构的思路是MCS队列锁。
MCSLock
MSC与CLH最大的不同并不是链表是显示还是隐式,而是线程自旋的规则不同:CLH是在前趋结点的locked域上自旋等待,而MCS是在自己的
结点的locked域上自旋等待。正因为如此,它解决了CLH在NUMA系统架构中获取locked域状态内存过远的问题。
MCS自旋锁原理图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import java.util.concurrent.atomic.AtomicReference;
public class MCSSpinlock { private volatile AtomicReference<QNode> tail; private volatile ThreadLocal<QNode> myNode;
public MCSSpinlock() { tail = new AtomicReference<QNode>(null); myNode = new ThreadLocal<QNode>() { protected QNode initialValue() { return new QNode(); } }; }
public void lock() { QNode qnode = myNode.get(); QNode pred = tail.getAndSet(qnode); if (pred != null) { qnode.locked = true; pred.next = qnode; while (qnode.locked) Thread.yield(); } }
public void unlock() { QNode qnode = myNode.get(); if (qnode.next == null) { if (tail.compareAndSet(qnode, null)) return; while (qnode.next == null) Thread.yield(); } qnode.next.locked = false; qnode.next = null; }
private class QNode { volatile boolean locked = false; volatile QNode next = null; } }
|
公平性测试
测试仍然用之前用的计数器例子,不过为了体现出多线程竞争,代码中会使用到线程池以及一些并发库中的工具类,这些类我们会在后期的文章中陆续讲到。同时我们从上面各个锁中抽出一个Lock接口(方便测试用例),代码如下:
1 2 3 4 5
| public interface Lock { void lock();
void unlock(); }
|
java.util.concurrent.locks
包中的Lock接口也会在后期的文章中讲到。
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class ThreadCommunicate { static class Counter { private Lock lock = new Spinlock();
private volatile int value = 0;
public void increment() { lock.lock(); value++; System.out.println("++"); lock.unlock(); }
public void decrement() { lock.lock(); value--; System.out.println("--"); lock.unlock(); }
public int value() { return this.value; } }
static class IncrementTask implements Runnable { public void run() { long start = System.nanoTime(); for (int i = 0; i < 10000; i++) { counter.increment(); } long end = System.nanoTime(); System.out.println("Increment:" + (end - start)); latch.countDown(); } }
static class DecrementTask implements Runnable { public void run() { long start = System.nanoTime(); for (int i = 0; i < 10000; i++) { counter.decrement(); } long end = System.nanoTime(); System.out.println("Decrement:" + (end - start)); latch.countDown(); } }
private static Counter counter = new Counter(); private static CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException { ExecutorService threadPool1 = Executors.newFixedThreadPool(10); ExecutorService threadPool2 = Executors.newFixedThreadPool(10); threadPool1.execute(new IncrementTask()); threadPool2.execute(new DecrementTask()); threadPool1.shutdown(); threadPool2.shutdown(); latch.await(); System.out.println(counter.value()); } }
|
可以通过打开注释来修改使用的锁实现类。
1. 使用普通SpinLock
这里只截取一部分输出结果来说明问题:
1 2 3 4 5 6 7 8 9 10 11 12
| -- -- -- -- ++ ++ ++ ++ ++ ++ Increment:242524573 0
|
从输出结果可以看出++
和--
分布不均匀
2. TicketSpinlock执行结果
这里只截取一部分输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ++ -- ++ -- ++ -- ++ -- ++ -- ++ -- Decrement:231070401 Increment:231519110 0
|
输出结果中++
和--
基本保持均匀
执行结果的头部和尾部可能会有持续的++
或--
,这与线程池执行时间的先后顺序有关
CLHSpinlock执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ++ -- ++ -- ++ -- ++ -- ++ -- ++ -- Decrement:231070401 Increment:231519110 0
|
输出结果中++
和--
基本保持均匀
MCSSpinlock执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ++ -- ++ -- ++ -- ++ -- ++ -- ++ -- ++ -- ++ Increment:231035506 Decrement:230355670 0
|
输出结果中++
和--
基本保持均匀
从任务的耗时来看三种公平性实现在性能上相差也不是很大。
参考链接:
本作品采用 知识共享署名 4.0 国际许可协议 进行许可。
转载时请注明原文链接:https://blog.hufeifei.cn/2017/07/ComputerAndOS/%E8%87%AA%E6%97%8B%E9%94%81%E5%85%AC%E5%B9%B3%E6%80%A7%E7%9A%84%E4%B8%89%E7%A7%8D%E5%AE%9E%E7%8E%B0/
预览: