staticfinalclassNode{ /** Marker to indicate a node is waiting in shared mode */ staticfinal Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ staticfinal Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */ staticfinalint CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalint SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalint CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalint PROPAGATE = -3; volatileint waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter;
/** * Returns true if node is waiting in shared mode. */ finalbooleanisShared(){ return nextWaiter == SHARED; } // 获取前置节点 final Node predecessor()throws NullPointerException { Node p = prev; if (p == null) thrownew NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy;
 *             selects {@code FairSync}, otherwise {@code NonfairSync}
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
acquire 方法:获取许可
下面我们就从锁的获取入手开始解读AQS:
1 2 3 4 5
publicfinalvoidacquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
privatevoidunparkSuccessor(Node node){ int ws = node.waitStatus; if (ws < 0) //置零当前线程所在的结点状态,允许失败 compareAndSetWaitStatus(node, ws, 0); // 从第二个节点开始往后找waitStatus<=0的节点,然后执行unpark Node s = node.next; // 找到下一个需要唤醒的结点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
private Node addConditionWaiter(){ Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { // 先检查一遍有没有取消状态的节点,如果有的话,清除掉 unlinkCancelledWaiters(); t = lastWaiter; } // 将当前线程封装成Node添加到Condition队列中 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
signal方法
唤醒一个线程:从条件队列的首节点(firstWaiter)开始,将其转移到同步队列中等待重新获取锁。
1 2 3 4 5 6 7
publicfinalvoidsignal(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法
1 2 3 4 5 6 7 8 9
privatevoiddoSignal(Node first){ do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; // 将Condition队列中的节点状态设置成SIGNAL,并将节点添加到同步队列中 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
finalbooleantransferForSignal(Node node){ /* * If cannot change waitStatus, the node has been cancelled. */ // 将节点状态改成0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse;
/* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 把当前添加到同步队列中,并返回前一个节点 Node p = enq(node); int ws = p.waitStatus; // 设置前一个节点的状态为SIGNAL if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 唤醒当前节点的线程 LockSupport.unpark(node.thread); returntrue; }