Java多线程-锁机制

锁与同步

  1. 同步synchronized用来修饰方法,lock是一个实例变量,通过调用lock()方法来取得锁
  2. 同步和锁都是针对方法,不是同步变量和类
  3. 同步无法保证线程取得方法执行的先后顺序,锁可以设置公平锁来确保
  4. 不必同步类中的所有方法,类可以同时拥有同步和非同步方法
  5. 如果线程拥有同步和非同步方法,则非同步方法和同步方法被多个线程自由访问而不受锁的限制
  6. 线程睡眠时,持有的锁不会被释放
  7. 线程可以获得多个锁,比如在一个对象的同步方法里面调用另一个对象的同步方法,则获取了两个对象的锁。
  8. 同步损害并发性,应该尽可能缩小同步的范围,同步不但可以同步整个方法,还可以同步方法中的一部分代码
  9. 在使用同步代码块的时候,应该指定在哪个对象上同步,也就是要获得哪个对象的锁

锁的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 10000000; j++) {
lock.lock();
try {
i++;
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock reenterLock = new ReenterLock();
Thread thread = new Thread(reenterLock);
Thread thread1 = new Thread(reenterLock);
thread.start();
thread1.start();
thread.join();
thread1.join();
System.out.println(i);
}
}

Lock接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Lock {
// 获得锁,lock会忽视interrupt,拿不到锁会一直阻塞
void lock();
// 获取锁,相应interrupt()并抛出异常,跳出阻塞
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,成功返回true
boolean tryLock();
// 在指定时间内尝试获取锁,成功则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 条件状态,阻塞队列中使用
Condition newCondition();
}

ReentrantLock重入锁

成员变量

1
2
3
4
5
6
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronize提供所有同步机制的实现 */
private final Sync sync;
...
}

只有一个成员变量sync,其抽象类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 获取锁
abstract void lock();
// 尝试获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

构造函数

1
2
3
4
5
6
7
8
9
// 默认创建非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 创建公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

lock方法

1
2
3
4
// sync的lock
public void lock() {
sync.lock();
}

lockInterruptibly方法

1
2
3
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

非公平锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 非公平锁的实现是继承Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 获取锁的方法
final void lock() {
if (compareAndSetState(0, 1))
// 设置独占模式,也就是一个锁只能被一个线程独占
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 没有获取锁,尝试使用信号量的方式
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// compareAndSwapInt这个方法不是Java实现的,而是通过JNI来调用操作系统的原生程序
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

tryLock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) {
// 如果没有获取到锁,CAS设置状态
if (compareAndSetState(0, acquires)) {
// 独占线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程持有这个锁,设置重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 未获取到锁,退出
return false;
}

其中getState获取state的值,0表示未获取到锁,1表示已经获取到锁,大于1表示重入数

tryLock(long timeout, TimeUnit unit)方法

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

该方法实际上调用的是Sync父类AbstractQueuedSynchronizer的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 循环等待获取锁的方法
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 放入等待节点,组成一个链表
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 当前节点为头结点,尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 超时退出
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 未超时
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
// 检测到中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

ReadWrite读写锁

  • 读写锁的使用场景
    • 多线程同时读取临界值,单一线程写
    • 阻塞同时读写
非阻塞 阻塞
阻塞 阻塞

实例

  • 提交18个读任务,2个写任务,使用重入锁耗时奖金20毫秒,使用读写锁耗时3秒左右
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 读写锁,允许多个线程同时读取
*/
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
private static CountDownLatch cd = new CountDownLatch(20);
public Object handleRead(Lock lock) throws InterruptedException {
try {
//模拟读操作
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value=index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = () -> {
try {
//读写锁
// demo.handleRead(readLock);
demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown();
};
Runnable writeRunnable = () -> {
try {
demo.handleWrite(writeLock, new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown();
};
long start = System.currentTimeMillis();
for (int i = 0; i < 18; i++) {
Thread thread = new Thread(readRunnable);
thread.start();
}
for (int i = 18; i < 20; i++) {
Thread thread = new Thread(writeRunnable);
thread.start();
}
cd.await();
System.out.println("the program cost : " + (System.currentTimeMillis() - start) + " ms");
}
}
/*
* 读写锁:the program cost : 3020 ms
* 重入锁:the program cost : 18156 ms
*/

源码解读

ReadWriteLock接口

1
2
3
4
5
6
7
public interface ReadWriteLock {
// 读锁
Lock readLock();
// 写锁
Lock writeLock();
}

ReentrantReadWriteLock类

成员变量

1
2
3
4
5
6
7
8
9
10
11
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
// 内部类提供的读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 内部类提供的写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 实现同步操作的机制
final Sync sync;
//...
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
// 非公平锁构造方法
public ReentrantReadWriteLock() {
this(false);
}
// 公平锁构方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

ReadLock内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
// Sync,和重入锁一样,同步机制由Sync来实现
private final Sync sync;
// 构造方法,传入sync的实例
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取共享锁
public void lock() {
sync.acquireShared(1);
}
// 相应中断,跳出阻塞
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 尝试获取锁
public boolean tryLock() {
return sync.tryReadLock();
}
// 在指定时间内尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放锁
public void unlock() {
sync.releaseShared(1);
}
// 条件变量
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}

WriteLock内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
// sync同步同步机制的实现
private final Sync sync;
// 构造方法
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取锁
public void lock() {
sync.acquire(1);
}
// 响应中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试获取锁,成功返回true
public boolean tryLock( ) {
return sync.tryWriteLock();
}
// 在指定时间内尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 释放锁
public void unlock() {
sync.release(1);
}
// 条件变量
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
// 是否被当前线程所占据
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 返回写锁被当前线程持有的次数
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

CountDownLatch

  • 倒计数器,控制线程等待,直到倒计数器为0,线程继续开始执行。

实例

  • 火箭发射中需要进行各个仪器的检查,只有所有的都正常,才可以发射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 倒计时器,让某一个线程等待倒计时器结束再开始执行
*/
public class CountDownLatchDemo implements Runnable{
private static final CountDownLatch end = new CountDownLatch(10);
private static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run(){
try {
Thread.sleep(new Random().nextInt(10) * 100);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(demo);
}
end.await();
System.out.println("Fire");
executorService.shutdown();
}
}
/*
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
Fire
*/
  • 赛跑或者赛车问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 赛跑问题countDownLatch解决方法
**/
public class Player implements Runnable{
private int id;
private CountDownLatch begin;
private CountDownLatch end;
public Player(int id, CountDownLatch begin, CountDownLatch end) {
this.id = id;
this.begin = begin;
this.end = end;
}
@Override
public void run() {
try {
begin.await();
long startTime = System.currentTimeMillis();
System.out.printf("Player num %d start to run at %d!\n", id, startTime);
Thread.sleep(new Random().nextInt(100)); //随机分配时间,即运动员完成时间
System.out.printf("Player num %d finish game, time : %d!\n", id, System.currentTimeMillis() - startTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
public static void main(String[] args) {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
Player[] players = new Player[5];
for (int i = 0; i < 5; i++) {
players[i] = new Player(i+1, begin, end);
}
ExecutorService es = Executors.newFixedThreadPool(5);
for (Player player : players) {
es.execute(player);
}
System.out.println("begin race");
begin.countDown();
try {
end.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Race ends!");//宣布比赛结束
}
es.shutdown();
}
}
/*
begin race
Player num 1 start to run at 1536030032314!
Player num 5 start to run at 1536030032315!
Player num 4 start to run at 1536030032315!
Player num 3 start to run at 1536030032315!
Player num 2 start to run at 1536030032315!
Player num 4 finish game, time : 31!
Player num 1 finish game, time : 43!
Player num 2 finish game, time : 75!
Player num 5 finish game, time : 103!
Player num 3 finish game, time : 109!
Race ends!
*/

CyclicBarrier

  • 可以实现线程之间的等待,可以循环使用,如十个一组执行的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* CycleBarrier,循环栅栏,可以反复使用的计数器
*/
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclicBarrier;
public Soldier(String soldier, CyclicBarrier cyclicBarrier) {
this.soldier = soldier;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
//等待所有人到齐
cyclicBarrier.await();
doWork();
//等待所有完成工作
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
private void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}
}
public static class BarrierRun implements Runnable {
boolean flag;
int N;
public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}
@Override
public void run() {
if (flag) {
System.out.println("司令:【士兵" + N + "个,任务完成!】");
} else {
System.out.println("司令:【士兵" + N + "个,集结完毕!】");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
//设置屏障点
System.out.println("集合队伍!");
for (int i = 0; i < N; i++) {
System.out.println("士兵" + i + "报道!");
allSoldier[i] = new Thread(new Soldier("士兵" + i + "报道!", cyclic));
allSoldier[i].start();
}
}
}
/*
集合队伍!
士兵0报道!
士兵1报道!
士兵2报道!
士兵3报道!
士兵4报道!
士兵5报道!
士兵6报道!
士兵7报道!
士兵8报道!
士兵9报道!
司令:【士兵10个,集结完毕!】
士兵9报道!:任务完成
士兵0报道!:任务完成
士兵8报道!:任务完成
士兵3报道!:任务完成
士兵7报道!:任务完成
士兵2报道!:任务完成
士兵1报道!:任务完成
士兵5报道!:任务完成
士兵4报道!:任务完成
士兵6报道!:任务完成
司令:【士兵10个,任务完成!】
*/

LockSupport

  • LockSupport类似于suspend(),可以让一个线程在任意位置阻塞,不用获取锁,也不会抛出异常
  • park方法阻塞当前线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
Donate comment here