1 锁编程

java对象包含了三个部分:对象头,实例数据和对齐填充。

对象头又存放了:markWord和class point。

classpoint :指向方法区,当前对象的类信息数据。

markword:存储了很多和当前对象运行时的数据:例如hashcode,锁状态标志,指向锁记录的指针。

锁标志位:主要用来区分锁的等级,无锁->偏向锁->轻量级锁->重量级锁;

synchronized的实现原理?

synchronized被编译后会成才monitorenter和monitorexit两个字节码指令,分别表示加锁和释放锁。

monitorenter和monitorexit都是基于Monitor实现的。所谓的Monitor其实是一种同步工具,也可以说是一种同步机制。在Java虚拟机(HotSpot)中,Monitor是由ObjectMonitor实现的,可以叫做内部锁,或者Monitor锁。

ObjectMonitor的信息如下:

ObjectMonitor() {

_header = NULL;

_count = 0; // 记录线程获取锁的次数

_waiters = 0,

_recursions = 0; //锁的重入次数

_object = NULL;

_owner = NULL; // 指向持有ObjectMonitor对象的线程

_WaitSet = NULL; // 处于wait状态的线程,会被加入到_WaitSet

_WaitSetLock = 0 ;

_Responsible = NULL ;

_succ = NULL ;

_cxq = NULL ;

FreeNext = NULL ;

_EntryList = NULL ; // 处于等待锁block状态的线程,会被加入到该列表

_SpinFreq = 0 ;

_SpinClock = 0 ;

OwnerIsThread = 0 ;

}

synchronized的重量级锁讲解:

通过上文已知,monitor由Entry Set 和 Wait Set 两个等待区。

Entry Set:存放已经准备抢锁的线程。

Wait Set:存放执行wait等指令的线程。

当某一个线程抢到锁,那么Owner就会指向改线程。改线程可以调用notify方法,随机唤醒一个线程进入Entry Set 区准备抢锁。

java1.6之后,synchronized有4种状态,无锁,偏向锁,轻量级锁和重量级锁。

无锁:不对资源放入synchronized代码块中。

偏向锁:对资源进行加锁,但是实际运行中只有一个线程获得这个锁。此时锁标志位还是01跟无锁是一样的。仅仅修改是否偏向锁的标志位,从0改到1.

当存在多个线程来获取这个锁时。偏向锁会升级为轻量级锁。但是又一个问题来了?当锁升级为轻量级时如何判断线程和锁之间的绑定关系呢?

轻量级锁和重量级锁都将前30bit修改为 指向栈中记录锁的指针 。此时这个指针会指向虚拟机栈中开辟的lockRecord,lockRecord存放的是MarkWord的副本和owner指针。

线程通过CAS尝试获取锁,一旦获取成功,将复制该对象的markword到自己的lockRecord,并修改owner指针,指向该对象。从而实现了线程和锁之间的绑定。

轻量级锁是多个线程在不同时间访问共享资源。

如果多个线程在同一时刻抢夺锁时,便会升级为重量级锁,就需要用到monitor机制,即如上文所述完全由jvm控制。

2 无锁编程

无锁编程采用CAS(compare and swap)机制,也就是乐观锁,其关键在于cas是原子性操作。

乐观锁适合读多写少的情况,以下代码采用cas实现多线程累加。

public class ThreadAtomicInte {

static AtomicInteger integer = new AtomicInteger(0);

public static void main(String[] args) {

for (int i = 0; i < 3; i++) {

new Thread(()->{

while (integer.get() <= 1000) {

System.out.println("thread name" + Thread.currentThread().getName() + ": " + integer.getAndIncrement());

}

}).start();

}

}

3 AQS

AbstractQueuedSynchronizer 抽象队列同步器,简称 AQS

AQS是基于一个FIFO的双向链表队列,其内部定义了一个节点类Node,Node 节点内部的 SHARED 用来标记该线程是获取共享资源时被阻挂起后放入AQS 队列的, EXCLUSIVE 用来标记线程是 取独占资源时被挂起后放入AQS 队列。

AQS 使用一个 volatile 修饰的 int 类型的成员变量 state 来表示同步状态,修改同步状态成功即为获得锁,volatile 保证了变量在多线程之间的可见性,修改 State 值时通过 CAS 机制来保证修改的原子性。

获取state的方式分为两种,独占方式和共享方式,一个线程使用独占方式获取了资源,其它线程就会在获取失败后被阻塞。一个线程使用共享方式获取了资源,另外一个线程还可以通过CAS的方式进行获取。

如果共享资源被占用,需要一定的阻塞等待唤醒机制来保证锁的分配,AQS 中会将竞争共享资源失败的线程添加到一个变体的队列中。

static final class Node {

/** 共享节点 */

static final Node SHARED = new Node();

/** 独占节点 */

static final Node EXCLUSIVE = null;

/** 超时或者中断,该节点会被设置为取消,被取消的节点不会参与竞争 */

static final int CANCELLED = 1;

/** 标志这个这节点的后置节点处于等待状态,如果这个节点是否同步状态,或者被取消会通知后置节点 */

static final int SIGNAL = -1;

/** */

static final int CONDITION = -2;

/**

*

*

*/

static final int PROPAGATE = -3;

volatile int waitStatus;

        // 前序节点

        volatile Node prev;

// 后继节点

volatile Node next;

/**

*

* 获取同步状态的线程

*/

volatile Thread thread;

FIFO的结构图:

head指向第一个节点。

tail指向最后一个节点。

state:大于0表示持有锁,等于0表示未持有锁。

3.1独占式同步状态过程

通过调用acquire方法,acquire包含tryAcquire、acquireQueued、addWaiter和selfInterrupt方法。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

tryAcquire

尝试获取锁,获取成功返回true,否则返回false。该方法由继承AQS的子类进行实现。如ReentrantLock的内部类Sync的子类。NonfairSync 和FairSync

protected boolean tryAcquire(int arg) {

throw new UnsupportedOperationException();

}

NonfairSync的实现为

protected final boolean tryAcquire(int acquires) {

return nonfairTryAcquire(acquires);

}

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

addWaiter

private Node addWaiter(Node mode) {

        // 1. 新建Node节点

Node node = new Node(Thread.currentThread(), mode);

// 2 快速加入队列尾部

Node pred = tail;

if (pred != null) {

node.prev = pred;

            // 采用cas方式 设置尾部节点

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

        // 如果上面失败,采取自旋方式设置尾节点。

enq(node);

        // 返回新插入的尾部节点。

return node;

}

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize

                // 创建一个空节点并设置为投节点。

if (compareAndSetHead(new Node()))

tail = head;

} else {

                // 将当前节点插入到尾节点。

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

                    // 返回尾节点.

return t;

}

}

}

}

acquireQueued

final boolean acquireQueued(final Node node, int arg) {

        // 操作是否失败的标记

boolean failed = true;

try {

boolean interrupted = false;

for (;;) {

                //获取node节点的prev节点。

final Node p = node.predecessor();

                // 如果p是头结点。

if (p == head && tryAcquire(arg)) {

                    // 将当前节点设置为头节点。

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

                // 自旋过程中,判断当前线程是否需要阻塞 && 阻塞当前线程并且检验线程的中断状态

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

// shouldParkAfterFailedAcquire方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        // 1. 获取当前节点的prev节点的等待状态

int ws = pred.waitStatus;

   

        // 如果状态为signal表示需要去唤醒他的后继节点

if (ws == Node.SIGNAL)

/*

* 如果pre节点是signal,表示当前pre释放了同步状态或者取消了

* 会去唤醒当前节点,当前节点可以被阻塞或者睡觉

*/

return true;

if (ws > 0) {

/*

* 说明pre节点是被取消了,从队列中移除。

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

/*

* 除了以上情况,通过cas将pre状态设置为signal.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

// parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {

        // 阻塞当前线程。

LockSupport.park(this);

        // 返回当前线程的中断状态。

return Thread.interrupted();

}

selfInterrupt

static void selfInterrupt() {

        // 未获取到中断状态 && 线程中断状态为true, 中断当前线程

Thread.currentThread().interrupt();

}

4 ReentrantLock

ReentrantLock实现了Lock接口 内部定义抽象同步器Sync,其继承了AQS,并且内部有公平锁和非公平锁的实现类。

我们经常调用ReentrantLock无参构造器,那么我们以非公平锁NonfairSync为例进行讲解。

public ReentrantLock() {

        // 定义 同步器的实现类为非公平

sync = new NonfairSync();

}

NonfairSync的加锁方法:

final void lock() {

     // 修改aqs中的state属性 从0-->1      

if (compareAndSetState(0, 1))

        // 设置当前线程占有这把锁。

setExclusiveOwnerThread(Thread.currentThread());

else

        // 没修改成功则进入AQS的获取锁的方法。

acquire(1);

}

假设现在没有竞争,我们进入if判断:设置state = 1 并且exclusiveOwnerThread属性设置为自己。

第一个竞争出现时,还会执行lock方法,但是会进入else里面。

调用acquire方法,这个方法在aqs中定义的。

Thread-1 执行了

1.CAS 尝试将 state 由0改为 1,结果失败

2.进入tryAcquire 逻辑,这时state 已经是1,结果仍然失败

3.接下来进入addWaiter 逻辑,构造Node 队列

图中黄色三角表示该Node的waitStatus 状态,其中0为默认正常状态

Node 的创建是懒惰的

其中第一个Node称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入acquireQueued逻辑

1. acquireQueued会在一个死循环中不断尝试获得锁,失败后进入park 阻塞

2. 如果自己是紧邻着head(排第二位),那么再次tyAcquire 尝试获取锁,当然这时state仍为1,失败

3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱node,即 head 的 waitStatus改为-1,这次返回 false。 改为-1表示,当前节点有责任唤醒它的后继节点 。

4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued,再次 tyAcquire尝试获取锁,当然这时state 仍为 1,失败

5. 当再次进入shouldParkAfterFailedAcquire 时,这时因为其前驱 node的5.waitStatus 已经是-1,这次返回true

6. 进入parkAndCheckInterrupt, Thread-1 park (灰色表示)

当有多个线程经历上述竞争失败。

Thread-0 释放锁,进入tryRelease 流程,如果成功

设置exclusiveOwnerThread为 null

state = 0

public final boolean release(int arg) {

    // 首先调用tryRelsease

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

            // 然后唤醒下一个节点。

unparkSuccessor(h);

return true;

}

return false;

}

//tryRelsease 逻辑

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

if (c == 0) {

free = true;

        // owner 设置为null

setExclusiveOwnerThread(null);

}

    // state - 1;

setState(c);

return free;

}

private void unparkSuccessor(Node node) {

    // 获取头结点的状态为 -1

int ws = node.waitStatus;

if (ws < 0)

        // 设置状态为0

compareAndSetWaitStatus(node, ws, 0);

    // 获取后继节点

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

        // 调用unpark,唤醒后继节点

LockSupport.unpark(s.thread);

}

当Thread-0释放锁之后,会唤醒Thread-1来尝试获取锁。那这不就是公平了吗?顺序获取锁,是的在队列中的线程是公平的。但是在Thread-0释放锁的同时敲好有其他线程来竞争锁,那么Thread-1可能就获取不到锁,这就是非公平的体现!!

可重入的体现

final boolean nonfairTryAcquire(int acquires) {

  //acquires = 1

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

    // 已经获取锁 判断当前线程是否是owner,开始锁重入

else if (current == getExclusiveOwnerThread()) {

        // state ++;

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

// 释放锁

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

    // 释放锁要判断 state 是否 = 0;

if (c == 0) {

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

    // 返回false 表示没有释放成功,true表示释放成功

return free;

}

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: