同步队列

同步队列

∵ 多线程 ∴ 需要针对一些资源进行同步 ∴ 有了同步队列
实现方式主要有两种:阻塞方式(队列使用一个锁,出队入队用同一把锁或者队列使用两把锁,出队和入队用不同的锁)和非阻塞方式(底层通过循环CAS的方式)

synchronized是悲观锁,这种线程一旦得到锁,其他需要锁的线程就挂起的情况就是悲观锁
CAS操作的就是乐观锁,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。Compare And Swap 比较并替换

CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。

工作线程t1、t2从主内存根据内存地址V先拷贝一份变量的值到各自的工作内存空间,这就是预期值A;CAS(A, B),t1先更新变量的值,更新后,写到主内存中,此时t2再去传 CAS(A, B) 就会失败,因为A值已经变成了B值,所以t2需要重新拉取最新的值到工作内存空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
AtomicBoolean为例
compareAndSet(prev,newValue)方法要拆成compare(prev)方法和Set(newValue)方法,compare(prev)是等于prev后,就马上设置共享内存为newValue。
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}

CAS的缺点:
1.CPU开销较大
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力。所以一般会线程sleep
2.不能保证代码块的原子性
CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。

CAS的原理:
CAS通过调用JNI的代码实现的。JNI:Java Native Interface为JAVA本地调用,允许java调用其他语言。而compareAndSwapInt就是借助C来调用CPU底层指令实现的

java.util.concurrent

CAS 可能产生的 ABA 问题

  1. 问题描述
    如说一个线程T1从内存位置V中取出A,这时候另一个线程T2也从内存中取出A,并且T2进行了一些操作变成了B,然后T2又将V位置的数据变成A,这时候线程T1进行CAS操作发现内存中仍然是A,然后T1操作成功。尽管线程T1的CAS操作成功,但是不代表这个过程就是没有问题的。如果链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。
  2. 解决
  • AtomicMarkableReference 使用boolean变量——表示引用变量是否被更改过,不关心中间变量变化了几次
  • AtomicStampedReference 通过时间戳\版本进行唯一标记变量中途被更改了几次

1. 非阻塞队列

1.1 ConcurrentLinkedQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 在队尾插入一个元素,因为队列无边界,所以不会返回false
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);//入队前,创建一个新节点

for (Node<E> t = tail, p = t;;) {//除非插入成功并返回,否则反复循环
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {//利用CAS操作,将p的next指针从旧值null更新为newNode
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.利用CAS操作更新tail,如果失败说明其他线程添加了元素,由其他线程负责更新tail
return true;
}
// Lost CAS race to another thread; re-read next 如果添加元素失败,说明其他线程添加了元素,p后移,并继续尝试
}
else if (p == q) //如果p被移除出链表,我们需要调整指针重新指向head,否则我们指向新的tail
p = (t != (t = tail)) ? t : head;
else
//p指向tail或者q
p = (p != t && t != (t = tail)) ? t : q;
}
}
casTail(cmp,value)方法用于更新tail节点。tail被设置为volatile保证可见性

1.2 ConcurrentHashMap

HashTable 线程安全,但是put get实现方法全是synchronized,效率太低;HashMap 线程不安全。HashTable 效率低下,是因为所有线程访问竞争的都是同一把锁,而
JDK 1.5
ConcurrentHashMap 采用锁分段技术,将数据分段,每一段数据都配有一把锁,那么多线程访问不同数据段的数据时就不需要竞争同一把锁,就可以实现线程安全。对于像 size() containsValue() 方法需要锁定整个表而不仅仅是某几个数据段,那么就需要按顺序锁定所有段,操作完毕后,再按顺序释放所有段的锁。
pic1

JDK 1.8
ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。
synchronized只锁定当前链表或红黑二叉树的head节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

如果是key对应的是null,通过cas来设置当前值;如果key对应的不是null(即链表表头或树的根元素),就是用synchronized

pic2

TreeBin: 红黑二叉树节点 Node: 链表节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
hash表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1 代表正在初始化
-N 表示有 N-1 个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小 */
private transient volatile int sizeCtl;
static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点
static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点


public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap 不允许插入null键,HashMap允许插入一个null键
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
//for循环的作用:因为更新元素是使用CAS机制更新,需要不断的失败重试,直到成功为止。
for (Node<K,V>[] tab = table;;) {
// f:链表或红黑二叉树头结点,向链表中添加元素时,需要synchronized获取f的锁。
Node<K,V> f; int n, i, fh;
//判断Node[]数组是否初始化,没有则进行初始化操作
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//通过hash定位Node[]数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头结点),添加失败则进入下次循环。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//检查到内部正在移动元素(Node[] 数组扩容)
else if ((fh = f.hash) == MOVED)
//帮助它扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//锁住链表或红黑二叉树的头结点
synchronized (f) {
//判断f是否是链表的头结点
if (tabAt(tab, i) == f) {
//如果fh>=0 是链表节点
if (fh >= 0) {
binCount = 1;
//遍历链表所有节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果节点存在,则更新value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//不存在则在链表尾部添加新节点。
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//TreeBin是红黑二叉树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//添加树节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}

if (binCount != 0) {
//如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//将当前ConcurrentHashMap的size数量+1
addCount(1L, binCount);
return null;
}

判断Node[]数组是否初始化,没有则进行初始化操作
通过hash定位Node[]数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头结点),添加失败则进入下次循环。
检查到内部正在扩容,如果正在扩容,就帮助它一块扩容。
如果f!=null,则使用synchronized锁住f元素(链表/红黑二叉树的头元素)
4.1 如果是Node(链表结构)则执行链表的添加操作。
4.2 如果是TreeNode(树型结果)则执行树添加操作。
判断链表长度已经达到临界值8 就需要把链表转换为树结构。
总结:
    JDK8中的实现也是锁分离的思想,它把锁分的比segment(JDK1.5)更细一些,只要hash不冲突,就不会出现并发获得锁的情况。它首先使用无锁操作CAS插入头结点,如果插入失败,说明已经有别的线程插入头结点了,再次循环进行操作。如果头结点已经存在,则通过synchronized获得头结点锁,进行后续的操作。性能比segment分段锁又再次提升。

2. 阻塞队列

ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

参考

抛出异常 特殊值(通常是true、false) 阻塞 超时(在规定时间内进行操作,如果操作还不成功,就返回false)
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

2.1 SynchronousQueue

无界的,无缓冲的等待队列。无缓冲指的是,一个put必须需要一个take,不会保存put的数据。

生产者 put() 一个对象时,会等消费者来 take(),如果没有的话就一直等,直到消费者消费;反之,如果消费者 take() 时,会等生产者 put()。

isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。

它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
它是线程安全的,是阻塞的。
不允许使用 null 元素。
公平排序策略是指调用put的线程之间,或take的线程之间。

为什么 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); newCachedThreadPool() 方法里使用的是 SynchronousQueue。

SynchronousQueue队列,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作。所以,当我们提交第一个任务的时候,是加入不了队列的,这就满足了一个线程池条件“当无法加入队列的时候,且任务没有达到maxsize时,我们将新开启一个线程任务”。所以我们的maxsize是int的最大值。时间是60s,当一个线程没有任务执行会暂时保存60s超时时间,如果没有的新的任务的话,会从cache中remove掉。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

2.2 LinkedBlockingQueue

是一个无界缓存的等待队列。

基于链表的阻塞队列,内部维持着一个数据缓冲队列(该队列由链表构成)。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

在构造 LinkedBlockingQueue 对象时如果没有指定其容量大小,默认会设置为int最大值,一旦生产者的速度远远大于消费者的速度,会导致内存溢出。

2.3 ArrayBlockingQueue

是一个有界缓存的等待队列。在创建时必须指定队列长度,一旦指定,不能修改,不支持扩容。还可以指定是否采用公平锁。

内部维持着一个定长数据缓冲队列(该队列由数组构成)。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。

2.4 DelayQueue

DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

2.5 PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。