日期:2014-05-16  浏览次数:20473 次

LinkedBlockingQueue + 单向链表基本结构

LinkedBlockingQueue是一个单向链表结构的队列,也就是只有next,没有prev。如果不指定容量默认为Integer.MAX_VALUE。通过putLock和takeLock两个锁进行同步,两个锁分别实例化notFull和notEmpty两个Condtion,用来协调多线程的存取动作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同时获得这两个锁,此时存或者取操作都会不可进行,需要注意的是所有需要同时lock的地方顺序都是先putLock.lock再takeLock.lock,这样就避免了可能出现的死锁问题。

takeLock实例化出一个notEmpty的Condition,putLock实例化一个notFull的Condition,两个Condition协调即时通知线程队列满与不满的状态信息。

?

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 节点数据结构
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 队列的容量 */
    private final int capacity;

    /** 持有节点计数器 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 头指针 */
    private transient Node<E> head;

    /** 尾指针 */
    private transient Node<E> last;

    /** 用于读取的独占锁*/
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列是否为空的条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 用于写入的独占锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列是否已满的条件 */
    private final Condition notFull = putLock.newCondition();

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
       //插入数据到链尾
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }
       //从链头取数据
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);  //注意初始化时head/last地址相同,数据都为null
					// 其实就相当于只有一个节点
    }

   ...
}

?这里仅仅展示部分源码,主要的方法在后面的分析中列出。分析之前明确一个最基本的概念。天天念叨着编写线程安全的类,什么是线程安全的类?那就是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(改变,遍历,查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类的编写不是线程安全的。?


??? 明确了这个基本的概念就可以很好的理解这个Queue的实现为什么是线程安全的了。在LinkedBlockingQueue的所有共享的全局变量中,final声明的capacity在构造器生成实例时就成了不变量了。而final声明的count由于是AtomicInteger类型的,所以能够保证其操作的原子性。剩下的final的变量都是初始化成了不变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的。?