GeekIBLi

深入学习并发编程(图灵-杨过)

2022-01-04

2. volatile关键字

2.1 volatile的作用

Volatile 只能修饰成员变量,不能修饰局部变量。

1、及时可见性

2、指令重排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Jmm04_CodeAtomic {
private volatile static int counter = 0;
static Object object = new Object();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(()->{
for (int j = 0; j < 1000; j++) {
synchronized (object){
counter++;//分三步- 读,自加,写回
}
}
});
thread.start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(counter);
}
}

⚠️ volatile无法保证原子操作

2.2 volatile count++为什么会小于正确的结果?

count++ 不是原子操作!count = count + 1;

  • 读count
  • 计算count + 1
  • 重新赋值count

s多个线程下可能会出现少加的情况。

2.2.1 这个数据是被丢失了呢 还是被覆盖了呢?

mesi协议

2.2.2 如何保证count++正确呢?

同步锁 synchronized

2.3 什么是指令重排序?

在保证结果正确性的前提下,指令从内存中加载,重排序之后,可以减少内存数据加载的次数。

编译器重排 指令级重排序 执行器重排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Slf4j
public class Jmm05_CodeReorder {
private static int x = 0, y = 0;
private static int a = 0, b = 0;

public static void main(String[] args) throws InterruptedException {
int i = 0;
for (;;){
i++;
x = 0; y = 0;
a = 0; b = 0;
Thread t1 = new Thread(new Runnable() {
public void run() {
shortWait(10000);
a = 1;
x = b;
UnsafeInstance.reflectGetUnsafe().fullFence();
///
//
//
}
});

Thread t2 = new Thread(new Runnable() {
public void run() {
b = 1;
UnsafeInstance.reflectGetUnsafe().fullFence();
y = a;
}
});

t1.start();
t2.start();
t1.join();
t2.join();

String result = "第" + i + "次 (" + x + "," + y + ")";
if(x == 0 && y == 0) {
System.out.println(result);
break;
} else {
log.info(result);
}
}
}

/**
* 等待一段时间,时间单位纳秒
* @param interval
*/
public static void shortWait(long interval){
long start = System.nanoTime();
long end;
do{
end = System.nanoTime();
}while(start + interval >= end);
}
}

以上不考虑指令重拍的情况下有几种结果呢?

1
2
3
4
x = 1, y = 0;
x = 0, y = 1;
x = 1, y = 1;
x = 0, y = 0; volatile禁止指令重排序,不会出现这种情况!

2.4 禁止指令重排序的实现原理?

2.4.1 内存屏障

2.5 指令重排有哪些现实中的例子

2.5.1 DCL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Singleton {

/**
* 查看汇编指令
* -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -Xcomp
*/
private volatile static Singleton myinstance;

/**
* 双重锁机制保证单例安全
* @return
*/
public static Singleton getInstance() {
if (myinstance == null) {
synchronized (Singleton.class) {
if (myinstance == null) {
myinstance = new Singleton();
}
}
}
return myinstance;
}

public static void main(String[] args) {
Singleton.getInstance();
}
}

对应字节码如下:

1
2
3
4
5
6
L8
LINENUMBER 26 L8
NEW com/yg/edu/jmm/dcl/Singleton
DUP
INVOKESPECIAL com/yg/edu/jmm/dcl/Singleton.<init> ()V
PUTSTATIC com/yg/edu/jmm/dcl/Singleton.myinstance : Lcom/yg/edu/jmm/dcl/Singleton;

上述代码一个经典的单例的双重检测的代码,这段代码在单线程环境下并没有什么问题,但如果在多线程环境下就可以出现线程安全问题。原因在于某一个线程执行到第一次检测,读取到的instance不为null时,instance的引用对象可能没有完成初始化。

因为instance = new DoubleCheckLock();可以分为以下3步完成(伪代码)

1
2
3
memory = allocate();//1.分配对象内存空间 
instance(memory);//2.初始化对象
instance = memory;//3.设置instance指向刚分配的内存地址,此时instance!=null

由于步骤1和步骤2间可能会重排序,如下:

1
2
memory=allocate();//1.分配对象内存空间 
instance=memory;//3.设置instance指向刚分配的内存地址,此时instance!=null,但是对象还没有初始化完成! instance(memory);//2.初始化对象

由于步骤2和步骤3不存在数据依赖关系,而且无论重排前还是重排后程序的执行结果在单线程中并没有改变,因此这种重排优化是允许的。但是指令重排只会保证串行语义的执行的一致性(单线程),但并不会关心多线程间的语义一致性。所以当一条线程访问instance不为null时,由于instance实例未必已初始化完成,也就造成了线程安全问题。那么该如何解决呢,很简单,我们使用volatile禁止instance变量被执行指令重排优化即可。

//禁止指令重排优化 private volatile static DoubleCheckLock instance;

2.6 volatile内存语义的实现

举例来说,第二行最后一个单元格的意思是:在程序中,当第一个操作为普通变量的读或写时,如果第二个操作为volatile写,则编译器不能重排序这两个操作。

从上图可以看出:

    • 当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
    • 当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
    • 当第一个操作是volatile写,第二个操作是volatile读或写时,不能重排序。

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能。为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略。

    • ·在每个volatile写操作的前面插入一个StoreStore屏障。
    • ·在每个volatile写操作的后面插入一个StoreLoad屏障。
    • ·在每个volatile读操作的前面插入一个LoadLoad屏障。
    • ·在每个volatile读操作的后面插入一个LoadStore屏障。

上述内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的volatile内存语义。

2.7 synchronized能否禁止指令重排序?

不能

2.8 如何在java代码中手动添加内存屏障?

1
2
3
4
5
6
7
8
9
10
11
12
public class UnsafeInstance {
public static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}

使用的时候如下:

1
2
3
4
5
6
7
Thread t2 = new Thread(new Runnable() {
public void run() {
b = 1;
UnsafeInstance.reflectGetUnsafe().fullFence();
y = a;
}
});

3. MESI协议

3.1 java代码是如何执行的?

3.2 CPU是如何与内存交互的?

CPU访问内存是通过总线,而访问总线,必须先获取总线索,而lock前缀的执行,可以获取总线锁,阻塞其他CPU进行访问。这是最初的一种设计,这种方式的效率显然是很差的。

3.3 MESI协议工作流程?

3.4 什么是总线裁决?

多个cpu操作一个数据的时候,去对缓存行加锁的时候,需要总线来判断给那个cpu加锁。获取锁的缓存行变成m状态,其他的缓存行变成i状态。

3.5 缓存行是几级缓存的?

L1 Cache

3.6 一个缓存行64字节装不下数据会怎样?

升级成总线锁

3.7 缓存行上加锁会影响到其他的数据吗?

???

3.8 MESI协议不能对寄存器失效

已经加载到寄存器的指令不能失效,比如count++操作不能保证原子性

3.9 MESI 数据失效之后,怎么读正确的数据呢?

是实时去内存中读取数据吗? 不是的。

  • 获取到lock的数据修改之后,并不是直接把数据写到缓存行中,而是写到了store buffer中。
  • 获取lock的cpu在修改数据是,会把当前缓存行设置成m状态,同时发送一个消息到其他cpu
  • 其他没有获取到lock的缓存行中的数据就失效了,变成i状态,同时把失效的数据放到一个队列中
  • 当失效数据都放到缓冲队列之后,获取lock的cpu把store buffer中的数据刷到缓存行中。最后在更新到内存中。
  • 在Cpu空闲的时候,将失效的数据在队列中清除,之前仅仅是把数据放到失效队列中,缓存行中的数据其实还在

3.10 happens-before原则

4、Synchronized关键字

4.1 synchronized 1.6之前和之后有什么区别

  • 偏向锁只针对有一个线程加锁的情况
  • 轻量级锁针对有少数线程竞争,但是竞争不强烈(如何定义不强烈? 锁占用时间短,线程可交替执行)
  • 重量级锁 依赖管程 依靠操作系统底层的互斥量Mutex, 由操作系统维护,涉及到CPU用户态和内核态的切换,比较重

4.2 什么是自旋锁

1
2
3
synchronized(lock){
do....
}

当多个线程竞争锁资源的时候,后到的线程自旋等待正在执行的线程释放锁资源,然后自己去竞争,自旋的过程中,一直占用CPU。

自旋锁使用于同步代码块里面执行逻辑很简单或者比较快的场景。这样另一个进程可以很快获得锁。

自旋锁是处于性能的考虑。避免进程上下文切换,等待线程阻塞和唤醒的性能开销。

自旋锁成功之后升级为轻量级锁,如果自旋次数够了依旧没有获取到锁,便升级成为重量级锁。

4.3 锁升级的过程是否可逆?

不可逆

4.4 synchronized如何使用?

  • 普通方法
    • 锁的是当前的对象,凡是这个实例对象相关的方法都互斥
    • 即便这个类存在static的同步方法,不和这个实例对象相关的两个线程,不会冲突
  • 静态方法
    • 锁范围是当前类实例
    • 注意所的范围才好弄清楚是否冲突
    • 普通同步方法和静态同步方法不冲突,因为不是所的一个实例
  • 方法内部同步块
    • 锁的范围最小
    • 锁实例一般是成员对象,不同成员对象的同步代码块执行不冲突

4.5 synchronized底层原理是怎样的?

synchronized内置锁是一种对象锁(锁的是对象而非引用),作用粒度是对象,可以用来实现对临界资源的同步互斥访问,是可重入的。

synchronized加锁的方式如上已经阐述。

synchronized是基于JVM内置锁实现,通过内部对象Monitor(监视器锁)实现,基于进入与退出Monitor对象实现方法与代码块同步,监视器锁的实现依赖底层操作系统的Mutex lock(互斥锁)实现,它是一个重量级锁性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight Locking)、偏向锁(Biased Locking)、适应性自旋(Adaptive Spinning)等技术来减少锁操作的开销,,内置锁的并发性能已经基本与Lock持平。

synchronized关键字被编译成字节码后会被翻译成monitorenter 和 monitorexit 两条指令分别在同步块逻辑代码的起始位置与结束位置。

4.6什么是Monitor监视器锁

Monitor监视器锁

任何一个对象都有一个Monitor与之关联,当且一个Monitor被持有后,它将处于锁定状态。Synchronized在JVM里的实现都是 基于进入和退出Monitor对象来实现方法同步和代码块同步,虽然具体实现细节不一样,但是都可以通过成对的MonitorEnter和MonitorExit指令来实现。

  • monitorenter:每个对象都是一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

    1. 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者;
    2. 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1;
    3. 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权;
  • monitorexit:执行monitorexit的线程必须是objectref所对应的monitor的所有者。指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

monitorexit,指令出现了两次,第1次为同步正常退出释放锁;第2次为发生异步退出释放锁

通过上面两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因

1
2
3
4
5
public class SynchronizedMethod {
public synchronized void method() {
System.out.println("Hello World!");
}
}

反编译如下:

从编译的结果来看,方法的同步并没有通过指令 monitorentermonitorexit 来完成(理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了 ACC_SYNCHRONIZED 标示符。JVM就是根据该标示符来实现方法的同步的

当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。在方法执行期间,其他任何线程都无法再获得同一个monitor对象。

两种同步方式本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。

什么是monitor?

可以把它理解为 一个同步工具,也可以描述为 一种同步机制,它通常被 描述为一个对象。与一切皆对象一样,所有的Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象自打娘胎里出来就带了一把看不见的锁,它叫做内部锁或者Monitor锁也就是通常说Synchronized的对象锁,MarkWord锁标识位为10,其中指针指向的是Monitor对象的起始地址。在Java虚拟机(HotSpot)中,Monitor是由ObjectMonitor实现的,其主要数据结构如下(位于HotSpot虚拟机源码ObjectMonitor.hpp文件,C++实现的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ObjectMonitor() {
_header = NULL;
_count = 0; // 记录个数
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL; // 处于wait状态的线程,会被加入到_WaitSet
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; // 处于等待锁block状态的线程,会被加入到该列表
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}

ObjectMonitor中有两个队列,**_WaitSet 和 _EntryList,用来保存ObjectWaiter对象列表( 每个等待锁的线程都会被封装成ObjectWaiter对象 ),_owner指向持有ObjectMonitor对象的线程**,当多个线程同时访问一段同步代码时:

  1. 首先会进入 _EntryList 集合,当线程获取到对象的monitor后,进入 _Owner区域并把monitor中的owner变量设置为当前线程,同时monitor中的计数器count加1
  2. 若线程调用 wait() 方法,将释放当前持有的monitor,owner变量恢复为null,count自减1,同时该线程进入 WaitSet集合中等待被唤醒
  3. 若当前线程执行完毕,**也将释放monitor(锁)并复位count的值,以便其他线程进入获取monitor(锁)**;

同时,Monitor对象存在于每个Java对象的对象头Mark Word中(存储的指针的指向),Synchronized锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因,同时notify/notifyAll/wait等方法会使用到Monitor锁对象,所以必须在同步代码块中使用。监视器Monitor有两种同步方式:互斥与协作。多线程环境下线程之间如果需要共享数据,需要解决互斥访问数据的问题,监视器可以确保监视器上的数据在同一时刻只会有一个线程在访问

4.7 对象头中锁是如何标记的?

32位虚拟机的对象头如下👇

4.8 锁对象hashCode在各种状态下都存放在那里?

  • 无锁状态下hashcode存放在markword中
  • 偏向锁调用hashcode会升级成轻量级锁
  • 轻量级锁的hashCode存放在线程栈的Lock Record中
  • 重量级锁的hashCode存放在Monitor中

4.9 验证锁标志

项目中引入一下依赖

1
2
3
4
5
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>

打印锁对象的对象头,如下:

1
2
3
4
5
6
7
8
9
10
 public static void main(String[] args) throws InterruptedException {
// TimeUnit.SECONDS.sleep(5);
Object o = new Object();
System.out.println("a " + ClassLayout.parseInstance(o).toPrintable());
o.hashCode();
System.out.println("b " + ClassLayout.parseInstance(o).toPrintable());
synchronized (o){
System.out.println("c " + ClassLayout.parseInstance(o).toPrintable());
}
}

4.9 一开始要sleep5 秒有什么作用?

因为jvm在启动的时候,要初始化很多的数据,会涉及到很多对象处于偏向锁。

  • 如果一开始不sleep,第一次打印是无锁状态,第二次打印,是轻量级锁,因为此时进程中可能有很多偏向锁占用CPU,这里直接升级为轻量级锁
  • 一开始sleep,锁对象会是偏向锁的状态,然后只有一个线程竞争,第二次打印也还是偏向锁

4.10 什么是匿名偏向?

4.11 为什么偏向锁的对象调用hashCode方法之后,会升级为轻量级锁?

可能是因为轻量级锁的对象头markword中,没有地方存放偏向锁的标志和hashcode,而轻量级锁则由地方存储,在线程栈的Lock record中记录

4.12 锁升级过程是怎样的?

偏向锁

偏向锁是Java 6之后加入的新锁,它是一种针对加锁操作的优化手段,经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,因此为了减少同一线程获取锁(会涉及到一些CAS操作,耗时)的代价而引入偏向锁。\

偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。所以,对于没有锁竞争的场合,偏向锁有很好的优化效果,毕竟极有可能连续多次是同一个线程申请相同的锁。

但是对于锁竞争比较激烈的场合,偏向锁就失效了,因为这样场合极有可能每次申请锁的线程都是不相同的,因此这种场合下不应该使用偏向锁,否则会得不偿失,需要注意的是,偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁。下面我们接着了解轻量级锁。

1
2
3
默认开启偏向锁
开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
关闭偏向锁:-XX:-UseBiasedLocking

轻量级锁

倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,它还会尝试使用一种称为轻量级锁的优化手段(1.6之后加入的),此时Mark Word 的结构也变为轻量级锁的结构。轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。

自旋锁

轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。这是基于在大多数情况下,线程持有锁的时间都不会太长,如果直接挂起操作系统层面的线程可能会得不偿失,毕竟操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,因此自旋锁会假设在不久将来,当前的线程可以获得锁,因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),一般不会太久,可能是50个循环或100循环,在经过若干次循环后,如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,这就是自旋锁的优化方式,这种方式确实也是可以提升效率的。最后没办法也就只能升级为重量级锁了。

锁消除

消除锁是虚拟机另外一种锁的优化,这种优化更彻底,Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间,如下StringBuffer的append是一个同步方法,但是在add方法中的StringBuffer属于一个局部变量,并且不会被其他线程所使用,因此StringBuffer不可能存在共享资源竞争的情景,JVM会自动将其锁消除。锁消除的依据是逃逸分析的数据支持。

锁消除,前提是java必须运行在server模式(server模式会比client模式作更多的优化),同时必须开启逃逸分析

:-XX:+DoEscapeAnalysis 开启逃逸分析

-XX:+EliminateLocks 表示开启锁消除。

锁膨胀

Java-锁消除和锁膨胀

4.13 什么是逃逸分析?

使用逃逸分析,编译器可以对代码做如下优化:

一、同步省略。如果一个对象被发现只能从一个线程被访问到,那么对于这个对象的操作可以不考虑同步。

二、将堆分配转化为栈分配。如果一个对象在子程序中被分配,要使指向该对象的指针永远不会逃逸,对象可能是栈分配的候选,而不是堆分配。

三、分离对象或标量(基本数据类型)替换。有的对象可能不需要作为一个连续的内存结构存在也可以被访问到,那么对象的部分(或全部)可以不存储在内存,而是存储在CPU寄存器中。

是不是所有的对象和数组都会在堆内存分配空间?

不一定

在Java代码运行时,通过JVM参数可指定是否开启逃逸分析,

-XX:+DoEscapeAnalysis : 表示开启逃逸分析

-XX:-DoEscapeAnalysis : 表示关闭逃逸分析。

从jdk 1.7开始已经默认开启逃逸分析,如需关闭,需要指定-XX:-DoEscapeAnalysis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class T0_ObjectStackAlloc {
/**
* 进行两种测试
* 关闭逃逸分析,同时调大堆空间,避免堆内GC的发生,如果有GC信息将会被打印出来
* VM运行参数:-Xmx4G -Xms4G -XX:-DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
*
* 开启逃逸分析
* VM运行参数:-Xmx4G -Xms4G -XX:+DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
*
* 执行main方法后
* jps 查看进程
* jmap -histo 进程ID
*
*/
public static void main(String[] args) {
long start = System.currentTimeMillis();
for (int i = 0; i < 500000; i++) {
alloc();
}
long end = System.currentTimeMillis();
//查看执行时间
System.out.println("cost-time " + (end - start) + " ms");
try {
Thread.sleep(100000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}

private static TulingStudent alloc() {
//Jit对编译时会对代码进行 逃逸分析
//并不是所有对象存放在堆区,有的一部分存在线程栈空间
TulingStudent student = new TulingStudent();
return student;
}

static class TulingStudent {
private String name;
private int age;
}
}

使用jmap查看对象创建情况

1
jmap -histo pid

逃逸分析可以节省堆空间,有利于GC

5. AQS框架Lock详解

5.1 Lock的核心点

  • 循环
  • CAS 多线程竞争锁
  • 队列 (公平和非公平)存储阻塞的线程们
  • 阻塞和唤醒

5.2 LockSupport.park() 和 object.notify() 有什么区别?

如果大量线程阻塞,每个线程都会有自己的线程栈,这样会占用大量的内存。可能会导致栈溢出。

LockSupport.unpark可以唤醒特定的线程,而object.notify是随机的唤醒

puck有参和无参有什么区别?

Puck无参数,阻塞一次

puck有参数,未被唤醒,一直阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) {
Thread t0 = new Thread(new Runnable() {
@Override
public void run() {
Thread current = Thread.currentThread();
log.info("{},开始执行!",current.getName());
for(;;){//spin 自旋
log.info("准备park住当前线程:{}....",current.getName());
LockSupport.park();
System.out.println(Thread.interrupted());
log.info("当前线程{}已经被唤醒....",current.getName());

}
}
},"t0");

t0.start();

try {
Thread.sleep(2000);
log.info("准备唤醒{}线程!",t0.getName());
LockSupport.unpark(t0);
Thread.sleep(2000);
t0.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

5.3 CAS是什么?

A: 内存中实际存储的值,B: 期望内存中的值, C: 修改之后的值

如果 A = B , 则修改;否则重新读区内存中的值,不断循环上面的过程。

其实这个是和JMM息息相关的。整个比较并交换的操作是原子操作

在java中是用到了Unsafe类下的方法。底层其实是用到了汇编 cmpxchg指令

java代码演绎cas原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class Juc04_Thread_Cas {
/**
* 当前加锁状态,记录加锁的次数
*/
private volatile int state = 0;
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
private static Juc04_Thread_Cas cas = new Juc04_Thread_Cas();

public static void main(String[] args) {
new Thread(new Worker(),"t-0").start();
new Thread(new Worker(),"t-1").start();
new Thread(new Worker(),"t-2").start();
new Thread(new Worker(),"t-3").start();
new Thread(new Worker(),"t-4").start();
}

static class Worker implements Runnable{

@Override
public void run() {
log.info("请求:{}到达预定点,准备开始抢state:)",Thread.currentThread().getName());
try {
cyclicBarrier.await();
if(cas.compareAndSwapState(0,1)){
log.info("当前请求:{},抢到锁!",Thread.currentThread().getName());
}else{
log.info("当前请求:{},抢锁失败!",Thread.currentThread().getName());
}
} catch (InterruptedException|BrokenBarrierException e) {
e.printStackTrace();
}
}
}

/**
* 原子操作
* @param oldValue
* oldvalue:线程工作内存当中的值
* @param
* newValue:要替换的新值
* @return
*/
public final boolean compareAndSwapState(int oldValue,int newValue){
return unsafe.compareAndSwapInt(this,stateOffset,oldValue,newValue);
}
private static final Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();
private static final long stateOffset;

static {
try {
stateOffset = unsafe.objectFieldOffset(Juc04_Thread_Cas.class.getDeclaredField("state"));
} catch (Exception e) {
throw new Error();
}
}
}

5.4 什么是公平锁什么是非公平锁

  • 公平锁 : 按照队列的顺序获取锁,新来的线程进入队列排队
  • 非公平锁 : 获取锁的时候,新来的线程也可以参与竞争锁

针对的是 新来的线程是否马上可以竞争锁资源,其实就是是否破坏了先来后到,先来先的的公平性。

5.5 在reentrantLock代码中如何体现?

默认非公平

5.6 如何判断那个线程获取了锁?

AQS的属性 exclusiceOwnerThread 指向当前获取锁的线程。

5.7 锁到底加到了哪里,加了多少次?

AQS 的属性 int status = 0; 0 表示没有加锁,>0 表示锁重入的次数。

5.8 队列是如何创建的?

CLH队列 : Node类型,本质是双向链表的结构 。 三个人名。

1
2
3
4
5
6
7
8
static class Node {
Node pre;
Node next;
Node head;
Node tail;
Thread thread; // 对线程的引用
int waitStatus; // 状态 信号量
}

waitStatus变量的状态:

  • Init = 0 初始状态
  • singal = -1 下一个结点可被唤醒
  • cancled = 1 可能发生了异常 比如终端或者其他因素,需要被废弃掉这样的结点
  • condition = -2
  • propagate(广播) = -3

5.9 公平锁加锁流程

  • 先判断status == 0
  • 判断队列是否为空 (head == tail)
  • cas修改status = 1 && exclusiveOwnerThread = curThread
  • 如果status != 0 有两种情况,判断 exclusiveOwnerThread == curThread ? 如果是curThread , status +1 , 如果不是,加入CLH队列

5.9.1 如果T0获取了锁,T1 T2…线程怎么办?

5.9.1.1 尝试加入CLH队列
1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 可能多线程进入
if (!hasQueuedPredecessors() &&
// cas只会有一个线程修改成功
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 只会有一个线程进入,也就是当前线程再次获取锁的时候,所以这里没有并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

这个方法可以被多个线程同时调用,性能会比较高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 队列不是空的时候走下面
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 队列是空的时候,走这个方法
enq(node);
return node;
}

队列是空的时候,第一次想队列中添加等待的结点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
  • 为什么循环等待呢? 要确保结点一定要添加到队列尾部
  • if (t == null) { // Must initialize 是什么逻辑?

第一次初始化的时候,头结点搞一个空的Node对象,然后下一次循环的时候把结点添加到尾部。

如果不循环,可能导致线程结点丢失,永远无法唤醒,但是内存空间中还存在该线程的堆栈信息。

5.9.1.2 加入队列后,该把线程阻塞了

执行到这的话,只是把线程添加到队列中了,但是显示还没有阻塞,下面就是去阻塞的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前一个结点
final Node p = node.predecessor();
// 如果前一个结点是头结点,并且当前线程获取成功了
// 因为可能在入队前的瞬间,头结点的线程释放锁了
if (p == head && tryAcquire(arg)) {
// 当前获取锁的线程的node是第一个,而且是空的node,这个enq方法呼应!
setHead(node);
p.next = null; // help GC 把之前的结点设置成null
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

补充: ⚠️ ⚠️ ⚠️

if (p == head && tryAcquire(arg))

  • 如果是公平锁,则一定可以tryAcquire 获取到锁
  • 如果是非公平锁,则 if 不一定为true
5.9.1.3 结点阻塞之前,还会再次尝试获取锁

(如果是第一个结点)

5.9.1.4 如果获取锁成功,设置成头结点

设置成头结点的时候,之前的头结点(肯定是个 “空结点”)断开,并且设置成null,方便GC. 然后把当前线程的Node结点设置成head结点,同时把Node设置成 “空结点”。 怎么设置的呢?

1
2
3
4
5
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
  • 头指针指向当前结点
  • 把线程设置成null,因为当前线程已经获取到锁了,这里没有必要还继续占着引用
  • 把当前结点的前指针断开(因为在enq方法里面,要阻塞的结点都是添加在tail,所以它的prev肯定是有的,这里要断开头结点)
5.9.1.5 如果没有获取成功,执行阻塞
  • 第一轮循环,shouldParkAfterFailedAcquire(p, node) 返回false ,修改head的waitStatus = singal = -1,下一个结点可以被唤醒

  • 第二次循环进行阻塞操作,shouldParkAfterFailedAcquire(p, node) 返回true , 执行parkAndCheckInterrupt()

    1
    2
    3
    4
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    }

    阻塞当前线程。此时当前结点的waitStatus = 0. 和上面之前的操作一样了,等待这个线程执行的时候,又开始了上面的操作,设置head结点的waitStatus = signal = -1 等等。

5.9.2 被阻塞的线程什么时候会唤醒呢?

在获取锁的线程执行 unlock 的时候。

下面是AQS的模版方法,tryRelease在子类实现;

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 先减AQS的 state
if (tryRelease(arg)) {
Node h = head;
// h.waitStatus != 0 不能为0 在reentrantLock中,waitStatus = -1, 唤醒CLH队列下一个结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

下面是tryRelease在ReentrantLock方法中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

如果state - 1 成功,则接下来执行唤醒操作。

AQS# unparkSuccessor唤醒方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

总结

  • head结点的状态肯定是 “空结点”, waitStatus 状态=-1 ,说明线程正在执行,当线程执行完成之后,在unlock的时候,再把waitStatus改成0
  • 为什么waitStatus的状态在unlock的时候要 ‘恢复’ 成 0 呢?
    • 因为在非公平锁的情况下,不一定是后续结点一定能获得锁
    • 而且node 的waitStatus的状态设置成-1是有固定方法固定的时候

5.10 什么是可重入锁

1
2
3
4
5
6
7
lock.lock()
...
lock.lock()
...
lock.unlock()
...
lock.unlock()

5.11 什么是中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("in thread 111" + Thread.currentThread().isInterrupted());
if (Thread.currentThread().isInterrupted()) {
System.out.println("响应中断");
break;
}
}
}, "thread1");

thread.start();
System.err.println("before interrupt " + thread.isInterrupted());
thread.interrupt();
System.err.println("after interrupt " + thread.isInterrupted());
}

中断的三个方法:

  • Thread.interrupted(); 中断线程并且清除中断标记
  • Thread.currentThread().interrupt(); 中断线程
  • Thread.currentThread().isInterrupted(); 判断中断标记,并不清除

用户程序自己响应中断,比直接调用stop方法要友好的多

如果获取锁的线程调用了wait方法会怎样?

5.12 waitStatus = cancel = 1 状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

当node是cancel状态的时候,说明当前线程已经没用了,但是之前呢,已经把node添加到了队列里面了。

分成3种情况

  • 第一种,如果结点是tail, 把当前结点去掉
  • 如果是head后面的第一个结点,那么,直接唤醒该结点后面的结点
  • 如果是队列中间,比如图中红色的位置,则去掉就行了

这里呢 还有一些细节,node的thread = null,这样方便GC, 因为线程栈也是占用内存空间的。

6. AQS框架Blocking Queue详解

任意时刻,无论并发多高,在单机jvm上面,同一时间,永远都只有一个线程可以进行入队和出队操作

6.1 阻塞队列特性

  • 线程安全
  • 有界队列和无界队列
  • 队列满 添加阻塞 队列空 读取阻塞

6.2 使用场景

  • 线程池的任务队列
  • 注册中心底层
  • 常用语生产者和消费者

7. Semaphore信号量

Semaphore 字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。

7.1 semaphoreDemo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0;i<10;i++){
new Thread(new Task(semaphore,"yangguo+"+i)).start();
}
}

static class Task extends Thread{
Semaphore semaphore;

public Task(Semaphore semaphore,String tname){
super(tname);
this.semaphore = semaphore;
//this.setName(tname);
}

public void run() {
try {
//semaphore.acquireUninterruptibly();
semaphore.acquire();//获取公共资源

System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(5000);

semaphore.release();

/*if(semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)){
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(5000);
semaphore.release();//释放公共资源
}else{
fallback();
}*/

} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void fallback(){
System.out.println("降级");
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
Thread-0:aquire() at time:1641623552311
Thread-1:aquire() at time:1641623552311
Thread-3:aquire() at time:1641623557312
Thread-2:aquire() at time:1641623557312
Thread-4:aquire() at time:1641623562317
Thread-5:aquire() at time:1641623562317
Thread-6:aquire() at time:1641623567322
Thread-7:aquire() at time:1641623567322
Thread-8:aquire() at time:1641623572325
Thread-9:aquire() at time:1641623572325

7.2 重要API

tryAcquire(long timeout, TimeUnit unit)

1
2
3
4
5
6
7
if(semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)){
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(5000);
semaphore.release();//释放公共资源
}else{
fallback();
}

方法源码:

1
2
3
4
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

semaphore.acquire();

之前reentrantLock blocking queue都是独占模式,而semaphore的lock则是共享的模式

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

AQS模版方法,tryAcquireShared(arg) 在子类中有具体的实现

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 如果remaining >= 0 通过cas进行修改
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

这边和独占锁的区别在于多了一个 setHeadAndPropagate(node, r);方法。

8. CountDownLatch

9. CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CyclicBarrierRunner implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index ;

public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}

public void run() {
try {
System.out.println("index: " + index);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
public void run() {
System.out.println("所有特工到达屏障,准备开始执行秘密任务");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");

Thread.sleep(5000);

for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
}

}

9.1 CyclicBarrier 和 CountDownLatch的区别

  • 功能不一样
  • CyclicBarrier可复用,CountDownLatch不可复用
  • CyclicBarrier 和 CountDownLatch 倒过来用效果类似

10. Atomic类

原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为”不可被中断的一个或一系列操作” 。在多处理器上实现原子操作就变得有点复杂。本文让我们一起来聊一聊在Inter处理器和Java里是如何实现原子操作的。

Atomic 底层是基于无锁化的cas算法。基于魔术类Unsafe提供的三大cas-api完成

1
2
3
4
5
6
7
compareAndSwapObject

compareAndSwapInt

compareAndSwapLong

//基于硬件原语-CMPXCHG实现原子操作cas

基于硬件原语-CMPXCHG实现原子操作cas 在用户态就可以完成的操作,不会有切换的开销

1
2
3
4
5
6
7
8
9
do {

oldvalue = this.getIntVolatile(var1, var2);//读AtomicInteger的value值

///valueOffset---value属性在对象内存当中的偏移量

} while(!this.compareAndSwapInt(AtomicInteger, valueOffset, oldvalue, oldvalue + 1));

return var5;

10.1 什么叫偏移量?

要用cas修改某个对象属性的值->,首先要知道属性在对象的内存空间的哪个位置,必须知道属性的偏移量

10.2 如何通过原子操作修改一个对象的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class AtomicStudentAgeUpdater {
private String name ;
private volatile int age;

public AtomicStudentAgeUpdater(String name,int age){
this.name = name;
this.age = age;
}

public int getAge(){
return this.age;
}

public static void main(String[] args) {
AtomicStudentAgeUpdater updater = new AtomicStudentAgeUpdater("杨过",18);
System.out.println(ClassLayout.parseInstance(updater).toPrintable());
updater.compareAndSwapAge(18,56);
System.out.println("真实的杨过年龄---"+updater.getAge());
}

private static final Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();
private static final long valueOffset;

static {
try {
valueOffset = unsafe.objectFieldOffset(AtomicStudentAgeUpdater.class.getDeclaredField("age"));
System.out.println("valueOffset:--->"+valueOffset);
} catch (Exception e) {
throw new Error(e);
}
}

public void compareAndSwapAge(int old,int target){
unsafe.compareAndSwapInt(this,valueOffset,old,target);
}

}

注意:

这里我们是通过 Unsafe 类去操作修改对象的属性。需要拿到这个属性的 偏移量

10.3 如果需要原子操作的是数组,怎么办?

1
2
3
4
5
6
7
8
9
10
11
12
public class AtomicIntegerArrayRunner {

static int[] value = new int[]{1, 2};
static AtomicIntegerArray aiArray = new AtomicIntegerArray(value);

public static void main(String[] args) {
//todo 原子修改数组下标0的数值
aiArray.getAndSet(0, 3);
System.out.println(aiArray.get(0));
System.out.println(value[0]);
}
}

执行结果:

1
2
3
1

底层原理

1
2
3
4
 public AtomicIntegerArray(int[] array) {
// Visibility guaranteed by final field guarantees
this.array = array.clone();
}

10.4 如果原子操作修改的是对象类型数组呢?

1
2
3
4
5
6
7
8
9
10
public class AtomicReferenceArrayRunner {
static Tuling[] ovalue = new Tuling[]{new Tuling(1),new Tuling(2)};
static AtomicReferenceArray<Tuling> objarray = new AtomicReferenceArray(ovalue);

public static void main(String[] args) {
System.out.println(objarray.get(0).getSequence());
objarray.set(0,new Tuling(3));
System.out.println(objarray.get(0).getSequence());
}
}

执行结果:

1
2
1
3

底层原理:

1
2
3
4
public AtomicReferenceArray(E[] array) {
// Visibility guaranteed by final field guarantees
this.array = Arrays.copyOf(array, array.length, Object[].class);
}

10.5 AtomicIntegerFieldUpdater修改对象的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AtomicIntegerFieldUpdateRunner {

static AtomicIntegerFieldUpdater aifu = AtomicIntegerFieldUpdater.newUpdater(Student.class,"old");

public static void main(String[] args) {
Student stu = new Student("杨过",18);
System.out.println(aifu.getAndIncrement(stu));
System.out.println(aifu.getAndIncrement(stu));
System.out.println(aifu.incrementAndGet(stu));
System.out.println(aifu.get(stu));
}

static class Student{
private String name;
public volatile int old;

public Student(String name ,int old){
this.name = name;
this.old = old;
}

public String getName() {
return name;
}

public int getOld() {
return old;
}
}
}

Int 类型属性的偏移量不需要我们程序员自己调用api计算,AtomicIntegerFieldUpdater这个底层会自己计算。

10.6 如果需要修改的属性不是integer类型的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class AtomicReferenceFieldUpdaterRunner {

static AtomicReferenceFieldUpdater atomic = AtomicReferenceFieldUpdater.newUpdater(Document.class, String.class, "name");

public static void main(String[] args) {
Document document = new Document("杨过", 1);
System.out.println(atomic.get(document));
atomic.getAndSet(document, "xiaolongnv");
System.out.println(atomic.get(document));

//另一种方式修改
UnaryOperator<String> uo = s -> {
System.out.println("UnaryOperator:-->" + s);
return "小龙女";
};
System.out.println(atomic.getAndUpdate(document, uo));
System.out.println(atomic.get(document));

}

@Data
static class Document {
public volatile String name;
private int version;

Document(String obj, int v) {
name = obj;
version = v;
}
}
}

注意点

  • Name 属性必须是 public
  • Name 属性必须是 volatile

10.7 大名鼎鼎的ABA问题

添加版本号解决ABA问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class AtomicStampedRerenceRunner {

private static AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<>(1, 0);

public static void main(String[] args){
Thread main = new Thread(() -> {
int stamp = atomicStampedRef.getStamp(); //获取当前标识别
System.out.println("操作线程" + Thread.currentThread()+ "stamp="+stamp + ",初始值 a = " + atomicStampedRef.getReference());
try {
Thread.sleep(3000); //等待1秒 ,以便让干扰线程执行
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean isCASSuccess = atomicStampedRef.compareAndSet(1,2,stamp,stamp +1); //此时expectedReference未发生改变,但是stamp已经被修改了,所以CAS失败
System.out.println("操作线程" + Thread.currentThread() + "stamp="+stamp + ",CAS操作结果: " + isCASSuccess);
},"主操作线程");

Thread other = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
atomicStampedRef.compareAndSet(1,2,stamp,stamp+1);
System.out.println("操作线程" + Thread.currentThread() + "stamp="+atomicStampedRef.getStamp() +",【increment】 ,值 a= "+ atomicStampedRef.getReference());
stamp = atomicStampedRef.getStamp();
atomicStampedRef.compareAndSet(2,1,stamp,stamp+1);
System.out.println("操作线程" + Thread.currentThread() + "stamp="+atomicStampedRef.getStamp() +",【decrement】 ,值 a= "+ atomicStampedRef.getReference());
},"干扰线程");

main.start();
LockSupport.parkNanos(1000000);
other.start();
}
}

11. Unsafe类

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全操作的方法,如直接访问系统内存资源自主管理内存资源等,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。

但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。

Unsafe类为一单例实现,提供静态方法getUnsafe获取Unsafe实例,当且仅当调用getUnsafe方法的类为引导类加载器所加载时才合法,否则抛出SecurityException异常。

11.1 Unsafe.class

1
2
3
4
5
6
7
8
9
10
11
12
private Unsafe() {
}

@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}

11.2 unsafe 堆外内存优化文件上传

Java 频繁操作文件,可能造成内存使用骤增,jvm频繁GC。 如何优化?

  • 可以使用堆外内存 这样可以不会影响正常业务的请求,jvm可以正常执行其他业务
  • 堆外内存不属于jvm管,使用完之后,需要手动释放,否则容易造成内存泄漏。

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AllocateMemoryAccess {

public static void main(String[] args) {
Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();
long oneHundred = 1193123491341341234L;
byte size = 8;
/*
* 调用allocateMemory分配内存
*/
long memoryAddress = unsafe.allocateMemory(size);
System.out.println("address:->"+memoryAddress);
/*
* 将1写入到内存中
*/
unsafe.putAddress(memoryAddress, oneHundred);
/*
* 内存中读取数据
*/
long readValue = unsafe.getAddress(memoryAddress);

System.out.println("value : " + readValue);

unsafe.freeMemory(memoryAddress);
}

@Override
protected void finalize() throws Throwable {
super.finalize();
}
}

11.3 monitor对象锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ObjectMonitorRunner {
static Object object = new Object();
static Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();
public void method1(){
unsafe.monitorEnter(object);
}

public void method2(){
unsafe.monitorExit(object);
}

public static void main(String[] args) {
//jvm内置锁
synchronized (object){
//写逻辑
}

ObjectMonitorRunner objectMonitorRunner = new ObjectMonitorRunner();
objectMonitorRunner.method1();
objectMonitorRunner.method2();
}
}
  • 可以跨方法加锁和释放锁。
  • 注意死锁问题

11.4 线程阻塞和唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadParkerRunner {

static Unsafe unsafe = UnsafeInstance.reflectGetUnsafe();

public static void main(String[] args) {

Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread - is running----");
//true则会实现ms定时,false则会实现ns定时。
unsafe.park(false,0L); //阻塞当前线程
System.out.println("thread is over-----");
}
});

t.start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("唤醒Thread-t");
unsafe.unpark(t);
}
}

AQS中大量使用了 public native void unpark(Object var1); 方法;

12. 并发容器之ConcurrentHashMap

12.1 重要成员属性

  • DEFAULT_INITIAL_CAPACITY = 1 << 4; Hash表默认初始容量
  • MAXIMUM_CAPACITY = 1 << 30; 最大Hash表容量
  • DEFAULT_LOAD_FACTOR = 0.75f;默认加载因子
  • TREEIFY_THRESHOLD = 8;链表转红黑树阈值
  • UNTREEIFY_THRESHOLD = 6;红黑树转链表阈值
  • MIN_TREEIFY_CAPACITY = 64;链表转红黑树时hash表最小容量阈值,达不到优先扩容。

12.2 hashMap底层结构

  • 1.7 数组 + 链表
  • 1.8之后 数组 + 链表/红黑树

12.3 hashmap容量

hashmap的容量,值的是数组的长度。如果不考虑hash 碰撞的情况下,hash map 存放数据量 = 数组的长度 = 容量

12.3.1 容量是如何初始化的?

1
new HashMap<>(11); // 初始容量就是11 ?
  • 必须大于设置的size
  • 必须是2的指数幂
  • 必须大于size

所以,如果我们初始化容量设置成11,实际初始化容量为16. 如果size = 17呢? 实际初始化容量则是32;

12.3.2 hashmap的默认容量为什么是16?

  • 16的容量基本够用了
  • 容量必须是2的指数倍

12.3.3 hashCode 可以小于0吗?

可以的

12.3.4 容量为什么一定是2的指数次幂?

在使用hashCode 计算数组下标进行 「位运算」时,tab[i = (n - 1) & hash])

12.3.5 key是如何保证唯一的?

重复的覆盖

12.4 什么是hash碰撞,如何解决hash碰撞?

hash碰撞,hash冲突。不同的值经过hashcode定位数组到同一个下标下。

  • 拉链法 (1.7 头部插入 1.8 尾部插入法)
  • 开放寻址法

12.5 元素下表如何计算的?

位运算 效率比取模运算效率要高

1
tab[i = (n - 1) & hash]

12.5.1 为什么用 length - 1去做与运算?

  • 防止发生越界
  • 低位全部是1,计算快

12.6 计算hashCode的时候,下面的一些抑或操作是做什么的?

1
2
3
4
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

为了让hash变得更加散列,降低hash碰撞。

 什么是hash扰动 ? ? ?

12.7 hashmap是如何扩容的?

12.7.1 什么时候触发扩容

size > threshold = capital * load(加载因子),触发扩容。

13 >= 12 = 16 * 0.75

12.7.2 扩容是怎么进行的

  • 数组的长度必须还得是2的指数幂,所以数组长度 * 2
  • 将原来的数据迁移到新的数组上面

12.7.3 hashmap1.7扩容机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;//第一行
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);//第二行
e.next = newTable[i];//第三行
newTable[i] = e;//第四行
e = next;//第五行
}
}
}

去掉了一些冗余的代码, 层次结构更加清晰了。

  • 第一行:记录oldhash表中e.next
  • 第二行:rehash计算出数组的位置(hash表中桶的位置)
  • 第三行:e要插入链表的头部, 所以要先将e.next指向new hash表中的第一个元素
  • 第四行:将e放入到new hash表的头部
  • 第五行: 转移e到下一个节点, 继续循环下去

头插法: 原来链表上的数据是 A - B - C ,新数组上的结点是 C - B - A

单线程扩容时没有任何问题,但是多线程进行扩容的情况下,会发生死循环问题。

12.7.3.1 多线程扩容的死锁问题
1
2
3
4
5
6
7
while(null != e) {
Entry<K,V> next = e.next;//第一行,线程1执行到此被调度挂起
int i = indexFor(e.hash, newCapacity);//第二行
e.next = newTable[i];//第三行
newTable[i] = e;//第四行
e = next;//第五行
}
12.7.3.2 多线程扩容流程

流程图如下:

后面会形成环

下次添加数据或者查询数据的时候,可能会发生死循环;

12.7.4 hashmap1.8扩容机制

12.7.4.1 什么时候链表可以转红黑树
  • 阈值=8 当链表长度为9的时候,链表可以转换成红黑树,但是还有下面这个条件的限制
  • 容量大于等于 64,否则优先扩容
12.7.4.2 为什么长度是8 链表转红黑树?

【泊松分布】

一句话总结:泊松分布是单位时间内独立事件发生次数的概率分布,指数分布是独立事件的时间间隔的概率分布。

请注意是”独立事件”,泊松分布和指数分布的前提是,事件之间不能有关联,否则就不能运用上面的公式。

大多数情况下不会超过8的长度

12.7.4.3 扩容流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}

避免了rehash

12.8 加载因子为什么是0.75?

  • 如果调整的比较小,空间利用率会下降
  • 如果调整的比较大,哈希碰撞可能比较严重

12.9 线程安全的ConcurrentHashMap

12.9.1 常用API

  • Put hash碰撞时同步
  • get 没有添加同步锁

12.9.2 JDK1.7中的concurrentHash

12.9.2.1 在JDK1.7中使用分段锁segment 继承 reentrantLock
  • 根据key寻找segment并尝试获取锁
  • 找到entry数组上的下标,然后插入数据
  • entry table 开始容量是2
12.9.2.2 扩容机制
1
2
3

// todo

12.9.2.3 默认并发度

16扩容是否可以增加并发度?

12.9.3 在JDK1.8中使用cas + synchronized

12.9.3.1 重要属性

ConcurrentHashMap拥有出色的性能, 在真正掌握内部结构时, 先要掌握比较重要的成员:

  • LOAD_FACTOR: 负载因子, 默认75%, 当table使用率达到75%时, 为减少table的hash碰撞, tabel长度将扩容一倍。负载因子计算: 元素总个数%table.lengh

  • TREEIFY_THRESHOLD: 默认8, 当链表长度达到8时, 将结构转变为红黑树。

  • UNTREEIFY_THRESHOLD: 默认6, 红黑树转变为链表的阈值。

  • MIN_TRANSFER_STRIDE: 默认16, table扩容时, 每个线程最少迁移table的槽位个数。

  • MOVED: 值为-1, 当Node.hash为MOVED时, 代表着table正在扩容

  • TREEBIN, 置为-2, 代表此元素后接红黑树。

  • nextTable: table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上。

  • sizeCtl: 用来标志table初始化和扩容的,不同的取值代表着不同的含义:

      • 0: table还没有被初始化
      • -1: table正在初始化
      • 小于-1: 实际值为resizeStamp(n)<
      • 大于0: 初始化完成后, 代表table最大存放元素的个数, 默认为0.75*n
  • transferIndex: table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标

  • ForwardingNode: 一个特殊的Node节点, 其hashcode=MOVED, 代表着此时table正在做扩容操作。扩容期间, 若table某个元素为null, 那么该元素设置为ForwardingNode, 当下个线程向这个元素插入数据时, 检查hashcode=MOVED, 就会帮着扩容。

​ ConcurrentHashMap由三部分构成, table+链表+红黑树, 其中table是一个数组, 既然是数组, 必须要在使用时确定数组的大小, 当table存放的元素过多时, 就需要扩容, 以减少碰撞发生次数, 本文就讲解扩容的过程。

12.9.3.2 扩容的触发条件

扩容检查主要发生在插入元素(putVal())的过程:

  • 一个线程插完元素后, 检查table使用率, 若超过阈值, 调用transfer进行扩容
  • 一个线程插入数据时, 发现table对应元素的hash=MOVED, 那么调用helpTransfer()协助扩容。
12.9.3.3 put方法详解

源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
1、初始化hash table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tab = initTable();

private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 为什么这里初始化需要while循环
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

初始化的时候可能是多个线程同时在初始化,为了保证线程安全,这里使用了while + cas 来保证多线程安全。

U.compareAndSwapInt(this, SIZECTL, sc, -1) 相当于获取线程执行权限。只会有一个线程可以初始化,没有拿到执行权限的时候下一次循环的时候会判断

(tab = table) == null || tab.length == 0. 不满足条件自然就不再初始化了。

2、判断数组下标的位置是否是空的值
1
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 

如果数组下标的位置是null,说明还没有元素,则使用cas来存储数据。

1
2
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
3、为什么需要cas呢?
  • 保证线程安全
4、为什么put方法第5行需要循环?
1
for (Node<K,V>[] tab = table;;) 

第一个线程put一个数据和第二个线程put的数据如果发生了冲突,则第一次循环只有一个线程可以设置成功,另一个线程只能等到下一次循环。

5、如果在不考虑扩容的情况下发生hash冲突,第二次插入的时候会怎样?
  • 对数组上的元素也就是链表的头结点的对象加锁 synchronized
  • 构建链表
  • 尾部插入新元素
  • 如果已经存在红黑树,则继续添加树的结点
6、往链表上添加数据如果不使用synchronized会怎样?
1
2
3
4
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,value, null);
break;
}

会出现添加的值被覆盖掉的情况:

  • 线程一执行 pred.next = new Node<K,V> [1,a]
  • 线程二执行 pred.next = new Node<K,V> [2,b]

最后[1,a]丢失了。

7、如何判断正在扩容
1
2
3
4
5
6
7
8
9
10
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

f 指的是链表的头结点。

8、如果put的时候正在扩容会怎样
  • 当前线程会帮忙参与扩容。helpTransfer(tab, f)

  • 最少分配16个槽位迁移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { //table扩容
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据 length 得到一个标识符号
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {//说明还在扩容
//判断是否标志发生了变化|| 扩容结束了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
//达到最大的帮助线程 || 判断扩容转移下标是否在调整(扩容结束)
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

主要做了如下事情:

  • 检查是否扩容完成

  • 对sizeCtrl = sizeCtrl+1, 然后调用transfer()进行真正的扩容。

9、扩容transfer

扩容的整体步骤就是新建一个nextTab, size是之前的2倍, 将table上的非空元素迁移到nextTab上面去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// subdivide range,每个线程最少迁移16个槽位,大的话,最多
stride = MIN_TRANSFER_STRIDE;
// initiating 才开始初始化新的nextTab
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //扩容2倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//更新的转移下标,
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//是否能够向前推进到下一个周期
boolean advance = true;
// to ensure sweep before committing nextTab,完成状态,如果是,则结束此方法
boolean finishing = false;
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { //取下一个周期
int nextIndex, nextBound;
//本线程处理的区间范围为[bound, i),范围还没有处理完成,那么就继续处理
if (--i >= bound || finishing)
advance = false;
//目前处理到了这里(从大到小, 下线),开始找新的一轮的区间
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//这个条件改变的是transferIndex的值,从16变成了1
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
//nextBound 是这次迁移任务的边界,注意,是从后往前
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; //一块区间最小桶的下标
i = nextIndex - 1; //能够处理的最大桶的下标
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { //每个迁移线程都能达到这里
int sc;
if (finishing) { //迁移完成
nextTable = null;
//直接把以前的table丢弃了,上面的MOVE等标志全部丢弃,使用新的
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //扩大2n-0.5n = 1.50n, 更新新的容量阈值
return;
}
//表示当前线程迁移完成了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//注意此时sc的值并不等于sizeCtl,上一步,sizeCtl=sizeCtl-1了。这两个对象还是分割的
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果对应位置为null, 则将ForwardingNode放在对应的地方
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //别的线程已经在处理了,再推进一个下标
advance = true; // already processed,推动到下一个周期,仍然会检查i与bound是否结束
else { //说明位置上有值了,
//需要加锁,防止再向里面放值,在放数据时,也会锁住。比如整个table正在迁移,还没有迁移到这个元素,另外一个线程向这个节点插入数据,此时迁移到这里了,会被阻塞住
synchronized (f) {
if (tabAt(tab, i) == f) {//判断i下标和f是否相同
Node<K,V> ln, hn; //高位桶, 地位桶
if (fh >= 0) {
int runBit = fh & n;//n为2^n, 取余后只能是2^n
Node<K,V> lastRun = f;
///找到最后一个不和fn相同的节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
//只要找到这,之后的取值都是一样的,下次循环时,就不用再循环后面的
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else { //比如1,16,32,如果低位%16,那么肯定是0。
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//这样就把相同串的给串起来了
ln = new Node<K,V>(ph, pk, pv, ln);
else
//这样就把相同串的给串起来了,注意这里ln用法,第一个next为null,烦着串起来了。
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); //反着给串起来了
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {// 如果是红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; //也是高低节点
TreeNode<K,V> hi = null, hiTail = null;//也是高低节点
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) { //中序遍历红黑树
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) { //0的放低位
//注意这里p.prev = loTail,每一个p都是下一个的prev
if ((p.prev = loTail) == null)
lo = p; //把头记住
else
loTail.next = p; //上一次的p的next是这次的p
loTail = p; //把上次p给记住
++lc;
}
else { //高位
if ((p.prev = hiTail) == null)
hi = p; //把尾记住
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :// //判断是否需要转化为树
(hc != 0) ? new TreeBin<K,V>(lo) : t; //如果没有高低的话,则部分为两个树
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

其中有两个变量需要了解下:

  • advance: 表示是否可以向下一个轮元素进行迁移。
  • finishing: table所有元素是否迁移完成。

大致做了如下事情:

  • 确定线程每轮迁移元素的个数stride, 比如进来一个线程, 确定扩容table下标为(a,b]之间元素, 下一个线程扩容(b,c]。这里对b-a或者c-b也是由最小值16限制的。 也就是说每个线程最少扩容连续16个table的元素。而标志当前迁移的下标保存在transferIndex里面。
  • 检查nextTab是否完成初始化, 若没有的话, 说明是第一个迁移的线程, 先初始化nextTab, size是之前table的2倍。
  • 进入while循环查找本轮迁移的table下标元素区间, 保存在(bound, i]中, 注意这里是半开半闭区间。
  • 从i -> bound开始遍历table中每个元素, 这里是从大到小遍历的:
  1. 若该元素为空, 则向该元素标写入ForwardingNode, 然后检查下一个元素。 当别的线程向这个元素插入数据时, 根据这个标志符知道了table正在被别的线程迁移, 在putVal中就会调用helpTransfer帮着迁移。
  2. 若该元素的hash=MOVED, 代表次table正在处于迁移之中, 跳过。 按道理不会跑着这里的。
  3. 否则说明该元素跟着的是一个链表或者是个红黑树结构, 若hash>0, 则说明是个链表, 若f instanceof TreeBin, 则说明是个红黑树结构。
  • 链表迁移原理如下: 遍历链表每个节点。 若节点的f.hash&n==0成立, 则将节点放在i, 否则, 则将节点放在n+i上面。

​ 迁移前, 对该元素进行加锁。 遍历链表时, 这里使用lastRun变量, 保留的是上次hash的值, 假如整个链表全部节点f.hash&n==0, 那么第二次遍历, 只要找到lastRun的值, 那么认为之后的节点都是相同值, 减少了不必要的f.hash&n取值。遍历完所有的节点后, 此时形成了两条链表, ln存放的是f.hash&n=0的节点, hn存放的是非0的节点, 然后将ln存放在nextTable第i元素的位置, n+i存放在n+i的位置。

蓝色节点代表:f.hash&n==0, 绿色节点代表f.hash&n!=0。 最终蓝色的节点仍在存放在(0, n)范围里, 绿的节点存放在(n, 2n-1)的范围之内。

  • 迁移链表和红黑树的原理是一样的, 在红黑树中, 我们记录了每个红黑树的first(这个节点不是hash最小的节点)和每个节点的next, 根据这两个元素, 我们可以访问红黑树所有的元素, 红黑树此时也是一个链表, 红黑树和链表迁移的过程一样。红黑树根据迁移后拆分成了hn和ln, 根据链表长度确定链表是红黑树结构还是退化为了链表。
10、如何确定table所有元素迁移完成
1
2
3
4
5
6
7
8
//表示当前线程迁移完成了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//注意此时sc的值并不等于sizeCtl,上一步,sizeCtl=sizeCtl-1了。这两个对象还是分割的
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}

第一个线程开始迁移时, 设置了sizeCtl= resizeStamp(n) << RESIZE_STAMP_SHIFT+2, 此后每个新来帮助迁移的线程都会sizeCtl=sizeCtl+1, 完成迁移后,sizeCtl-1, 那么只要有一个线程还处于迁移状态, 那么sizeCtl> resizeStamp(n) << RESIZE_STAMP_SHIFT+2一直成立, 当只有最后一个线程完成迁移之后, 等式两边才成立。 可能大家会有疑问, 第一个线程并没有对sizeCtl=sizeCtl+1, 此时完成后再减一, 那不是不相等了吗, 注意这里, sizeCtl在减一前, 将值赋给了sc, 等式比较的是sc。

总结

table扩容过程就是将table元素迁移到新的table上, 在元素迁移时, 可以并发完成, 加快了迁移速度, 同时不至于阻塞线程。所有元素迁移完成后, 旧的table直接丢失, 直接使用新的table。

13. 并发容器之CopyOnWriteArrayList

13.1 什么是fail-fast机制

1
2
3
4
5
6
7
Exception in thread "pool-1-thread-11" java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
at java.util.ArrayList$Itr.next(ArrayList.java:859)
at com.yg.edu.list.CopyOnWriteArrayListRunner$ReadTask.run(CopyOnWriteArrayListRunner.java:30)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

13.2 CopyOnWriteArrayList设计思路

猜想 ReentrantReadWriteLock 是否可以实现这个功能呢?

  • 读操作加读锁
  • 写操作加写锁

适用于读写都比价多的场景。

但是如果读多写少的场景下呢?有没有优化方案?

核心思想:读写分离,空间换时间,避免为保证并发安全导致的激烈的锁竞争。

划关键点:

1、CopyOnWrite适用于读多写少的情况,最大程度的提高读的效率;

2、CopyOnWrite是最终一致性,在写的过程中,原有的读的数据是不会发生更新的,只有新的读才能读到最新数据;

3、如何使其他线程能够及时读到新的数据,需要使用volatile变量;

4、写的时候不能并发写,需要对写操作进行加锁;

13.3 add方法源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

final ReentrantLock lock = this.lock; 保证任意时刻只有一个线程进行add。

13.4 使用场景

  • 适合读多写少的情况。 思路就是空间换时间。
  • 会造成一致性问题,只能保证最终一致性,可能会读到脏数据。

13.5 CopyOnWriteArraySet 底层

底层基于CopyOnWriteArrayList实现的

13.6 ConcurrentSkipListMap

跳表:

  • 保证key的顺序
  • 底层数据结构基于链表
  • 时间复杂度O(logn)
  • 空间换时间

14. Executor线程池原理解读

14.1 什么是线程?

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型(这个在第一部分已经介绍过),JVM使用的KLT模型Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。

进程是系统资源分配的基本单位。

14.2 java线程中的状态

Java线程有多种生命状态

1
2
3
4
5
6
NEW,新建
RUNNABLE,运行
BLOCKED,阻塞
WAITING,等待
TIMED_WAITING,超时等待
TERMINATED,终结

14.3 为什么需要线程池?

创建线程和销毁线程需要CPU进行用户态和内核态的切换,因为上面提到过,java的线程模型是JLT模型。每创建一个java线程,在底层操作系统也需要创建一个系统级别的线程,有操作系统管理。

“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控

线程池介绍

在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:

如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

14.4 什么时候使用线程池?

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大

14.5 线程池优势

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

14.6 线程的生命周期

等待状态 超时等待状态 阻塞状态都会涉及到上下文切换!

14.7 什么是协程?

协程 (纤程,用户级线程),目的是为了追求最大力度的发挥硬件性能和提升软件的速度,协程基本原理是:在某个点挂起当前的任务,并且保存栈信息,去执行另一个任务;等完成或达到某个条件时,再还原原来的栈信息并继续执行(整个过程线程不需要上下文切换)。

Java原生不支持协程,在纯java代码里需要使用协程的话需要引入第三方包,如:quasar

  • 只能在CPU的一个核上工作
  • 把线程分成若干个 “单元”,也就是协程,协程之间切换不需要进行上下文切换
  • CPU大部分时间处于空闲,使用协程用来提高CPU利用率

14.8 Excutor框架

14.8.1 Excutor框架示意图

我们经常用到的是ThreadPoolExecutor

14.8.2 重要的API

从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为

1,execute(Runnable command):履行Ruannable类型的任务,

2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象

3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,

4,shutdownNow():停止所有正在履行的任务并封闭办事。

5,isTerminated():测试是否所有任务都履行完毕了。

6,isShutdown():测试是否该ExecutorService已被关闭。

14.8.3. 重点属性

1
2
3
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息:

  • 线程池的运行状态 (runState)
  • 线程池内有效线程的数量 (workerCount)

这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法

1
2
3
4
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取活动线程数
private static int ctlOf(int rs, int wc) { return rs | wc; } // 获取运行状态和活动线程数的值

14.8.4为什么阿里java规约中不建议使用Executors

  • 因为队列使用的是无界队列。任务可以无限存放,可能会造成内存泄漏。

  • 而且任务阻塞时间比较久。

14.9 创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

1. corePoolSize

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

2. maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;

3. keepAliveTime

线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

4. unit

keepAliveTime的单位;

5. workQueue

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

  • 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
  • 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
  • 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
  • 4、priorityBlockingQuene:具有优先级的无界阻塞队列;

6. threadFactory

它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

7. handler

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

  • 1、AbortPolicy:直接抛出异常,默认策略;
  • 2、CallerRunsPolicy:用调用者所在的线程来执行任务;
  • 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • 4、DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

14.10 线程池执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
  • 首先创建核心线程并执行任务
  • 把任务添加到阻塞队列
  • 还有任务,创建非核心线程执行
  • 如果还有任务进来,执行对应的拒绝策略

思考🤔

问题一

如果核心线程还没有创建完,此时已经创建的核心线程的任务已经处理完了,此时,线程池新提交的任务由那个线程来处理呢?

是由已经创建的核心线程呢还是会创建新的核心线程呢?

答案是: 会创建新的核心线程,知道核心线程数量达到最大核心线程数。

问题二

如何区分核心线程还是非核心线程?

其实并没有办法区分,线程销毁的时候会去判断当前线程数量是否比核心线程数大,如果大,则线程销毁,那这个线程就是非核心线程。如果不大,而且也允许核心线程存活的话,线程就不会销毁,留下来的也就是核心线程。

14.11 线程池的生命状态?

1
2
3
4
5
RUNNING    = -1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING

(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。

(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!

2、 SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3、STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4、TIDYING

(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5、 TERMINATED

(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。

(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功。

14.12 线程池的拒绝策略有几种?

1. AbortPolicy 抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

2. CallerRunsPolicy:由当前线程执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }

/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

3. DiscardOldestPolicy:抛弃队列中最老的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }

/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

4. DiscardPolicy: 直接丢弃,什么都不做

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }

/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

14.13 非核心线程过期是如何判断的?

阻塞队列实现,poll设置时间,然后响应中断异常。

14.14 线程池参数如何配置?

  • CPU密集型 : CPU核数 + 1
  • IO密集型 : 2倍CPU核数

15. ScheduledThreadPoolExecutor

15.1 创建定时线程池

1
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);

15.2 如何周期性执行定时任务?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
log.info("send heart beat");
long starttime = System.currentTimeMillis(), nowtime = starttime;
while ((nowtime - starttime) < 5000) {
nowtime = System.currentTimeMillis();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("task over....");
throw new RuntimeException("unexpected error , stop working");
}, 1000, 2000, TimeUnit.MILLISECONDS);

15.3 定时任务的执行时间如果大于执行时间间隔怎么办?

1
scheduleWithFixedDelay

上面第一种定时任务执行的方式下,在任务开始的时候开始算起,任务执行5s,但是任务执行间隔时间是2s,意味着任务还没有执行结束,下一次任务就在等待执行过了。最终会演变成,定时任务执行时间间隔变成5s。

解决上面的问题,就不应该在任务执行开始的时候就记时算起,而是在任务执行完成之后,2s后再开始执行新一轮任务。

如果只有一个任务,但是核心线程数配了2个会怎样?

只会有一个线程执行任务,因为只有一个任务,所以只创建了一个核心线程。

线程创建是在提交任务的时候,所以,如果只是提交了1个任务,只会创建一个线程。

15.4 如果在执行过程中抛出异常会怎样?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
log.info("send heart beat");
long starttime = System.currentTimeMillis(), nowtime = starttime;
while ((nowtime - starttime) < 5000) {
nowtime = System.currentTimeMillis();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("task over....");
throw new RuntimeException("unexpected error , stop working");
}, 1000, 2000, TimeUnit.MILLISECONDS);

线程创建了但是任务不执行

15.5 还有什么方式可以定时执行任务?

1
2
3
4
5
6
7
8
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
log.info("send heart beat");
throw new RuntimeException("unexpected error , stop working");
}
},1000,2000);

执行结果:

1
2
3
4
5
6
7
8
9
10
17:21:11.915 [Timer-0] INFO com.yg.edu.schedule.ScheduleThreadPoolRunner - send heart beat
Exception in thread "Timer-0" java.lang.RuntimeException: unexpected error , stop working
at com.yg.edu.schedule.ScheduleThreadPoolRunner$1.run(ScheduleThreadPoolRunner.java:95)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)

Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled.
at java.util.Timer.sched(Timer.java:397)
at java.util.Timer.scheduleAtFixedRate(Timer.java:328)
at com.yg.edu.schedule.ScheduleThreadPoolRunner.main(ScheduleThreadPoolRunner.java:105)

15.6 Timer和Scheduled两种方式有什么区别?

Timer是单线程的,如果提交了多个任务,一旦一个任务抛出了异常,其他的任务也无法在执行

scheduledThreadPoolExecutor是多线程的,一个任务抛出了异常不影响其他的任务和将要来临的任务

15.7 使用场景

1、分布式锁 锁续命 定时判断 看门狗

2、心跳检测

3、其他

15.8 定时任务是如何排序的?

堆结构

16. Fock/Join框架

16.1 任务性质类型

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。

线程数一般设置为:线程数 = CPU核数+1 (现代CPU支持超线程)

IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

线程数一般设置为:线程数 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

CPU密集型 vs IO密集型

我们可以把任务分为计算密集型和IO密集型。

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

16.2 什么是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…..+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。如下图所示:

16.3 Fork/Jion特性

  1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
  3. ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

16.4 Fork/Join的实现

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,**被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。**

16.4.1 工作窃取算法的优点

是充分利用线程进行并行计算,并减少了线程间的竞争,

16.4.2 工作窃取算法的缺点

  • 在某些情况下还是存在竞争,比如双端队列里只有一个任务时。

  • 消耗了更多的系统资源,比如创建多个线程和多个双端队列。

16.4.3 实现原理

  1. ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  2. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
  3. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
  4. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  5. 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

16.5 Fork/Join框架执行流程

ForkJoinPool 中的任务执行分两种:

  • 直接通过 FJP 提交的外部任务(external/submissions task),存放在 workQueues 的偶数槽位;
  • 通过内部 fork 分割的子任务(Worker task),存放在 workQueues 的奇数槽位。

参考文档