多线程与高并发

进程和线程的区别?

答案:进程就是一个程序运行起来的状态,线程是一个进程中的不同的执行路径。专业:进程是OS分配资源的基本单位,线程是执行调度的基本单位。分配资源最重要的是:独立的内存空间,线程调度执行(线程共享进程的内存空间,没有自己独立的内存空间)

纤程:用户态的线程,线程中的线程,切换和调度不需要经过OS

锁升级过程:无锁、偏向锁、自旋锁(CAS)、重量级锁

可重入锁:对象头中记录线程ID

volatile:可见性(MESI)、指令重排序(内存屏障)

Atomic原子类:CAS

1
2
3
4
5
6
7
8
9
final Object object = new Object();
Synchronized(object) {
//当前线程让出锁:object,进入等待队列,等待其他线程唤醒它
object.wait();
//当前线程唤醒处于等待队列的线程(无法选择哪一个线程)
object.notify();
//当前线程让出CPU资源,然后重新加入到抢占CPU的线程队列,可能依旧能够抢占到CPU
Thread.yield();
}

基本waitnotify是成对出现的。一个线程wait让出锁之后,需要另一个线程调用notify唤醒处于等待队列中的线程。

线程状态

按照Java线程的生命周期来分:

1
2
3
4
5
6
7
8
9
10
11
新建(NEW):就是刚使用new方法,new出来的线程;

就绪(Runnable):就是调用的线程的start()方法后,这时候线程处于等待CPU分配资源阶段,谁先抢
的CPU资源,谁开始执行;

运行(Runnning):当就绪的线程被调度并获得CPU资源时,便进入运行状态,run方法定义了线程的
操作和功能;

阻塞(Blocked): 阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。

销毁(TERMINATED):如果线程正常执行完毕后或线程被提前强制性的终止或出现异常导致结束,那么线程就要被销毁,释放资源
1
2
3
4
5
6
7
阻塞的情况分三种:

等待阻塞 -- 通过调用线程的Object.wait()/Thread.join(),让线程等待某工作的完成。

同步阻塞 -- 线程在获取synchronized同步锁失败(因为锁被其它线程所占用),它会进入同步阻塞状态。

其他阻塞 -- 通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。

juc

按照JDK的源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
NEW: 尚未启动的线程的线程状态

RUNNABLE: 处于可运行状态的线程正在Java虚拟机中执行,但它可能正在等待来自操作系统(例如处理器)的其他资源

BLOCKED: 线程的线程状态被阻塞,等待监视器锁定。处于阻塞状态的线程正在等待监视器锁定以输入同步的块方法或在调用后重新输入同步的块方法,通过 Object#wait()进入阻塞

WAITING:处于等待状态的线程正在等待另一个线程执行特定操作:
例如: 在对象上调用了Object.wait()的线程正在等待另一个线程调用Object.notify() 或者
Object.notifyAll(), 调用了 Thread.join()的线程正在等待指定的线程终止

TIMED_WAITING : 具有指定等待时间的等待线程的线程状态。由于以指定的正等待时间调用以下方法之一,因此线程处于定时等待状态:
1. Thread.sleep(long)
2. Object#wait(long)
3. Thread.join(long)
4. LockSupport.parkNanos(long...)
5. LockSupport.parkUntil(long...)

TERMINATED: 终止线程的线程状态。线程已完成执行

juc

线程数量

程序开多少线程合适?

CPU密集型:CPU核数 + 1

IO密集型:2 * CPU 核数 + 1 【计算公式:CPU核数 / (1 - IO阻塞系数) 阻塞系数通常是0.8~0.9】

AQS工具类

ReentrantLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Lock lock = new ReentrantLock();

public void executeMethod() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}

ReentrantLockSynchronized的区别:

1.Synchronized是自动加锁和解锁,ReentrantLock是手动的。

2.Synchronized使用锁升级的概念加锁,而ReentrantLock使用的CAS+volatile加锁。

3.Synchronized没办法唤醒等待队列中指定的线程,而ReentrantLock提供Condition可以做到。

4.Synchronized一执行无法取消,而ReentrantLock提供了tryLock方法,指定时间尝试获取锁。

CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);

for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
//当闭锁数量等于0时开放
latch.countDown();
});
}

for (int i = 0; i < threads.length; i++) {
threads[i].start();
}

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("end latch");

CountDownLatch又称闭锁、门栓,可以延迟线程的进度直到其到达终止状态。闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,没有任何线程能够通过,当到达结束状态时,这扇门会打开并允许所有线程的通过。

当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成了后才继续执行。

CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人,发车"));

for(int i=0; i<100; i++) {
new Thread(()->{
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}

CyclicBarrier栅栏类似闭锁,与闭锁的区别在于,所有线程都必须到达栅栏位置,才能执行,闭锁用于等待时间,而栅栏用于等待其他线程。

闭锁不能被重复使用,而栅栏可以被重复使用,它类似于坐动车,满人了,发车,人没满,就一直等到人满,一直循环载客

ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;

static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();

public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);

//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());

for(int i=0; i<18; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
}
}

读写锁,当读锁时,支持其他读锁的访问,所以它是个共享锁,但是不支持写锁的访问,如果一个线程中先获取了读锁,并尝试再获取写锁,那么将会产生死锁。

细节分析

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class T11_TestSemaphore {
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);

new Thread(()->{
try {
s.acquire();

System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();

new Thread(()->{
try {
s.acquire();

System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
//释放一个许可给信号量
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

计数信号量Counting Semaphore用于控制同时访问某个特性资源的操作数量,或者同时执行某个执行的数据量。

Semaphore管理着一组虚拟的许可permit,许可的初始数量可通过构造函数指定,在执行操作时可以首先获得许可,并在使用以后释放许可,如果没有许可,那么acquire将阻塞直到有许可。

LockSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class T13_TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for (int i = 0; i < 10; i++) {
System.out.println(i);
if(i == 5) {
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

t.start();
LockSupport.unpark(t);
/*try {
TimeUnit.SECONDS.sleep(8);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after 8 senconds!");

LockSupport.unpark(t);*/
}
}

LockSupportparkunpark类似于Syncwaitnotify。它的unpark类似于停车位,被调用时后等于没停车位了,所以线程当线程调用park时,将不会在阻塞,也就是说,它的调用是能累积影响park的执行的。

1
2
面试题:
实现一个容器,提供两个方法,add,size,写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到达5个时,线程2给出提示并结束
1
2
3
4
//1.加锁实现时,需要满足List必须使得线程可见(voliate并不具备使对象中的属性内容线程可见)
//2.使用sync需要结合wait和notify实现,需要保证List线程可见
//3.使用CountDownLatch需要使用两个,否则无法保证size==5时,线程2先结束。需要保证List线程可见
//4.使用LockSupport实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class T07_LockSupport_WithoutSleep {
List lists = new ArrayList();

public void add(Object o) {
lists.add(o);
}

public int size() {
return lists.size();
}

static Thread t1 = null, t2 = null;

public static void main(String[] args) {
T07_LockSupport_WithoutSleep c = new T07_LockSupport_WithoutSleep();

t1 = new Thread(() -> {
System.out.println("t1启动");
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);

if (c.size() == 5) {
LockSupport.unpark(t2);
LockSupport.park();
}
}
}, "t1");

t2 = new Thread(() -> {
LockSupport.park();
System.out.println("t2 结束");
LockSupport.unpark(t1);
}, "t2");

t2.start();
t1.start();
}
}
1
2
面试题:
写一个固定容量同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class MyContainer1<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;


public synchronized void put(T t) {
//想想为什么用while而不是用if?
//因为有可能 get--》put--》put--》,那么后面的put没有重复判断,依旧是满的
while(lists.size() == MAX) {
try {
this.wait(); //effective java
} catch (InterruptedException e) {
e.printStackTrace();
}
}

lists.add(t);
++count;
this.notifyAll(); //通知消费者线程进行消费
}

public synchronized T get() {
T t = null;
while(lists.size() == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count --;
this.notifyAll(); //通知生产者进行生产,因为无法指定线程
return t;
}

public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
*
* 使用wait和notify/notifyAll来实现
*
* 使用Lock和Condition来实现
* 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
*
*/

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;

private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();//本质是个等待队列
private Condition consumer = lock.newCondition();//创建两个等待队列

public void put(T t) {
try {
lock.lock();
while(lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}

lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费,精确指定消费者和生产者等待队列
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public T get() {
T t = null;
try {
lock.lock();
while(lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count --;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}

public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}

AQS源码

AbstractQueuedSynchronizer是一个为实现阻塞锁和相关同步器提供的一个框架,他是依赖于先进先出的一个等待队列,依靠单个原子int值来表示状态,通过占用和释放方法,改变状态值。

juc

1
2
AQS使用一个volatile的int类型的成员变量来表示同步状态
通过内置的 FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

内部结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

/**
* CLH队列 双向队列,存储线程信息
* 1.通过自旋等待
* 2.通过state变量判断是否阻塞
* 3.从尾部入队,从头部出队
*/
static final class Node {...}

/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
//CLH的头节点
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
//CLH的尾节点
private transient volatile Node tail;

/**
* AQS的同步状态State成员变量, 0表示可以占用,大于1表示需要等待
*/
private volatile int state;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//AQS的内部类
static final class Node {

//共享
static final Node SHARED = new Node();
//独占
static final Node EXCLUSIVE = null;

//线程被取消了
static final int CANCELLED = 1;

//后继线程需要唤醒
static final int SIGNAL = -1;

//等待Condition唤醒
static final int CONDITION = -2;

//共享式同步状态获取会无条件地传播下去
static final int PROPAGATE = -3;

//Node的等待状态waitState成员变量,初始为0,状态是上面的几种
volatile int waitStatus;

//上一个节点
volatile Node prev;

//下一个节点
volatile Node next;

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

}

juc

AQS同步队列的基本结构:

juc

juc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReentrantLock implements Lock, java.io.Serializable {

private final Sync sync;

//默认是使用非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

//最外层的 ReentrantLock.lock()
public void lock() {
sync.lock();
}

public void unlock() {
sync.release(1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//ReentrantLock的内部抽象类,AQS的实现类
abstract static class Sync extends AbstractQueuedSynchronizer {

//交给子类实现
abstract void lock();

//NonfairSync默认调用该方法
final boolean nonfairTryAcquire(int acquires) {
//拿到当前线程
final Thread current = Thread.currentThread();
//获取当前状态
int c = getState();
//双重检查,防止进来该方法后锁整好被释放。
//如果没人获得线程,通过CAS对state值做变更,获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果得到锁的线程和当前线程一致,则增加获取锁的次数,重入
else if (current == getExclusiveOwnerThread()) {
//重入次数++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//两者都不满足,表示当前锁有人持有,返回尝试获取锁失败
return false;
}

//释放锁
protected final boolean tryRelease(int releases) {
//当前重入次数 - 1,若等于0,则表示可以释放锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//释放锁
if (c == 0) {
free = true;
//设置线程持有者 = null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//Sync的实现类【非公平锁】
static final class NonfairSync extends Sync {
final void lock() {
//如果state是0,则直接获得锁
if (compareAndSetState(0, 1))
//设置当前线程独占这把锁
setExclusiveOwnerThread(Thread.currentThread());
else
//否则根据是否公平锁进行获取锁
//调用AbstractQueuedSynchronizer的acquire方法
//acquire方法会调用tryAcquire方法
acquire(1);
}

//在acquire方法中会被触发调用,功能是【尝试获取锁】
protected final boolean tryAcquire(int acquires) {
//调用父类实现的功能
return nonfairTryAcquire(acquires);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//Sync的实现类【公平锁】
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

//实现Sync的加锁逻辑
final void lock() {
//acquire方法会调用tryAcquire方法
acquire(1);
}

//在acquire方法中会被触发调用,功能是【尝试获取锁】
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//和非公平唯一的代码区别:!hasQueuedPredecessors()
//hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:!hasQueuedPredecessors()

1
2
3
4
5
6
7
8
//公平锁加锁时判断等待队列中是否存在有效节点的方法          
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

juc

加锁

1
2
3
4
5
6
7
8
//AbstractQueuedSynchronizer类实现的方法
//加锁方法
public final void acquire(int arg) {
//当tryAcquire失败时,将会调用addWaiter添加到队列,NODE类型是排它锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//AbstractQueuedSynchronizer类实现的方法
//添加节点进等待队列
private Node addWaiter(Node mode) {
//创建当前线程的节点Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾节点不为空
if (pred != null) {
//将新节点的pre指向tail
node.prev = pred;
//使用CAS设置最后一个节点,设置成功则原末尾节点的next指向新的节点
//它只对链表中的最后一个节点使用CAS自旋方式添加新节点保证安全性
//AQS的核心,相比Sync,提高了加锁的效率
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//添加节点进等待队列
enq(node);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//AbstractQueuedSynchronizer类实现的方法
//添加CLH队列中
private Node enq(final Node node) {
//死循环,创建傀儡节点、并且把node节点添加到链表中
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//懒加载初始化头节点,若尾节点为空,则创建一个空节点做头尾节点!!
//双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。
//真正的第一个有数据的节点,是从第二个节点开始的。
if (compareAndSetHead(new Node()))
tail = head;
} else {
//将node节点的前节点指向tail
node.prev = t;
//CAS设置tail的值为node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//AbstractQueuedSynchronizer类实现的方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循环,要么拿到锁为止,要么取消
for (;;) {
//返回node的pre节点
final Node p = node.predecessor();
//如果前节点刚好等于头节点,则再尝试一次获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//根据他的前节点状态判断 【循环抢锁/park阻塞】
//parkAndCheckInterrupt将当前线程Park,当前节点在【该处被挂起】
//当有某个线程将它【唤醒后】,该线程会【接着死循环尝试获取锁】
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//如果failed = true,取消排队
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//AbstractQueuedSynchronizer类实现的方法
//根据他的前节点状态判断 【循环抢锁/park阻塞】
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//Node节点在创建时默认的waitStatus == 0;
//获取前一个节点的waitStatus
int ws = pred.waitStatus;
//如果前驱节点的waitStatus = Node.SINGAL, 表示它的后继节点需要被唤醒
if (ws == Node.SIGNAL)
//讲继续调用 parkAndCheckInterrupt,当前线程会被Park
return true;
//当waitStatus处于CANCELLED状态时
if (ws > 0) {
//循环判断前驱节点的前驱节点是否 也是CANCELLED状态
//忽略该状态的节点,重新将node连接到队列中
//也就是循环为node寻找一个非CANCELLED状态的节点作为前驱节点来判断
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将当前节点的前驱结点设置为SINGAL状态,用于后续唤醒操作
//程序第一次执行到这里返回 false,还会进行外层第二次循环,最终返回需要Park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
6
//AbstractQueuedSynchronizer类实现的方法
//将当前线程挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
AQS acquire的三条流程:
1.调用tryAcquire
-> 交由子类 FairSync/NonfairSync实现
2.调用addWaiter
-> enq 入队操作
3.调用acquireQueued
-> shouldParkAfterFailedAcquire
-> compareAndSetWaitStatus
-> parkAndCheckInterrupt

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//AbstractQueuedSynchronizer类实现的方法
//释放锁,当调用unLock时,会被Sync类调用
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
//获取头节点
Node h = head;
//头节点waitStatus != 0
if (h != null && h.waitStatus != 0)
//唤醒后续的线程
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//AbstractQueuedSynchronizer类实现的方法
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
//重新将node的waitStatus设置成0
compareAndSetWaitStatus(node, ws, 0);

//获取node的后继节点
Node s = node.next;
//如果它的后继节点为空或者处于CANCELLED状态
if (s == null || s.waitStatus > 0) {
s = null;
//从尾部递归找一个waitStatus < 0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果节点不为空,那么它必然是waitStatus < 0的,这时候调用unpark唤醒该线程
if (s != null)
LockSupport.unpark(s.thread);
}

AQS的属性的State,它的值有几种?

3个状态:没占用是0,占用了是1,大于1是可重入锁

如果AB两个线程进来了以后,请问这个总共有多少个Node节点?

若当前锁是在被持有的情况下的话,答案是3个

VarHandle

JDK1.9之后有的一个VarHandle,可以用来指针指向普通对象中的地址,代替它执行原子性的操作,如CAS、getAndAdd等等,和反射的区别是,它效率比反射快很多,可以理解它是直接操作二进制码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class T01_HelloVarHandle {

int x = 8;
private static VarHandle handle; //用于指向对象的某个属性

static {
try {
//指向当前T01_HelloVarHandle类对象的x属性的int类型
handle = MethodHandles.lookup().findVarHandle(T01_HelloVarHandle.class, "x", int.class);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}

AQS效率高的核心:使用CAS操作链表中的头/尾节点,而不是加锁整张链表,所以效率高。

1
2
3
4
5
6
7
ReentrantLock.lock()
->sync.lock()
->NonfairSync.lock()
->AbstractQueuedSynchronizer.acquire()
->NonfairSync.tryAcquire()
->AbstractQueuedSynchronizer.addWaiter()
->AbstractQueuedSynchronizer.acquireQueued()

强软弱虚

ThreadLocal可以用于Spring的声明式事务中,保证在同一个线程中,获取到的数据源都是同一个。

1
2
3
4
5
6
7
8
9
10
11
12
//ThreadLocal.set(T)
public void set(T value) {
//获取当前线程
Thread t = Thread.currentThread();
//获取当前线程中的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
if (map != null)
//《当前ThreadLocal,value》
map.set(this, value);
else
createMap(t, value);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}

子线程怎么获得父线程的ThreadLocal里的值?

源码请参考:InheritableThreadLocal详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BaseTest {

//使用ThreadLocal的子类:InheritableThreadLocal 可以将值传递到子线程中
public static final InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
public static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args) {
inheritableThreadLocal.set("Inheritable hello");
threadLocal.set("hello");
new Thread(()->{
System.out.println(String.format("子线程可继承值:%s",inheritableThreadLocal.get()));
System.out.println(String.format("子线程值:%s",threadLocal.get()));
new Thread(()->{
System.out.println(String.format("孙子线程可继承值:%s",inheritableThreadLocal.get()));
System.out.println(String.format("孙子线程值:%s",threadLocal.get()));
}).start();

}).start();
}
1
2
3
4
5
输出:
子线程可继承值:inheritableThreadLocal hello
子线程值:null
孙线程可继承值:inheritableThreadLocal hello
孙线程值:null

阻塞队列

BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

抛出异常 特殊值 阻塞 超时
入队列 add(e) offer(e) put(e) offer(e, time, unit)
出队列 remove() poll() take() poll(time, unit)
peek()
1
2
3
4
5
6
7
8
9
10
11
12
add(E):
该API是由Collection接口提供的,AbstractQueue类实现,如果队列满了添加失败会抛异常
offer(E):
由Queue接口提供,如果队列满了会返回false,否则true
put(E):
放元素进队列,如果队列满了会进行阻塞,直到有人从队列中取走元素
poll():
从队列中取出一个元素,并且从队列里移除该元素,队列为空时返回null
take():
从队列里取元素,如果队列为空会进行阻塞,直到有人放了元素进队列
peek():
从队列中获取元素,但是不把该元素从队列里移除

Queue和List的区别:Queue多了很多对线程操作友好的API,并且Queue主要用于线程之间的通信,而List用于存储。

有界队列

ArrayBlockingQueue :基于数组实现的阻塞队列

LinkedBlockingQueue :不设置大小时默认长度是Integer.MAX_VALUE,内部是基于链表实现的。

SynchronousQueue :内部容量为零,用于两个线程交换数据用。

无界队列

ConcurrentLinkedQueue :无锁队列,采用CAS操作,具有较高吞吐量

PriorityBlockingQueue :具有优先级的阻塞队列,优先级高的先弹出

DelayedQueue :延时队列,内部实现其实是采用带时间的优先队列

LinkedTransferQueue :线程间数据交换的利器,是一个由链表结构组成的无界阻塞TransferQueue队列。多了tryTransfer和transfer方法。transfer进去后将一直阻塞到有人take走,与SynchronousQueue 的不同是它内部有容量大小。 详细分析

无界队列的put 操作永远都不会阻塞,空间限制来源于系统资源的限制,底层都使用CAS无锁编程。

区别

相同点:

LinkedBlockingQueueArrayBlockingQueue都是可阻塞的队列,内部都是使用ReentrantLockCondition来保证生产和消费的同步;

当队列为空,消费者线程被阻塞;当队列装满,生产者线程被阻塞;使用Condition的方法来同步和通信:await()signal()

不同点:

ArrayBlockingQueue 实现简单,表现稳定,生产者和消费者使用同一个锁,通常性能不如后者。

LinkedBlockingQueue 生产者和消费者两把锁是分开的,所以竞争会小一些。

ArrayBlockingQueue 内部维护了一个数组final Object[] items存储数据,在生产和消费的时候,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例。

LinkedBlockingQueue内部维护的是一个链表结构,在生产和消费的时候,需要创建Node对象进行插入或移除,大批量数据的系统中,其对于GC的压力会比较大。

LinkedBlockingQueue 的构造方法默认使用Integer.MAX_VALUE长度,而ArrayBlockingQueue的数组长度需要在创建时指定。

双端队列

juc

1
2
3
4
5
6
普通队列(一端进另一端出):
Queue queue = new LinkedList() 或 Deque deque = new LinkedList()
双端队列(两端都可进出)
Deque deque = new LinkedList()
堆栈
Deque deque = new LinkedList()

Java堆栈Stack类已经过时,Java官方推荐使用Deque替代Stack使用。Deque堆栈操作方法:push()、pop()、peek()。

Deque 继承自 Queue,直接实现了它的有 LinkedList, ArayDeque, ConcurrentLinkedDeque 等。
Deque 支持容量受限的双端队列,也支持大小不固定的。一般双端队列大小不确定。
Deque 接口定义了一些从头部和尾部访问元素的方法。比如分别在头部、尾部进行插入、删除、获取元素。

抛出异常 特殊值 抛出异常 特殊值
插入 addFirst(e) offerFirst(e) addLast(e) offerLast(e)
删除 removeFirst() pollFirst() removeLast() pollLast()
检查 getFirst() peekFirst() getLast() peekLast()

Deque接口扩展(继承)了 Queue 接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:

Queue方法 Deque方法
add(e) addLast(e)
offer(e) offerLast(e)
remove() removeFirst()
poll() pollFirst()
element() getFirst()
peek() peekFirst()

双端队列也可用作 LIFO(后进先出)堆栈。应优先使用此接口而不是遗留 Stack 类。在将双端队列用作堆栈时,元素被推入双端队列的开头并从双端队列开头弹出。堆栈方法完全等效于 Deque 方法,如下表所示:

Stack方法 Deque方法
push(e) addFirst(e)
pop() removeFirst()
peek() peekFirst()

线程池

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
1
2
3
4
5
6
7
8
9
10
11
public class FutureTask<V> implements RunnableFuture<V> {

}

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM());
CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB());
CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD());
//阻塞获取所有结果,管理多个fefure的结果
CompletableFuture.allOf(futureTM, futureTB, futureJD).join();



//多线程执行完毕后的后续操作
CompletableFuture.supplyAsync(()->priceOfTM())
.thenApply(String::valueOf)
.thenApply(str-> "price " + str)
.thenAccept(System.out::println);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Test {
volatile static String s = "";

public static void main(String[] args) throws IOException, InterruptedException {
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("===线程一 3秒休眠完毕===");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "world";
}).thenApply(Test::sum);

CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
try {

Thread.sleep(1000);
System.out.println("===线程二 1秒休眠完毕===");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "hello ";
}).thenApply(Test::sum);

//调用join之前,不阻塞当前线程
System.out.println("非阻塞前的 s = " + s);

//调用join方法,阻塞等待两个子线程的返回结果
CompletableFuture.allOf(c1, c2).join();
System.out.println("执行完毕....");
System.in.read();
}

public static String sum(String st) {
System.out.println("即将拼接的字符串:" + st);
s += st;
System.out.println("拼接后的 s = " + s);
return s;
}
}
1
2
3
4
5
6
7
8
9
输出结果:
非阻塞前的 s =
===线程二 1秒休眠完毕===
即将拼接的字符串:hello
拼接后的 s = hello
===线程一 3秒休眠完毕===
即将拼接的字符串:world
拼接后的 s = hello world
执行完毕....

CompletableFuture的优势在于能够添加子线程返回结果的后续操作,比如多线程运行的情况下,需要对每个线程返回的数据进行分析计算时,可以每个线程返回结果集之后自动调用分析计算方法,而不是需要等到所有线程都返回了结果后再逐个去计算。

细节分析

ThreadPoolExecutor

1
2
3
4
5
ThreadPoolExecutor
ForkJoinPool:
分解汇总的任务
用很少的线程可以执行很多的任务(子任务)TPE做不到先执行子任务
CPU密集型
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,			
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

corePoolSize:核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。

核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。

maximumPoolSize:一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,则会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,当线程池中的线程数达到maximunPoolSize(最大线程池线程数量)时达到阈值。

keepAliveTime:一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁

unit:空间线程存活时间单位

workQueue :工作队列

新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

①ArrayBlockingQueue

基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

②LinkedBlockingQuene

基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

③SynchronousQuene

一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

④PriorityBlockingQueue

具有优先级的无界阻塞队列,优先级通过参数Comparator实现

threadFactory :线程工厂

创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

handler :拒绝策略

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:

①CallerRunsPolicy

该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。

1
2
3
4
5
6
7
8
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

②AbortPolicy

该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。

1
2
3
4
5
6
7
8
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

③DiscardPolicy

该策略下,直接丢弃任务,什么都不做。

1
2
3
4
5
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

④DiscardOldestPolicy

该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列。

1
2
3
4
5
6
7
8
9
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
1
2
3
4
5
6
//案例
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

Executors

SingleThreadExecutor

为什么有单线程池?

任务队列、线程池生命周期

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用LinkedBlockingQueue,长度是Integer.MAX_VALUE,会堆满OOM。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

使用SynchronousQueue等有线程池接收到任务之后,才返回,maximumPoolSize=Integer.MAX_VALUE,线程太多CPU切换,瘫痪。

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

使用LinkedBlockingQueue,长度是Integer.MAX_VALUE,会堆满OOM。

newScheduledThreadPool

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

使用DelayedWorkQueue延时队列处理任务, maximumPoolSize=Integer.MAX_VALUE,线程太多CPU切换,瘫痪。

ThreadPoolExecutor源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
//线程池工作队列
private final BlockingQueue<Runnable> workQueue;

/**
* Lock held on access to workers set and related bookkeeping.
* While we could use a concurrent set of some sort, it turns out
* to be generally preferable to use a lock. Among the reasons is
* that this serializes interruptIdleWorkers, which avoids
* unnecessary interrupt storms, especially during shutdown.
* Otherwise exiting threads would concurrently interrupt those
* that have not yet interrupted. It also simplifies some of the
* associated statistics bookkeeping of largestPoolSize etc. We
* also hold mainLock on shutdown and shutdownNow, for the sake of
* ensuring workers set is stable while separately checking
* permission to interrupt and actually interrupting.
*/
//线程池内部的Lock锁,用于Workers集合的变更加锁
private final ReentrantLock mainLock = new ReentrantLock();

/**
* Set containing all worker threads in pool. Accessed only when
*/
//线程池工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// worker数量超过核心线程数,任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if (! isRunning(recheck) && remove(command))
reject(command);
// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
else if (workerCountOf(recheck) == 0)
//给线程池至少创建一个线程(非核心线程)
addWorker(null, false);
}
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
// 这儿有3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
else if (!addWorker(command, false))
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Methods for creating, running and cleaning up after workers
*/

/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/

//firstTask:添加的线程任务
//core:是否使用核心线程进行启动该任务(当前线程数小于核心线程数时为true)
private boolean addWorker(Runnable firstTask, boolean core) {

//双重死循环的目的是判断线程池的状态,若是当前线程池可以创建线程则通过 CAS 操作增加线程数;
retry:
for (;;) {

int c = ctl.get();
//获取当前线程池的状态(32位的前3位)
int rs = runStateOf(c);

// Check if queue empty only if necessary.
//if中的条件等价于下边描述:
//(rs > SHUTDOWN) ||
//(rs == SHUTDOWN || firstTask != null) ||
//(rs == SHUTDOWN || workQueue.isEmpty())
//1. 线程池状态>SHUTDOWN 直接返回false
//2. 线程池状态==SHUTDOWN且first不为null,直接返回false
//3. 线程池状态==SHUTDOWN且工作且任务队列为空,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;


//内部自旋 获取创建线程令牌的过程
for (;;) {
//获取线程池内的工作线程数
int wc = workerCountOf(c);
//当前线程池内的工作线程数已达最大值,无法添加进工作队列
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// compareAndIncrementWorkerCount(c) 可能会有多个线程在申请,需要以CAS的方式进行
//条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌。
//条件失败:说明可能有其它线程,修改过ctl这个值了。
//可能发生过什么事?
//1.其它线程execute() 申请过令牌了,在这之前。导致CAS失败
//2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生变化了,咱们知道 ctl 高3位表示状态 状态改变后,cas也会失败。

if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//重新获取线程池状态,变化则继续执行外层循环,否则继续执行内层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}


//表示创建的worker是否已经启动,false未启动 true启动
boolean workerStarted = false;
//表示创建的worker是否添加到池子中了 默认false 未添加 true是添加。
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;

//为什么要做 t != null 这个判断?
//为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现。
//万一哪个 小哥哥 脑子一热,有bug,创建出来的线程 是null、、
//Doug lea考虑的比较全面。肯定会防止他自己的程序报空指针,所以这里一定要做!
if (t != null) {
//worker的添加必须得是串行的,因此需要加锁
final ReentrantLock mainLock = this.mainLock;
//持有全局锁,可能会阻塞,直到获取成功为止,同一时刻 操纵 线程池内部相关的操作,都必须持锁。
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

//条件一:rs < SHUTDOWN 成立:最正常状态,当前线程池为RUNNING状态.
//条件二:前置条件:当前线程池状态不是RUNNING状态。
//(rs == SHUTDOWN && firstTask == null) 当前状态为SHUTDOWN状态且firstTask为空。其实判断的就是SHUTDOWN状态下的特殊情况,
//只不过这里不再判断队列是否为空了
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {

//t.isAlive() 当线程start后,线程isAlive会返回true。
//防止脑子发热的程序员,ThreadFactory创建线程返回给外部之前,将线程start了
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//条件成立:说明当前线程数量是一个新高。更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//表示线程已经追加进线程池中了。
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//条件成立:说明 添加worker成功
//条件失败:说明线程池在lock之前,线程池状态发生了变化,导致添加失败。
if (workerAdded) {
//成功后,则将创建的worker启动,线程启动。
t.start();
//启动标记设置为true
workerStarted = true;
}
}
} finally {
//失败时做什么清理工作?
//1.释放令牌
//2.将当前worker清理出workers集合
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//Worker本身是一个Runnable,并且继承了AQS,自身就是一把锁,防止多个线程同时争抢同一个Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}

// 省略代码...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用unlock()是为了让外部可以中断
w.unlock(); // allow interrupts
// 这个变量用于判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 这儿是自旋
// 1. 如果firstTask不为null,则执行firstTask;
// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
// 3. while循环从线程池的工作队列workQueue中获取任务处理
// 4. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
// 这两个方法在当前类里面为空实现。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 帮助gc
task = null;
// 已完成任务数加一
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
}
}

WorkStealingPool

juc

线程池里的每个线程都有自己的工作队列,当其他线程没有任务执行时,则会去其他线程偷取它的工作队列里的任务进行执行。

1
2
3
4
5
6
7
//Executors.newWorkStealingPool();
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
1
2
3
//没有返回值的任务
public abstract class RecursiveAction extends ForkJoinTask<Void> {
}
1
2
3
//有返回值的任务
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random r = new Random();

static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);
}

System.out.println("---" + Arrays.stream(nums).sum()); //stream api
}


static class AddTask extends RecursiveAction {

int start, end;

AddTask(int s, int e) {
start = s;
end = e;
}

@Override
protected void compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
System.out.println("from:" + start + " to:" + end + " = " + sum);
} else {

int middle = start + (end-start)/2;

AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
subTask1.fork();
subTask2.fork();
}


}

}


static class AddTaskRet extends RecursiveTask<Long> {

private static final long serialVersionUID = 1L;
int start, end;

AddTaskRet(int s, int e) {
start = s;
end = e;
}

@Override
protected Long compute() {

if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}

int middle = start + (end-start)/2;

AddTaskRet subTask1 = new AddTaskRet(start, middle);
AddTaskRet subTask2 = new AddTaskRet(middle, end);
subTask1.fork();
subTask2.fork();

return subTask1.join() + subTask2.join();
}

}

public static void main(String[] args) throws IOException {
/*ForkJoinPool fjp = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
fjp.execute(task);*/

T12_ForkJoinPool temp = new T12_ForkJoinPool();

ForkJoinPool fjp = new ForkJoinPool();
AddTaskRet task = new AddTaskRet(0, nums.length);
fjp.execute(task);
long result = task.join();
System.out.println(result);

//System.in.read();
}
}

Disruptor

底层使用环状数组,采用复用覆盖消息的方式,并且默认为数组中的数据创建对象,通过覆盖对象中的属性值达到减少GC频率的目的。

1
2
3
4
5
6
7
8
9
10
11
public class LongEvent {
private long value;

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Main {

public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}

public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.setValue(buffer.getLong(0));
}

public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;

// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

// Connect the handler
disruptor.handleEventsWith(Main::handleEvent);

// Start the Disruptor, starts all threads running
disruptor.start();

// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(Main::translate, bb);
ThrTead.sleep(1000);
}
}
}

最后更新: 2021年03月15日 12:29

原始链接: https://midkuro.gitee.io/2020/10/29/juc-base/

× 请我吃糖~
打赏二维码