JAVA-Concurrency

引言:

Java并发学习笔记

进程

进程是系统进行资源分配的基本单位。每一个进程都有它自己的内存空间和系统资源

进程的不足在于多处理机环境下的进程调度,分派,切换时,都需要花费较大的时间和空间开销

引入线程主要是为了提高系统的执行效率,减少处理机的空转时间和调度切换的时间,以及便于系统管理。使OS具有更好的并发性

  • 进程实现多处理非常耗费CPU的资源,而引入线程是作为调度的基本单位(取代进程的部分基本功能【调度】)。
  • 线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

并行与并发

并行:

  • 并行性是指同一时刻内(一个时间段)发生两个或多个事件。
  • 并行是在不同实体上的多个事件

并发:

  • 并发性是指同一时间间隔内发生两个或多个事件。
  • 并发是在同一实体上的多个事件

由此可见:并行是针对进程的,并发是针对线程的

进程与线程

  • 进程作为资源分配的基本单位
  • 线程作为资源调度的基本单位,是程序的执行单元,执行路径(单线程:一条执行路径,多线程:多条执行路径)。是程序使用CPU的最基本单位。
  • 一个 Java 程序的运行是 main 线程和多个其他线程同时运行

线程

线程状态

image-20200421094323779

新建(New)

创建后尚未启动。

可运行(Runnable)

可能正在运行,也可能正在等待 CPU 时间片。

包含了操作系统线程状态中的 运行(Running ) 和 就绪(Ready)。

阻塞(Blocking)

这个状态下,是在多个线程有同步操作的场景,比如正在等待另一个线程的 synchronized 块的执行释放,或者可重入的 synchronized 块里别人调用 wait() 方法,也就是线程在等待进入临界区。

阻塞可以分为:等待阻塞,同步阻塞,其他阻塞

无限期等待(Waiting)

等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。

进入方法 退出方法
没有设置 Timeout 参数的 Object.wait() 方法 Object.notify() / Object.notifyAll()
没有设置 Timeout 参数的 Thread.join() 方法 被调用的线程执行完毕
LockSupport.park() 方法 -

限期等待(Timed Waiting)

无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。

调用 Thread.sleep() 方法使线程进入限期等待状态时,常常用 “使一个线程睡眠” 进行描述。

调用 Object.wait() 方法使线程进入限期等待或者无限期等待时,常常用 “挂起一个线程” 进行描述。

睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态

阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用 Thread.sleep() 和 Object.wait() 等方法进入。

进入方法 退出方法
Thread.sleep() 方法 时间结束
设置了 Timeout 参数的 Object.wait() 方法 时间结束 / Object.notify() / Object.notifyAll()
设置了 Timeout 参数的 Thread.join() 方法 时间结束 / 被调用的线程执行完毕
LockSupport.parkNanos() 方法 -
LockSupport.parkUntil() 方法 -

死亡(Terminated)

  • 线程因为 run 方法正常退出而自然死亡
  • 因为一个没有捕获的异常终止了 run 方法而意外死亡

image-20200421110500064

image-20200421102640506

多线程实现方式

有三种使用线程的方法:

  • 实现 Runnable 接口;
  • 实现 Callable 接口;
  • 继承 Thread 类。

实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以说任务是通过线程驱动从而执行的。

实现 Runnable 接口

需要实现 run() 方法。

通过 Thread 调用 start() 方法来启动线程。

1
2
3
4
5
6
7
8
9
10
11
public class MyRunnable implements Runnable {
public void run() {
// ...
}
}

public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}

实现 Callable 接口

与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}

继承 Thread 类

同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。

1
2
3
4
5
6
7
8
9
10
public class MyThread extends Thread {
public void run() {
// ...
}
}

public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}

实现接口 VS 继承 Thread

实现接口会更好一些,因为:

  • Java 不支持多重继承,因此继承了 Thread 类就无法继承其它类,但是可以实现多个接口;
  • 类可能只要求可执行就行,继承整个 Thread 类开销过大。

三种方式的区别

  • 实现 Runnable 接口可以避免 Java 单继承特性而带来的局限;增强程序的健壮性,代码能够被多个线程共享,代码与数据是独立的;适合多个相同程序代码的线程区处理同一资源的情况。
  • 继承 Thread 类和实现 Runnable 方法启动线程都是使用 start() 方法,然后 JVM 虚拟机将此线程放到就绪队列中,如果有处理机可用,则执行 run() 方法。
  • 实现 Callable 接口要实现 call() 方法,并且线程执行完毕后会有返回值。其他的两种都是重写 run() 方法,没有返回值。

线程方法介绍

getName

1
2
3
4
5
6
7
8
9
10
11
/**
* Returns this thread's name.
*
* @return this thread's name.
* @see #setName(String)
*/
public final String getName() {
return name;
}

// myThread.getName();

isAlive

isAlive活着的定义是就绪、运行、阻塞状态

1
2
3
4
5
6
7
/**
* @return <code>true</code> if this thread is alive;
* <code>false</code> otherwise.
*/
public final native boolean isAlive();

// myThread.getName();

sleep(当前线程.sleep)

sleep时持有的锁不会自动释放,sleep时可能会抛出InterruptedException。

Thread.sleep(long millis) 一定是当前线程调用此方法,当前线程进入TIME_WAIT状态,但不释放对象锁,millis后线程自动苏醒进入READY状态。

  • 作用:给其它线程执行机会的最佳方式。
1
2
3
4
5
6
7
8
9
/**
* @param millis
* the length of time to sleep in milliseconds
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
* @throws InterruptedException
* if any thread has interrupted the current thread.
*/
public static native void sleep(long millis) throws InterruptedException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestThread extends Thread{
public void run() {
// ...
try {
sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {
TestThread mt = new TestThread();
sleep(100);
}
}

join(其他线程.join)

t.join()/t.join(long millis) 当前myThread线程里调用otherThread的方法,当前线程进入WAIT状态,但不释放对象锁,直到otherThread线程执行完毕或者millis时间到,当前线程进入可运行状态。

join方法的作用是将分出来的线程合并回去,等待分出来的线程执行完毕后继续执行原有线程。类似于方法调用。(相当于调用thead.run())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Waits at most {@code millis} milliseconds for this * thread to die. A timeout of {@code 0} means to wait
* forever.
* @param millis
* the time to wait in milliseconds
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
* @throws InterruptedException
* if any thread has interrupted the current thread.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

// otherThread.join();

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class JoinExample {
private class A extends Thread {
@Override
public void run() {
System.out.println("A");
}
}

private class B extends Thread {
private A a;
B(A a) {
this.a = a;
}

@Override
public void run() {
try {
a.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B");
}
}

public void test() {
A a = new A();
B b = new B(a);
b.start();
a.start();
}
}

public static void main(String[] args) {
JoinExample example = new JoinExample();
example.test();
}

// A
// B

yield(当前线程.yield)

myThread.yield(),一定是当前线程调用此方法,当前线程放弃获取的cpu时间片,由运行状态变会可运行状态,让OS再次选择线程。

作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。

同时myThread.yield()不会使进程进入阻塞。

1
2
3
4
5
/**
* A hint to the scheduler that the current thread is willing to yield
* its current use of a processor. The scheduler is free to ignore this
*/
public static native void yield();
1
2
3
4
5
6
7
8
9
10
public class TestThread extends Thread{
public void run() {
// ...
yield();
}

public static void main(String[] args){
yield();
}
}

interrupt(其他线程.interrupt)

函数1 interrupt()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Interrupts this thread.
*/
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

private native void interrupt0();
  • 第11行注释说得很清楚了,interrupt0()方法的作用是”Just to set the interrupt flag”,即方法的作用仅仅是设置中断标识位
  • 第19行就是interrupt0()方法的原型,由于方法是被native修饰的,很明显这是一个本地方法,是Java虚拟机实现的
函数2 isInterrupted()

方法唯一的作用只是测试线程是否已经中断,中断标识位的状态并不受到该方法的影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Tests whether this thread has been interrupted.
* The <i>interrupted status</i> of the thread is unaffected by this method.
* @return <code>true</code> if this thread has been interrupted;
* <code>false</code> otherwise.
*/
public boolean isInterrupted() {
return isInterrupted(false);
}

/**
* Tests if some Thread has been interrupted. The interrupted state
* is reset or not based on the value of ClearInterrupted that is
* passed.
*/
private native boolean isInterrupted(boolean ClearInterrupted);

最终调用的是下面的isInterrupted(boolean ClearInterrupted),这个方法是一个native的,也是Java虚拟机实现的。方法的参数ClearInterrupted,即清除中断标识位,这里传递false,即不清除

函数3 interrupted()

方法的作用是测试当前线程是否已经中断,线程的中断标识位由该方法清除。换句话说,连续两次调用该方法的返回值必定是false。

1
2
3
4
5
6
7
8
9
10
/**
* Tests whether the current thread has been interrupted.
* @return <code>true</code> if the current thread has been interrupted;
* <code>false</code> otherwise.
*/
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

private native boolean isInterrupted(boolean ClearInterrupted);

interrupted()方法调用的是一个native方法,无非这个方法传入的是true,表示清除中断标识位

线程在不同状态下对于中断所产生的反应

NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED(Thread类中有一个State枚举类型列举了线程的所有状态)。

1、NEW和TERMINATED

线程的new状态表示还未调用start方法,还未真正启动。线程的terminated状态表示线程已经运行终止。这两个状态下调用中断方法来中断线程的时候,Java认为毫无意义,所以并不会设置线程的中断标识位,什么事也不会发生。例如:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {

Thread thread = new MyThread();
System.out.println(thread.getState());

thread.interrupt();

System.out.println(thread.isInterrupted());
}
// NEW
// false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
Thread thread = new MyThread();
thread.start();
System.out.println(mt.getState());
thread.join();
System.out.println(thread.getState());

thread.interrupt();

System.out.println(thread.isInterrupted());

}
// RUNNABLE
// TERMINATED
// false

中断操作对这两种状态下的线程是无效的。

2、RUNNABLE

如果线程处于运行状态,那么该线程的状态就是RUNNABLE,但是不一定所有处于RUNNABLE状态的线程都能获得CPU运行,在某个时间段,只能由一个线程占用CPU,那么其余的线程虽然状态是RUNNABLE,但是都没有处于运行状态。测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*先定义一个线程类,让其一直占用CPU运行*/
public class MyThread extends Thread{
@Override
public void run(){
while(true){
//do something
}
}
}
/*main函数启动线程*/
public static void main(String[] args) throws InterruptedException {

Thread thread = new MyThread();
thread.start();
System.out.println(thread.getState());

thread.interrupt();
Thread.sleep(1000);//等到thread线程被中断之后
System.out.println(thread.isInterrupted());
System.out.println(thread.getState());
}
// RUNNABLE
// true
// RUNNABLE

我们定义的线程始终循环做一些事情,主线程启动该线程并输出该线程的状态,然后调用中断方法中断该线程并再次输出该线程的状态。

可以看到在我们启动线程之后,线程状态变为RUNNABLE,中断操作后标记中断标志,但是再次输出线程状态的时候,线程仍然处于RUNNABLE状态。

  • 解释:
  • 处于RUNNBALE状态下的线程即便遇到中断操作,也只会设置中断标志位并不会实际中断线程运行。
  • 为什么?
  • 这里其实Java将这种权力交给了我们的程序,Java给我们提供了一个中断标志位,我们的程序可以通过if判断中断标志位是否被设置来中断我们的程序而不是系统强制的中断。例如:
1
2
3
4
5
6
7
8
9
/*修改MyThread类的run方法*/
public void run(){
while(true){
if (Thread.currentThread().isInterrupted()){
System.out.println("exit MyThread");
break;
}
}
}

线程一旦发现自己的中断标志为被设置了,立马跳出死循环。这样的设计好处就在于给了我们程序更大的灵活性。

3、BLOCKED

当线程处于BLOCKED状态说明该线程由于竞争某个对象的锁失败而被挂在了该对象的阻塞队列上了。那么此时发起中断操作不会对该线程产生任何影响,依然只是设置中断标志位。例如:

1
2
3
4
5
6
7
8
9
10
11
12
/*自定义线程类*/
public class MyThread extends Thread{
public synchronized static void doSomething(){
while(true){
//do something
}
}
@Override
public void run(){
doSomething();
}
}

这里我们自定义了一个线程类,run方法中主要就做一件事情,调用一个有锁的静态方法,该方法内部是一个死循环(占用该锁让其他线程阻塞)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {

Thread thread1 = new MyThread();
thread1.start();

Thread thread2 = new MyThread();
thread2.start();

Thread.sleep(1000);
System.out.println(thread1.getState());
System.out.println(thread2.getState());

thread2.interrupt();
System.out.println(thread2.isInterrupted());
System.out.println(thread2.getState());
}
// RUNNABLE
// BLOCKED
// true
// BLOCKED

在我们的主线程中,我们定义了两个线程并按照定义顺序启动他们,显然thread1启动后便占用MyThread类锁,此后thread2在获取锁的时候一定失败,自然被阻塞在阻塞队列上。

从输出结果看来,thread2处于BLOCKED状态,执行中断操作之后,该线程仍然处于BLOCKED状态,但是中断标志位却已被修改。这种状态下的线程和处于RUNNABLE状态下的线程是类似的,给了我们程序更大的灵活性去判断和处理中断。

4、WAITING/TIMED_WAITING

这两种状态本质上是同一种状态,只不过TIMED_WAITING在等待一段时间后会自动释放自己,而WAITING则是无限期等待,需要其他线程调用notify方法释放自己。但是他们都是线程在运行的过程中由于缺少某些条件而被挂起在某个对象的等待队列上。当这些线程遇到中断操作的时候,会抛出一个InterruptedException异常,并清空中断标志位。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
/*定义一个线程类*/
public class MyThread extends Thread{
@Override
public void run(){
synchronized (this){
try {
wait();
} catch (InterruptedException e) {
System.out.println("i am waiting but facing interruptexception now");
}
}
}
}

我们定义了一个线程类,其中run方法让当前线程阻塞到条件队列上,并且针对InterruptedException 进行捕获,如果遇到InterruptedException 异常则输出一行信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*main函数启动该线程*/
public static void main(String[] args) throws InterruptedException {

Thread thread = new MyThread();
thread.start();

Thread.sleep(500);
System.out.println(thread.getState());
thread.interrupt();
Thread.sleep(1000);
System.out.println(thread.isInterrupted());
}
// WAITING
// i am waiting but facing interruptexception now
// false

在main线程中我们启动一个MyThread线程,然后对其进行中断操作。

从运行结果看,当前程thread启动之后就被挂起到该线程对象的条件队列上,然后我们调用interrupt方法对该线程进行中断,输出了我们在catch中的输出语句,显然是捕获了InterruptedException异常,接着就看到该线程的中断标志位被清空。

总结

当线程调用中断函数interrupt()后,只是设置中断标志位并没有强制终止线程,对于线程的终止权利依然在程序手中。

对于sleep、wait、notify、join,这些会抛出InterruptedException的方法在遇到interrupt()函数执行时,会执行捕获InterruptedException异常的处理方式,完成后及那个中断标识位清除。

这些方法之所以会抛出InterruptedException就是由于Java虚拟机在实现这些方法的时候,本身就有某种机制在判断中断标识位,如果中断了,就抛出一个InterruptedException。所以,可以认为:

interrupt一个其他线程thread时

  1. 如果线程thread中调用了可以抛出InterruptedException的方法,那么会在thread中抛出InterruptedException并清除中断标志位。
  2. 如果thread没有调用此类方法,那么会正常地将设置中断标志位。

具体来看:

  • 如果线程堵塞在object.wait、Thread.join和Thread.sleep,将会清除线程的中断状态,并抛出InterruptedException;

  • 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;

  • 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;

  • 如果不是以上三种情况,thread.interrupt()方法仅仅是设置线程的中断状态为true。

如何停止线程?

  • 使用标志位终止线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TestThread extends Thread {

//volatile修饰符用来保证其它线程读取的总是该变量的最新的值
public volatile boolean exit = false;

@Override
public void run() {
ServerSocket serverSocket = new ServerSocket(8080);
while(!exit){
serverSocket.accept(); //阻塞等待客户端消息
...
}
}

public static void main(String[] args) {
TestThread t = new TestThread();
t.start();
...
t.exit = true; //修改标志位,退出线程
}
}
  • 使用 interrupt() 中断线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class InterruptThread extends Thread{

public static void main(String[] args) {
try {
InterruptThread t = new InterruptThread();
t.start();
Thread.sleep(200);
t.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void run() {
for(int i = 0; i <= 200000; i++) {
//判断是否被中断
if(Thread.currentThread().isInterrupted()){
//处理中断逻辑
break;
}
System.out.println("i=" + i);
}
}
}

注意:如果线程中有阻塞操作,在阻塞时是无法去检测中断标志位或自定义标志位的,只能使用interrupt()方法才能中断线程,并且在线程停止前关闭引起阻塞的资源(比如Socket)。

objects线程方法

wait(对象.wait)

1
public final native void wait(long timeout) throws InterruptedException;
  • 调用obj的wait()notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj)代码段内。
  • obj.wait(),当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout),timeout时间到自动唤醒。

notify(对象.notify)

1
2
public final native void notify();
public final native void notifyAll();

obj.notify()唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。

wait&notify 最佳实践

等待方(消费者)和通知方(生产者)

1
2
3
4
5
6
7
8
9
10
11
12
13
等待方:
synchronized(obj){
while(条件不满足){
obj.wait();
}
消费;
}

通知方:
synchonized(obj){
改变条件;
obj.notifyAll();
}
  • notifynotifyAll: 由于多个线程可以基于不同的条件在同一个条件队列上等待,因此如果使用notify而不是notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号地址(线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词)的问题。

只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同,且每个线程在从wait返回后将执行相同的操作。
  • 单进单出:在对象状态上的每次改变,最多只能唤醒一个线程来执行。

它们都属于 Object 的一部分,而不属于 Thread。

只能用在同步方法或者同步控制块中使用!否则会在运行时抛出 IllegalMonitorStateExeception。

使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class WaitNotifyExample {
public synchronized void before() {
System.out.println("before");
notifyAll();
}

public synchronized void after() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after");
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
WaitNotifyExample example = new WaitNotifyExample();
executorService.execute(() -> example.after());
executorService.execute(() -> example.before());
}

// before
// after

互斥同步

Java 提供了两种锁机制来控制多个线程对共享资源的互斥访问,第一个是 JVM 实现的 synchronized,而另一个是 JDK 实现的 ReentrantLock。

synchronized

  • synchronized是一种互斥锁

    • 一次只能允许一个线程进入被锁住的代码块
  • synchronized是一种内置锁/监视器锁

    • Java中每个对象都有一个内置锁(监视器,也可以理解成锁标记),而synchronized就是使用对象的内置锁(监视器)来将代码块(方法)锁定的!
  • synchronized保证了线程的原子性。(被保护的代码块是一次被执行的,没有任何线程会同时访问)

  • synchronized还保证了可见性。(当执行完synchronized之后,修改后的变量对其他的线程是可见的)

总的来说,Java中的synchronized,通过使用内置锁,来实现对变量的同步操作,进而实现了对变量操作的原子性和其他线程对变量的可见性,从而确保了并发情况下的线程安全。

1. 同步一个代码块
1
2
3
4
5
public void func() {
synchronized (this) {
// ...
}
}

它只作用于同一个对象,如果调用两个对象上的同步代码块,就不会进行同步。

对于以下代码,使用 ExecutorService 执行了两个线程,由于调用的是同一个对象的同步代码块,因此这两个线程会进行同步,当一个线程进入同步语句块时,另一个线程就必须等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class SynchronizedExample {

public void func1() {
synchronized (this) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}

public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e1.func1());
}

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

对于以下代码,两个线程调用了不同对象的同步代码块,因此这两个线程就不需要同步。从输出结果可以看出,两个线程交叉执行。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func1());
executorService.execute(() -> e2.func1());
}

0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2. 同步一个方法
1
2
3
public synchronized void func () {
// ...
}

它和同步代码块一样,作用于同一个对象。

3. 同步一个类
1
2
3
4
5
public void func() {
synchronized (SynchronizedExample.class) {
// ...
}
}

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SynchronizedExample {

public void func2() {
synchronized (SynchronizedExample.class) {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
}
}
}

public static void main(String[] args) {
SynchronizedExample e1 = new SynchronizedExample();
SynchronizedExample e2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> e1.func2());
executorService.execute(() -> e2.func2());
}

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9
4. 同步一个静态方法
1
2
3
public synchronized static void fun() {
// ...
}

作用于整个类。

synchronized锁的释放
  1. 当方法(代码块)执行完毕后会自动释放锁,不需要做任何的操作。
  2. 当一个线程执行的代码出现异常时,其所持有的锁会自动释放
    • 不会由于异常导致出现死锁现象~

ReentrantLock

ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class LockExample {

private Lock lock = new ReentrantLock();

public void func() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
}
} finally {
lock.unlock(); // 确保释放锁,从而避免发生死锁。
}
}
}

public static void main(String[] args) {
LockExample lockExample = new LockExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> lockExample.func());
executorService.execute(() -> lockExample.func());
}

0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

相比于 synchronized,它多了以下高级功能:

  1. 等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

  1. 可实现公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁。

  1. 锁绑定多个条件

一个 ReentrantLock 对象可以同时绑定多个 Condition 对象。

比较

1. 锁的实现

synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。

2. 性能

新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。

3. 等待可中断

当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。

ReentrantLock 可中断,而 synchronized 不行。

4. 公平锁

公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。

synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。

5. 锁绑定多个条件

一个 ReentrantLock 可以同时绑定多个 Condition 对象。

使用选择

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

这里称为乐观锁:

乐观锁常见的两种实现方式

乐观锁一般会使用版本号机制或CAS算法实现。

1. 版本号机制

一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。

举一个简单的例子: 假设数据库中帐户信息表中有一个 version 字段,当前值为 1 ;而当前帐户余额字段( balance )为 $100 。

  1. 操作员 A 此时将其读出( version=1 ),并从其帐户余额中扣除 $50( $100-$50 )。
  2. 在操作员 A 操作的过程中,操作员B 也读入此用户信息( version=1 ),并从其帐户余额中扣除 $20 ( $100-$20 )。
  3. 操作员 A 完成了修改工作,将数据版本号加一( version=2 ),连同帐户扣除后余额( balance=$50 ),提交至数据库更新,此时由于提交数据版本大于数据库记录当前版本,数据被更新,数据库记录 version 更新为 2 。
  4. 操作员 B 完成了操作,也将版本号加一( version=2 )试图向数据库提交数据( balance=$80 ),但此时比对数据库记录版本时发现,操作员 B 提交的数据版本号为 2 ,数据库记录当前版本也为 2 ,不满足 “ 提交版本必须大于记录当前版本才能执行更新 “ 的乐观锁策略,因此,操作员 B 的提交被驳回。

这样,就避免了操作员 B 用基于 version=1 的旧数据修改的结果覆盖操作员A 的操作结果的可能。

2. CAS算法

compare and swap(比较与交换),是一种有名的无锁算法。无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization)。CAS算法涉及到三个操作数

  • 需要读写的内存值 V
  • 进行比较的值 A
  • 拟写入的新值 B

当且仅当 V 的值等于 A时,CAS通过原子方式用新值B来更新V的值,否则不会执行任何操作(比较和替换是一个原子操作)。一般情况下是一个自旋操作,即不断的重试

关于自旋锁,大家可以看一下这篇文章,非常不错:《 面试必备之深入理解自旋锁》

乐观锁的缺点

ABA 问题是乐观锁一个常见的问题

1 ABA 问题

如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA”问题。

JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力,其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

2 循环时间长开销大

自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。 如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。

3 只能保证一个共享变量的原子操作

CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作。

参考链接:悲观锁乐观锁

线程池

非阻塞队列:当队列中满了时候,放入数据,数据丢失

阻塞队列:当队列满了的时候,进行等待,什么时候队列中有出队的数据,那么第11个再放进去

出队

非阻塞队列:如果现在队列中没有元素,取元素,得到的是null

阻塞队列:等待,什么时候放进去,再取出来

线程池使用的是阻塞队列

线程池概念

线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配、调优和监控,有以下好处:

  1. 降低资源消耗;
  2. 提高响应速度;
  3. 提高线程的可管理性。

Java1.5 中引入的 Executor 框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。

Executor类图

img

线程池工作原理

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果阻塞队列满了,那就创建新的线程执行当前任务;直到线程池中的线程数达到 maxPoolSize,这时再有任务来,只能执行 reject() 处理该任务。

初始化线程池

  • newFixedThreadPool() 说明:初始化一个指定线程数的线程池,其中 corePoolSize == maxiPoolSize,使用 LinkedBlockingQuene 作为阻塞队列 特点:即使当线程池没有可执行任务时,也不会释放线程。
  • newCachedThreadPool() 说明:初始化一个可以缓存线程的线程池,默认缓存60s,线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列; 特点:在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源;当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销; 因此,使用时要注意控制并发的任务数,防止因创建大量的线程导致而降低性能。
  • newSingleThreadExecutor() 说明:初始化只有一个线程的线程池,内部使用 LinkedBlockingQueue 作为阻塞队列。 特点:如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行
  • newScheduledThreadPool() 特点:初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据。

初始化方法

1
2
3
4
5
6
7
// 使用Executors静态方法进行初始化
ExecutorService service = Executors.newSingleThreadExecutor();
// 常用方法
service.execute(new Thread());
service.submit(new Thread());
service.shutDown();
service.shutDownNow();

常用方法

execute与submit

  1. 接收的参数不一样

  2. submit有返回值,而execute没有

    用到返回值的例子,比如说我有很多个做 validation 的 task,我希望所有的 task 执行完,然后每个 task 告诉我它的执行结果,是成功还是失败,如果是失败,原因是什么。然后我就可以把所有失败的原因综合起来发给调用者。

  3. submit方便Exception处理

    如果你在你的 task 里会抛出 checked 或者 unchecked exception,而你又希望外面的调用者能够感知这些 exception 并做出及时的处理,那么就需要用到 submit,通过捕获 Future.get 抛出的异常。

shutDown与shutDownNow

当线程池调用该方法时,线程池的状态则立刻变成 SHUTDOWN 状态。此时,则不能再往线程池中添加任何任务,否则将会抛出 RejectedExecutionException 异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。

内部实现

并发队列

  • 入队
    • 非阻塞队列:当队列中满了时候,放入数据,数据丢失
    • 阻塞队列:当队列满了的时候,进行等待,什么时候队列中有出队的数据,那么第11个再放进去
  • 出队
    • 非阻塞队列:如果现在队列中没有元素,取元素,得到的是null
    • 阻塞队列:等待,什么时候放进去,再取出来

线程池使用的是阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 线程存活时间(在 corePore<*<maxPoolSize 情况下有用)
TimeUnit unit, // 存活时间的时间单位
BlockingQueue<Runnable> workQueue // 阻塞队列(用来保存等待被执行的任务)
ThreadFactory threadFactory, // 线程工厂,主要用来创建线程;
RejectedExecutionHandler handler // 当拒绝处理任务时的策略
){

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

关于 workQueue 参数,有四种队列可供选择:

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按 FIFO 排序任务;
  • LinkedBlockingQuene:基于链表结构的阻塞队列,按 FIFO 排序任务;
  • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 ArrayBlockingQuene;
  • PriorityBlockingQuene:具有优先级的无界阻塞队列;

关于 handler 参数,线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

线程池的状态

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));Click to copy

其中 AtomicInteger 变量 ctl 的功能非常强大:利用低 29 位表示线程池中线程数,通过高 3 位表示线程池的运行状态:

  • RUNNING:-1 << COUNT_BITS,即高 3 位为 111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN: 0 << COUNT_BITS,即高 3 位为 000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP : 1 << COUNT_BITS,即高 3 位为 001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • TIDYING : 2 << COUNT_BITS,即高 3 位为 010,该状态表示线程池对线程进行整理优化;
  • TERMINATED: 3 << COUNT_BITS,即高 3 位为 011,该状态表示线程池停止工作;

为什么引入Executor线程池

new Thread() 的缺点

  • 每次 new Thread() 耗费性能
  • 调用 new Thread() 创建的线程缺乏管理,被称为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪。
  • 不利于扩展,比如如定时执行、定期执行、线程中断

采用线程池的优点

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳
  • 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞
  • 提供定时执行、定期执行、单线程、并发数控制等功能

Executor 中断

调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

以下使用 Lambda 创建线程,相当于创建了一个匿名内部线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("Thread run");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdownNow();
System.out.println("Main run");
}
1
2
3
4
5
6
7
8
Main run
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at ExecutorInterruptExample.lambda$main$0(ExecutorInterruptExample.java:9)
at ExecutorInterruptExample$$Lambda$1/1160460865.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)Copy to clipboardErrorCopied

如果只想中断 Executor 中的一个线程,可以通过使用 submit() 方法来提交一个线程,它会返回一个 Future<?> 对象,通过调用该对象的 cancel(true) 方法就可以中断线程。

1
2
3
4
Future<?> future = executorService.submit(() -> {
// ..
});
future.cancel(true);

线程间通信

  • synchronized 同步
  • while 轮询的方式
  • join():是当前线程进入WAIT状态,但不释放对象锁,直到执行该方法线程执行完毕或者millis时间到,原先线程再进入可运行状态。
  • wait():使一个线程处于等待状态,并且释放所持有的对象的 lock。
  • sleep():使一个 正在运行的线程处于睡眠状态,是一个静态方法,调用此方法要捕捉 InterruptedException 异常。
  • notify():唤醒一个处于等待状态的线程,注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由 JVM 确定唤醒哪个线程,而且不是按优先级。
  • notityAll():唤醒所有处入等待状态的线程,注意并不是给所有唤醒线程一个对象的锁,而是让它们竞争。
  • CountdownLatch 类:用来控制一个或者多个线程等待多个线程。
  • CyclicBarrier 类:用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行。
  • Semaphore 类:似于操作系统中的信号量,可以控制对互斥资源的访问线程数。
  • Callable 接口:有返回值

如何让两个线程依次执行

假设有两个线程,一个是线程 A,另一个是线程 B,两个线程分别依次打印 1-3 三个数字即可。我们来看下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static void demo1() {    
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
printNumber("B");
}
});
A.start();
B.start();
}
// 结果
乱序同时打印

其中的 printNumber(String) 实现如下,用来依次打印 1, 2, 3 三个数字:

1
2
3
4
5
6
7
8
9
10
11
12
private static void printNumber(String threadName) {    
int i=0;
while (i++ < 3) {
/**
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println(threadName + " print: " + i);
}
}

结果两者是同时打印的,目标:让A打印完后B再打印,方法:是使用 join 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static void demo2() {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("B 开始等待 A");
try {
A.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

printNumber("B");
}
});
B.start();
A.start();
}
// 结果
B 开始等待 A
A print: 1
A print: 2
A print: 3

B print: 1
B print: 2
B print: 3

A.join 把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。比如在线程B中调用了线程A的 join() 方法,直到线程A执行完毕后,才会继续执行线程B。

  • t.join(); 调用 join 方法,等待线程 t 执行完毕
  • t.join(1000); 等待 t 线程,等待时间是1000毫秒。

所以能看到 A.join() 方法会让 B 一直等待直到 A 运行完毕。

如何让两个线程有序交叉运行

还是上面那个例子,我现在希望 A 在打印完 1 后,再让 B 打印 1, 2, 3,最后再回到 A 继续打印 2, 3。这种需求下,显然 Thread.join() 已经不能满足了。我们需要更细粒度的锁来控制执行顺序。

这里,我们可以利用 object.wait()object.notify() 两个方法来实现。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* A 1, B 1, B 2, B 3, A 2, A 3
*/
private static void demo3() {
Object lock = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("A 1");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2");
System.out.println("A 3");
}
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
System.out.println("B 1");
System.out.println("B 2");
System.out.println("B 3");
lock.notify();
}
}
});
A.start();
B.start();
}
1
2
3
4
5
首先创建一个 A 和 B 共享的对象锁 lock = new Object();
当 A 得到锁后,先打印 1,然后调用 lock.wait() 方法,交出锁的控制权,进入 wait 状态;
对 B 而言,由于 A 最开始得到了锁,导致 B 无法执行;直到 A 调用 lock.wait() 释放控制权后, B 才得到了锁;
B 在得到锁后打印 1, 2, 3;然后调用 lock.notify() 方法,唤醒正在 wait 的 A;
A 被唤醒后,继续打印剩下的 2,3。

计数器(run D After A B C)

A B C D四个线程,D 等其他全执行完毕再执行,且 A B C 同步运行

目的是:A B C 三个线程同时运行,各自独立运行完后通知 D;对 D 而言,只要 A B C 都运行完了,D 再开始运行。针对这种情况,我们可以利用 CountdownLatch 来实现这类通信方式。它的基本用法是:

  1. 创建一个计数器,设置初始值,CountdownLatch countDownLatch = new CountDownLatch(2);
  2. 等待线程里调用 countDownLatch.await() 方法,进入等待状态,直到计数值变成 0;
  3. 其他线程里,调用 countDownLatch.countDown() 方法,该方法会将计数值减小 1;
  4. 其他线程countDown() 方法把计数值变成 0 时,等待线程 里的 countDownLatch.await() 立即退出,继续执行后面的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private static void runDAfterABC() {
int worker = 3;
CountDownLatch countDownLatch = new CountDownLatch(worker);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("D is waiting for other three threads");
try {
countDownLatch.await();
System.out.println("All done, D starts working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
for (char threadName='A'; threadName <= 'C'; threadName++) {
final String tN = String.valueOf(threadName);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(tN + " is working");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(tN + " finished");
countDownLatch.countDown();
}
}).start();
}
}
// 执行结果
D is waiting for other three threads
A is working
B is working
C is working
B finished
A finished
C finished
All done, D starts working

其实简单点来说,CountDownLatch 就是一个倒计数器,我们把初始计数值设置为3,当 D 运行时,先调用 countDownLatch.await() 检查计数器值是否为 0,若不为 0 则保持等待状态;当A B C 各自运行完后都会利用countDownLatch.countDown(),将倒计数器减 1,当三个都运行完后,计数器被减至 0;此时立即触发 Dawait() 运行结束,继续向下执行。

因此,CountDownLatch 适用于一个线程去等待多个线程的情况。

栅栏(run ABC When All Ready)

上面的 CountDownLatch 可以用来倒计数,但当计数完毕,只有一个线程的 await() 会得到响应,无法让多个线程同时触发。

为了实现线程间互相等待这种需求,我们可以利用 CyclicBarrier 数据结构,它的基本用法是:

  1. 先创建一个公共 CyclicBarrier 对象,设置 同时等待的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 这些线程同时开始自己做准备,自身准备完毕后,需要等待别人准备完毕,这时调用 cyclicBarrier.await(); 即可开始等待别人;
  3. 当指定的 同时等待的线程数都调用了 cyclicBarrier.await();时,意味着这些线程都准备完毕好,然后这些线程才 同时继续执行

实现代码如下,设想有三个跑步运动员,各自准备好后等待其他人,全部准备好后才开始跑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private static void runABCWhenAllReady() {
int runner = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
final Random random = new Random();
for (char runnerName='A'; runnerName <= 'C'; runnerName++) {
final String rN = String.valueOf(runnerName);
new Thread(new Runnable() {
@Override
public void run() {
long prepareTime = random.nextInt(10000) + 100;
System.out.println(rN + " is preparing for time: " + prepareTime);
try {
Thread.sleep(prepareTime);
} catch (Exception e) {
e.printStackTrace();
}
try {
System.out.println(rN + " is prepared, waiting for others");
cyclicBarrier.await(); // 当前运动员准备完毕,等待别人准备好
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(rN + " starts running"); // 所有运动员都准备好了,一起开始跑
}
}).start();
}
}
// 执行结果
A is preparing for time: 1177
C is preparing for time: 5944
B is preparing for time: 1196
A is prepared, waiting for others
B is prepared, waiting for others
C is prepared, waiting for others
C starts running
A starts running
B starts running

信号量

Semaphore 就是操作系统中的信号量,可以控制对互斥资源的访问线程数。Semaphore 可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

假若一个工厂有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过 Semaphore 来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {
public static void main(String[] args) {
int N = 8; //工人数
Semaphore semaphore = new Semaphore(5); //机器数目
for (int i = 0; i < N; i++) new Worker(i, semaphore).start();
}

static class Worker extends Thread {
private int num;
private Semaphore semaphore;

public Worker(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}

@Override
public void run() {
try {
semaphore.acquire();
System.out.println("工人" + this.num + "占用一个机器在生产...");
Thread.sleep(2000);
System.out.println("工人" + this.num + "释放出机器");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器

返回值通知(do Task With Result In Worker)

子线程完成某件任务后,把结果回传给主线程。回顾线程的创建,我们一般会把 Runnable 对象传给 Thread 去执行。Runnable定义如下:

1
2
3
public interface Runnable {    
public abstract void run();
}

可以看到 run() 在执行完后不会返回任何结果。那如果希望返回结果呢?这里可以利用另一个类似的接口类 Callable

1
2
3
4
5
6
7
8
9
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

可以看出 Callable 最大区别就是返回范型 V 结果。

那么下一个问题就是,如何把子线程的结果回传回来呢?在 Java 里,有一个类是配合 Callable 使用的:FutureTask,不过注意,它获取结果的 get 方法会阻塞主线程。

举例,我们想让子线程去计算从 1 加到 100,并把算出的结果返回到主线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static void doTaskWithResultInWorker() {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task starts");
Thread.sleep(1000);
int result = 0;
for (int i=0; i<=100; i++) {
result += i;
}
System.out.println("Task finished and return result");
return result;
}
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
try {
System.out.println("Before futureTask.get()");
System.out.println("Result: " + futureTask.get());
System.out.println("After futureTask.get()");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 执行结果
Before futureTask.get()

Task starts
Task finished and return result

Result: 5050
After futureTask.get()

可以看到,主线程调用 futureTask.get() 方法时阻塞主线程;然后 Callable 内部开始执行,并返回运算结果;此时 futureTask.get() 得到结果,主线程恢复运行。

这里我们可以学到,通过 FutureTaskCallable 可以直接在主线程获得子线程的运算结果,只不过需要阻塞主线程。当然,如果不希望阻塞主线程,可以考虑利用 ExecutorService,把 FutureTask 放到线程池去管理执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static void doTaskWithResultInWorker() {
//创建线程池
ExecutorService executor = Executors.newCachedThreadPool();

Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task starts");
Thread.sleep(1000);
int result = 0;
for (int i=0; i<=100; i++) {
result += i;
}
System.out.println("Task finished and return result");
return result;
}
};

FutureTask<Integer> futureTask = new FutureTask<Integer>(callable) {
@Override
protected void done() {
System.out.println("Before futureTask.get(). ThreadName: " + Thread.currentThread().getName());
try {
System.out.println("Result: " + get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("After futureTask.get()");
}
};
executor.submit(futureTask);
executor.shutdown();
System.out.println("End of main thread. ThreadName: " + Thread.currentThread().getName());
}
// 执行结果
Task starts
End of main thread. ThreadName: main
Task finished and return result
Before futureTask.get(). ThreadName: pool-1-thread-1
Result: 5050
After futureTask.get()

Java内存模型

基本概念

  • JMM(java内存模型) 本身是一种抽象的概念并不是真实存在,它描述的是一组规定或则规范,通过这组规范定义了程序中的访问方式。
  • JMM 同步规定
    • 线程解锁前,必须把共享变量的值刷新回主内存
    • 线程加锁前,必须读取主内存的最新值到自己的工作内存
    • 加锁解锁是同一把锁

主内存与工作内存

img

所有的变量都存储在主内存中,每个线程还有自己的工作内存,工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存副本拷贝。

处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存。

加入高速缓存带来了一个新的问题:缓存一致性。如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题。

线程只能直接操作工作内存中的变量,不同线程之间的变量值传递需要通过主内存来完成。

Java内存模型抽象结构图

img

内存指令

Java 内存模型定义了 8 个操作来完成主内存和工作内存的交互操作。

img

  • read:把一个变量的值从主内存传输到工作内存中
  • load:在 read 之后执行,把 read 得到的值放入工作内存的变量副本中
  • use:把工作内存中一个变量的值传递给执行引擎
  • assign:把一个从执行引擎接收到的值赋给工作内存的变量
  • store:把工作内存的一个变量的值传送到主内存中
  • write:在 store 之后执行,把 store 得到的值放入主内存的变量中
  • lock:作用于主内存的变量,把一个变量标识为一条线程独占状态
  • unlock:作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定

如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行 read 和 load 操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行 store 和 write 操作。Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。也就是 read 和 load 之间,store 和 write 之间是可以插入其他指令的,如对主内存中的变量a、b进行访问时,可能的顺序是read a,read b,load b, load a。

内存模型三大特性

1. 原子性

Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性,例如对一个 int 类型的变量执行 assign 赋值操作,这个操作就是原子性的。但是 Java 内存模型允许虚拟机将没有被 volatile 修饰的 64 位数据(long,double)的读写操作划分为两次 32 位的操作来进行,即 load、store、read 和 write 操作可以不具备原子性。

2. 可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

主要有三种实现可见性的方式:

  • volatile
  • synchronized,对一个变量执行 unlock 操作之前,必须把变量值同步回主内存。
  • final,被 final 关键字修饰的字段在构造器中一旦初始化完成,并且没有发生 this 逃逸(其它线程通过 this 引用访问到初始化了一半的对象),那么其它线程就能看见 final 字段的值。

对前面的线程不安全示例中的 cnt 变量使用 volatile 修饰,不能解决线程不安全问题,因为 volatile 并不能保证操作的原子性。

3. 有序性

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

volatile 关键字通过添加内存屏障的方式来禁止指令重排,即重排序时不能把后面的指令放到内存屏障之前。

也可以通过 synchronized 来保证有序性,它保证每个时刻只有一个线程执行同步代码,相当于是让线程顺序执行同步代码。

交互操作

  • JVM 运行程序的实体是线程,
  • 工作内存是每个线程的私有数据区域,
  • 主内存是所有变量的储存区域,所有的线程都可以访问,
  • 线程对变量的操作(读取赋值等)必须都工作内存进行。

首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝,前面说过,工作内存是每个线程的私有数据区域,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。

概述

  • 公平锁/非公平锁
  • 可重入锁
  • 独享锁/共享锁
  • 互斥锁/读写锁
  • 乐观锁/悲观锁
  • 偏向锁/轻量级锁/重量级锁
  • 自旋锁

公平锁/非公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁。
非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。
对于Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。
对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。

可重入锁/不可重入锁

基础知识

Java多线程的 wait() 方法和 notify() 方法。
这两个方法是成对出现和使用的,要执行这两个方法,有一个前提就是,当前线程必须获其对象的monitor(俗称“锁”),否则会抛出 IllegalMonitorStateException 异常,所以这两个方法必须在同步块代码里面调用。

  • wait():阻塞当前线程
  • notify():唤起被wait()阻塞的线程

不可重入锁

所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。我们尝试设计一个不可重入锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException {
while(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}

使用该锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Count{
Lock lock = new Lock();
public void print(){
lock.lock();
doAdd();
lock.unlock();
}
public void doAdd(){
lock.lock();
//do something
lock.unlock();
}
}

当前线程执行print()方法首先获取lock,接下来执行doAdd()方法就无法执行doAdd()中的逻辑,必须先释放锁。这个例子很好的说明了不可重入锁。

可重入锁

接下来,我们设计一种可重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Lock{
boolean isLocked = false;
Thread lockedBy = null;
int lockedCount = 0;
public synchronized void lock() throws InterruptedException{
Thread thread = Thread.currentThread();
while(isLocked && lockedBy != thread){
wait();
}
isLocked = true;
lockedCount++;
lockedBy = thread;
}
public synchronized void unlock(){
if(Thread.currentThread() == this.lockedBy){
lockedCount--;
if(lockedCount == 0){
isLocked = false;
notify();
}
}
}
}

所谓可重入,意味着线程可以进入它已经拥有的锁的同步代码块儿。

我们设计两个线程调用 print() 方法,第一个线程调用 print() 方法获取锁,进入 lock() 方法,由于初始 lockedBy 是 null,所以不会进入 while 而挂起当前线程,而是是增量 lockedCount 并记录 lockBy 为第一个线程。接着第一个线程进入 doAdd() 方法,由于同一进程,所以不会进入 while 而挂起,接着增量 lockedCount,当第二个线程尝试lock,由于 isLocked=true,所以他不会获取该锁,直到第一个线程调用两次 unlock() 将 lockCount 递减为0,才将标记为 isLocked 设置为 false。

可重入锁的概念和设计思想大体如此,Java 中的可重入锁 ReentrantLock 设计思路也是这样。

独享锁/共享锁

独享锁是指该锁一次只能被一个线程所持有。
共享锁是指该锁可被多个线程所持有。

对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。

  • 读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。
  • 独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。

对于Synchronized而言,当然是独享锁。

互斥锁/读写锁

上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。

  • 互斥锁在Java中的具体实现就是ReentrantLock
  • 读写锁在Java中的具体实现就是ReadWriteLock

乐观锁/悲观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。

悲观锁认为对于同一个数据的并发操作,一定是会发生修改的,哪怕没有修改,也会认为修改。因此对于同一个数据的并发操作,悲观锁采取加锁的形式。悲观的认为,不加锁的并发操作一定会出问题。

乐观锁则认为对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用尝试更新,不断重新的方式更新数据。乐观的认为,不加锁的并发操作是没有事情的。

从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升。

  • 悲观锁在Java中的使用,就是利用各种锁。
  • 乐观锁在Java中的使用,是无锁编程,常常采用的是CAS算法,典型的例子就是原子类,通过CAS自旋实现原子操作的更新。

img

CAS的原理

CAS全称 Compare And Swap(比较与交换),是一种无锁算法。在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。java.util.concurrent包中的原子类就是通过CAS来实现了乐观锁。

CAS算法涉及到三个操作数:

  • 需要读写的内存值 V。
  • 进行比较的值 A。
  • 要写入的新值 B。

当且仅当 V 的值等于 A 时,CAS通过原子方式用新值B来更新V的值(“比较+更新”整体是一个原子操作),否则不会执行任何操作。一般情况下,“更新”是一个不断重试的操作。

整个“比较+更新”操作封装在compareAndSwapInt()中。通过CPU的cmpxchg指令,去比较寄存器中的 A 和 内存中的值 V。如果相等,就把要写入的新值 B 存入内存中。如果不相等,就将内存值 V 赋值给寄存器中的值 A。

问题:

问题 问题原理 解决方法
ABA 内存值由A,变为B,又变成A,那么CAS检查时会发现值没有变化,但实际上是有变化的。 在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从“A-B-A”变成了“1A-2B-3A”。
循环时间长开销大 长时间不成功,会导致其一直自旋 类似自适应自旋
只能保证一个共享变量的原子操作 CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的 AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作

自旋锁

首先,内核态与用户态的切换上不容易优化。但通过自旋锁,可以减少线程阻塞造成的线程切换(包括挂起线程和恢复线程)。

img

如果锁的粒度小,那么锁的持有时间比较短(尽管具体的持有时间无法得知,但可以认为,通常有一部分锁能满足上述性质)。那么,对于竞争这些锁的而言,因为锁阻塞造成线程切换的时间与锁持有的时间相当,减少线程阻塞造成的线程切换,能得到较大的性能提升。具体如下:

  • 当前线程竞争锁失败时,打算阻塞自己
  • 不直接阻塞自己,而是自旋(空等待,比如一个空的有限for循环)一会
  • 在自旋的同时重新竞争锁
  • 如果自旋结束前获得了锁,那么锁获取成功;否则,自旋结束后阻塞自己

如果在自旋的时间内,锁就被旧owner释放了,那么当前线程就不需要阻塞自己(也不需要在未来锁释放时恢复),减少了一次线程切换。

“锁的持有时间比较短“这一条件可以放宽。实际上,只要锁竞争的时间比较短(比如线程1快释放锁的时候,线程2才会来竞争锁),就能够提高自旋获得锁的概率。这通常发生在锁持有时间长,但竞争不激烈的场景中。

自旋锁的实现原理同样也是CAS,AtomicInteger中调用unsafe进行自增操作的源码中的do-while循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。

1568776461627

典型的自旋锁实现的例子,可以参考自旋锁的实现

无锁/偏向锁/轻量级锁/重量级锁

这四种锁是指锁的状态,专门针对synchronized的。

1568776678399

1568776727351

1568776780005

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。上面我们介绍的CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

在没有实际竞争的情况下,还能够针对部分场景继续优化。如果不仅仅没有实际竞争,自始至终,使用锁的线程都只有一个,那么,维护轻量级锁都是浪费的。偏向锁的目标是,减少无竞争且只有一个线程使用锁的情况下,使用轻量级锁产生的性能消耗。轻量级锁每次申请、释放锁都至少需要一次CAS,但偏向锁只有初始化时需要一次CAS。

“偏向”的意思是,偏向锁假定将来只有第一个申请锁的线程会使用锁(不会有任何线程再来申请锁),因此,只需要在Mark Word中 CAS 操作记录线程ID(本质上也是更新,但初始值为空),如果记录成功,则偏向锁获取成功,锁状态为偏向锁,以后只检测Mark Word里是否存储着指向当前线程的偏向锁;否则,说明有其他线程竞争,膨胀为轻量级锁。

偏向锁无法使用自旋锁优化,因为一旦有其他线程申请锁,持有偏向锁的线程就会释放锁,就破坏了偏向锁的假定。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

轻量级锁

是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。

如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

内置锁在Java中被抽象为监视器锁(monitor)。在JDK 1.6之前,监视器锁可以认为直接对应底层操作系统中的互斥量(mutex)。这种同步方式的成本非常高,包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。因此,后来称这种锁为“重量级锁”。

小结

偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

偏向锁、轻量级锁、重量级锁适用于不同的并发场景:

  • 偏向锁:无实际竞争,且将来只有第一个申请锁的线程会使用锁。
  • 轻量级锁:无实际竞争,多个线程交替使用锁;允许短时间的锁竞争。
  • 重量级锁:有实际竞争,且锁竞争时间长。

另外,如果锁竞争时间短,可以使用自旋锁进一步优化轻量级锁、重量级锁的性能,减少线程切换。

如果锁竞争程度逐渐提高(缓慢),那么从偏向锁逐步膨胀到重量锁,能够提高系统的整体性能。

总结

  • 自旋锁时属于不可重入锁;ReentrantLocksynchronized 都是可重入锁;
  • synchronized 关键字(其中的重量锁),ReentrantLock,Object.wait()/notify() 都是阻塞锁;
  • 偏向锁、轻量级锁和重量级锁都表示锁的状态。

[锁分配和膨胀过程](http://blog.cuzz.site/2019/02/13/Java 中的锁/4491294-e3bcefb2bacea224.png)

volatile 关键字

是 Java 虚拟机提供的轻量级的同步机制

  • 保证可见性
  • 禁止指令排序
  • 不保证原子性

三大特性

  • 可见性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class VolatileDemo {
public static void main(String[] args) {
Data data = new Data();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " coming...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
data.addOne();
System.out.println(Thread.currentThread().getName() + " updated...");
}).start();

while (data.a == 0) {
// looping
}
System.out.println(Thread.currentThread().getName() + " job is done...");
}
}
class Data {
// int a = 0;
volatile int a = 0;
void addOne() {
this.a += 1;
}
}
// 新线程修改a值后,可通知main线程,即可见性

如果不加 volatile 关键字,则主线程会进入死循环,加 volatile 则主线程能够退出,说明加了 volatile 关键字变量,当有一个线程修改了值,会马上被另一个线程感知到,当前值作废,从新从主内存中获取值。对其他线程可见,这就叫可见性。

  • 原子性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class VolatileDemo {
public static void main(String[] args) {
// test01();
test02();
}
// 测试原子性
private static void test02() {
Data data = new Data();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
data.addOne();
}
}).start();
}
// 默认有 main 线程和 gc 线程
while (Thread.activeCount() > 2) {
Thread.yield();
}
System.out.println(data.a);
}
}
class Data {
volatile int a = 0;
void addOne() {
this.a += 1;
}
}

发现并不能输入 20000

  • 禁止指令排序

volatile 实现禁止指令重排序的优化,从而避免了多线程环境下程序出现乱序的现象

先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个 CPU 指令,他的作用有两个:

  • 保证特定操作的执行顺序
  • 保证某些变量的内存可见性(利用该特性实现 volatile 的内存可见性)

由于编译器个处理器都能执行指令重排序优化,如果在指令间插入一条 Memory Barrier 则会告诉编译器和 CPU,不管什么指令都不能个这条 Memory Barrier 指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后执行重排序优化。内存屏障另一个作用是强制刷出各种 CPU 缓存数据,因此任何 CPU 上的线程都能读取到这些数据的最新版本。

下面是保守策略下,volatile写插入内存屏障后生成的指令序列示意图:

0e75180bf35c40e2921493d0bf6bd684_th

下面是在保守策略下,volatile读插入内存屏障后生成的指令序列示意图:

21ebc7e8190c4966948c4ef4424088be_th

线程安全性保证

  • 工作内存与主内存同步延迟现象导致可见性问题
    • 可以使用 synchronzied 或 volatile 关键字解决,它们可以使用一个线程修改后的变量立即对其他线程可见
  • 对于指令重排导致可见性问题和有序性问题
    • 可以利用 volatile 关键字解决,因为 volatile 的另一个作用就是禁止指令重排序优化

volatile 使用场景

单例模式
  • 多线程环境下可能存在的安全问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@NotThreadSafe
public class Singleton01 {
private static Singleton01 instance = null;
private Singleton01() {
System.out.println(Thread.currentThread().getName() + " construction...");
}
public static Singleton01 getInstance() {
if (instance == null) {
instance = new Singleton01();
}
return instance;
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(()-> Singleton01.getInstance());
}
executorService.shutdown();
}
}

发现构造器里的内容会多次输出

  • 双重锁单例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Singleton02 {
private static volatile Singleton02 instance = null;
private Singleton02() {
System.out.println(Thread.currentThread().getName() + " construction...");
}
public static Singleton02 getInstance() {
if (instance == null) {
synchronized (Singleton01.class) {
if (instance == null) {
instance = new Singleton02();
}
}
}
return instance;
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.execute(()-> Singleton02.getInstance());
}
executorService.shutdown();
}
}
  • 如果没有加 volatile 就不一定是线程安全的,原因是指令重排序的存在,加入 volatile 可以禁止指令重排。

  • 原因是在于某一个线程执行到第一次检测,读取到的 instance 不为 null 时,instance 的引用对象可能还没有完成初始化。

  • instance = new Singleton() 可以分为以下三步完成

1
2
3
memory = allocate();  // 1.分配对象空间
instance(memory); // 2.初始化对象
instance = memory; // 3.设置instance指向刚分配的内存地址,此时instance != null
  • 步骤 2 和步骤 3 不存在依赖关系,而且无论重排前还是重排后程序的执行结果在单线程中并没有改变,因此这种优化是允许的。

  • 发生重排

1
2
3
memory = allocate();  // 1.分配对象空间
instance = memory; // 3.设置instance指向刚分配的内存地址,此时instance != null,但对象还没有初始化完成
instance(memory); // 2.初始化对象
  • 所以不加 volatile 返回的实例不为空,但可能是未初始化的实例

CAS

  • CAS 的全称 Compare-And-Swap,它是一条 CPU 并发。

  • 它的功能是判断内存某一个位置的值是否为预期,如果是则更改这个值,这个过程就是原子的。

  • CAS 并发原体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法,JVM 会帮我们实现出 CAS 汇编指令。这是一种完全依赖硬件的功能,通过它实现了原子操作。由于 CAS 是一种系统源语,源语属于操作系统用语范畴,是由若干条指令组成,用于完成某一个功能的过程,并且原语的执行必须是连续的,在执行的过程中不允许被中断,也就是说 CAS 是一条原子指令,不会造成所谓的数据不一致的问题。

1
2
3
4
5
6
7
8
9
10
11
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(666);
// 获取真实值(期望值),并替换为相应的值
boolean b = atomicInteger.compareAndSet(666, 2019);
System.out.println(b); // true
boolean b1 = atomicInteger.compareAndSet(666, 2020);
System.out.println(b1); // false
atomicInteger.getAndIncrement();
}
}

原理

getAndIncrement();
1
2
3
4
5
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
//this当前对象
//valueOffset内存偏移量
}

引出一个问题:UnSafe 类是什么?

UnSafe 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
// 获取下面 value 的地址偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

private volatile int value;
// ...
}
  • Unsafe 是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,而需要通过本地(native)方法来访问, Unsafe 类相当一个后门,基于该类可以直接操作特定内存的数据。Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像 C 指针一样直接操作内存,因为 Java 中 CAS 操作执行依赖于 Unsafe 类。
  • 变量 vauleOffset,表示该变量值在内存中的偏移量,因为 Unsafe 就是根据内存偏移量来获取数据的。
  • 变量 value 用 volatile 修饰,保证了多线程之间的内存可见性。

不足

  • 循环时间长开销很大
    • 如果 CAS 失败,会一直尝试,如果 CAS 长时间一直不成功,可能会给 CPU 带来很大的开销(比如线程数很多,每次比较都是失败,就会一直循环),所以希望是线程数比较小的场景。
  • 只能保证一个共享变量的原子操作
    • 对于多个共享变量操作时,循环 CAS 就无法保证操作的原子性。
  • 引出 ABA 问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ABADemo {
private static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);

public static void main(String[] args) {
new Thread(() -> {
atomicReference.compareAndSet(100, 101);
atomicReference.compareAndSet(101, 100);
}).start();

new Thread(() -> {
// 保证上面线程先执行
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicReference.compareAndSet(100, 2019);
System.out.println(atomicReference.get()); // 2019
}).start();
}
}

当有一个值从 A 改为 B 又改为 A,这就是 ABA 问题。

应对方法:时间戳原子引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ABADemo2 {
private static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " 的版本号为:" + stamp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 );
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 );
}).start();

new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + " 的版本号为:" + stamp);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean b = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(b); // false
System.out.println(atomicStampedReference.getReference()); // 100
}).start();
}
}

我们先保证两个线程的初始版本为一致,后面修改是由于版本不一样就会修改失败。

QA

同步和异步

如果数据将在线程间共享。例如正在写的数据以后可能被另一个线程读到,或者正在读 的数据可能已经被另一个线程写过了,那么这些数据就是共享数据,必须进行同步存取。

当应用程序在对象上调用了一个需要花费很长时间来执行的方法,并且不希望让程序等待方法的返回时,就应该使用异步编程,在很多情况下采用异步途径往往更有效率。

概述线程状态

  • 新建 new。

  • 就绪 放在可运行线程池中,等待被线程调度选中,获取 cpu。

  • 运行 获得了 cpu。

  • 阻塞

    • 等待阻塞 执行 wait() 。
    • 同步阻塞 获取对象的同步琐时,同步锁被别的线程占用。
    • 其他阻塞 执行了 sleep() 或 join() 方法)。
  • 死亡

进程同步和进程通信

进程同步与进程通信很容易混淆,它们的区别在于:

  • 进程同步:控制多个进程按一定顺序执行;
  • 进程通信:进程间传输信息。

进程通信是一种手段,而进程同步是一种目的。也可以说,为了能够达到进程同步的目的,需要让进程进行通信,传输一些进程同步所需要的信息。

wait 和 sleep

1、 sleep 来自 Thread 类,和 wait 来自 Object 类。

2、最主要是sleep方法没有释放锁,而 wait 方法释放了锁,使得其他线程可以使用同步控制块或者方法。

3、wait,notify 和 notifyAll 只能在同步控制方法或者同步控制块里面使用,而 sleep 可以在任何地方使用。

4、 sleep 必须捕获异常,而 wait , notify 和 notifyAll 不需要捕获异常

  • sleep 方法属于 Thread 类中方法,表示让一个线程进入睡眠状态,等待一定的时间之后,自动醒来进入到可运行状态,不会马上进入运行状态,因为线程调度机制恢复线程的运行也需要时间,一个线程对象调用了 sleep 方法之后,并不会释放他所持有的所有对象锁,所以也就不会影响其他进程对象的运行。但在 sleep 的过程中过程中有可能被其他对象调用它的 interrupt() ,产生 InterruptedException 异常,如果你的程序不捕获这个异常,线程就会异常终止,进入 TERMINATED 状态,如果你的程序捕获了这个异常,那么程序就会继续执行catch语句块以及以后的代码。

sleep() 方法是一个静态方法,也就是说他只对当前对象有效,通过t.sleep()让 t 对象进入 sleep ,这样的做法是错误的,它只会是使当前线程被 sleep 而不是 t 线程

  • wait 属于 Object 的成员方法,一旦一个对象调用了 wait 方法,必须要采用 notify() 和 notifyAll() 方法唤醒该进程;如果线程拥有某个或某些对象的同步锁,那么在调用了 wait() 后,这个线程就会释放它持有的所有同步资源。wait() 方法也同样会在 wait 的过程中有可能被其他对象调用 interrupt() 方法而终止。

wait方法和notify方法,并不是Thread线程上的方法,它们是Object上的方法。

因为所有的Object都可以被用来作为同步对象,所以准确的讲,wait和notify是同步对象上的方法。

wait()的意思是: 让占用了这个同步对象的线程,临时释放当前的占用,并且等待。 所以调用wait是有前提条件的,一定是在synchronized块里,否则就会出错。

notify() 的意思是,通知一个等待在这个同步对象上的线程,可以苏醒过来了,有机会重新占用当前对象了。

notifyAll() 的意思是,通知所有的等待在这个同步对象上的线程,你们可以苏醒过来了,有机会重新占用当前对象了。

volatile 与 synchronized 的区别

(1)仅靠volatile不能保证线程的安全性。(原子性)

  • volatile 轻量级,只能修饰变量。synchronized重量级,还可修饰方法
  • volatile 只能保证数据的可见性,不能用来同步,因为多个线程并发访问 volatile 修饰的变量不会阻塞。

synchronized 不仅保证可见性,而且还保证原子性,因为,只有获得了锁的线程才能进入临界区,从而保证临界区中的所有语句都全部执行。多个线程争抢 synchronized 锁对象时,会出现阻塞。

(2)线程安全性

线程安全性包括两个方面,①可见性。②原子性。

从上面自增的例子中可以看出:仅仅使用 volatile 并不能保证线程安全性。而 synchronized 则可实现线程的安全性。

线程池

Java 通过 Executors 提供四种线程池,分别为:

  • new CachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • new FixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • new ScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • new SingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

线程安全

当多个线程访问同一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替运行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获取正确的结果,那这个对象是线程安全的。

如何保证线程安全?

  • 对变量使用 volitate
  • 对程序段进行加锁 (synchronized , lock)

阻塞同步(互斥同步)

synchronized 和 ReentrantLock。

互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

非阻塞同步

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步操作称为非阻塞同步。

乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。

硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Swap,CAS)。CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

J.U.C 包里面的整数原子类 AtomicInteger,其中的 compareAndSet() 和 getAndIncrement() 等方法都使用了 Unsafe 类的 CAS 操作。

同步和异步,阻塞和非阻塞

同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)

同步

  • 在发出一个同步调用时,在没有得到结果之前,该调用就不返回。
  • 例如:按下电饭锅的煮饭按钮,然后等待饭煮好,把饭盛出来,然后再去炒菜。

异步

  • 在发出一个异步调用后,调用者不会立刻得到结果,该调用就返回了。
  • 例如:按下电钮锅的煮饭按钮,直接去炒菜或者做别的事情,当电饭锅“滴滴滴”响的时候,再回去把饭盛出来。显然,异步式编程要比同步式编程高效得多。

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

阻塞

  • 调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
  • 例子:你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果

非阻塞

  • 在不能立刻得到结果之前,该调用不会阻塞当前线程。
  • 例子:你打电话问书店老板有没有《分布式系统》这本书,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。

start() 方法调用时会执行 run() 方法,为什么不直接调用 run() 方法?

new 一个 Thread,线程进入了新建状态;直接执行 run() 方法,会把 run 方法当成一个 main 线程下的普通方法去执行,并不会在某个线程中执行它,这并不是多线程工作。调用 start() 方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行了。 start() 会执行线程的相应准备工作,然后自动执行 run() 方法的内容,这是真正的多线程工作。

run()和start()方法区别:

  • run():仅仅是封装被线程执行的代码,直接调用是普通方法
  • start():首先启动了线程,然后再由jvm去调用该线程的run()方法。

总结: 调用 start 方法方可启动线程并使线程进入就绪状态,而 run 方法只是 thread 的一个普通方法调用,还是在主线程里执行