歡迎您光臨本站 註冊首頁

Java多線程(五)之BlockingQueue深入分析

←手機掃碼閱讀     火星人 @ 2014-03-09 , reply:0

  一、概述:

  BlockingQueue作為線程容器,可以為線程同步提供有力的保障.

  二、BlockingQueue定義的常用方法

  1.BlockingQueue定義的常用方法如下:

  拋出異常 特殊值 阻塞 超時

  插入 add(e) offer(e) put(e) offer(e, time, unit)

  移除 remove() poll() take() poll(time, unit)

  檢查 element() peek() 不可用 不可用

  1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則招聘異常

  2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.

  3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裡面有空間再繼續.

  4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null

  5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止

  其中:BlockingQueue 不接受null 元素.試圖add、put 或offer 一個null 元素時,某些實現會拋出NullPointerException.null 被用作指示poll 操作失敗的警戒值.

  三、BlockingQueue的幾個注意點

  【1】BlockingQueue 可以是限定容量的.它在任意給定時間都可以有一個remainingCapacity,超出此容量,便無法無阻塞地put 附加元素.沒有任何內部容量約束的BlockingQueue 總是報告Integer.MAX_VALUE 的剩餘容量.

  【2】BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持Collection 介面.因此,舉例來說,使用remove(x) 從隊列中移除任意一個元素是有可能的.然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊信息時.

  【3】BlockingQueue 實現是線程安全的.所有排隊方法都可以使用內部鎖或其他形式的併發控制來自動達到它們的目的.然而,大量的 Collection 操作(addAll、containsAll、retainAll 和removeAll)沒有 必要自動執行,除非在實現中特別說明.因此,舉例來說,在只添加了c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常).

  【4】BlockingQueue 實質上不 支持使用任何一種"close"或"shutdown"操作來指示不再添加任何項.這種功能的需求和使用有依賴於實現的傾向.例如,一種常用的策略是:對於生產者,插入特殊的end-of-stream 或poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋.

  四、簡要概述BlockingQueue常用的四個實現類



  1)ArrayBlockingQueue:規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的.

  

  2)LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的

  3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序.

  4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.

  其中LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue.

  五、具體BlockingQueue的實現類的內部細節

  有耐心的同學請看具體實現類細節:

  1、ArrayBlockingQueue

  ArrayBlockingQueue是一個由數組支持的有界阻塞隊列.此隊列按 FIFO(先進先出)原則對元素進行排序.隊列的頭部 是在隊列中存在時間最長的元素.隊列的尾部 是在隊列中存在時間最短的元素.新元素插入到隊列的尾部,隊列檢索操作則是從隊列頭部開始獲得元素.

  這是一個典型的"有界緩存區",固定大小的數組在其中保持生產者插入的元素和使用者提取的元素.一旦創建了這樣的緩存區,就不能再增加其容量.試圖向已滿隊列中放入元素會導致放入操作受阻塞;試圖從空隊列中檢索元素將導致類似阻塞.

  ArrayBlockingQueue創建的時候需要指定容量capacity(可以存儲的最大的元素個數,它不會自動擴容)以及是否為公平鎖(fair參數).

  在創建ArrayBlockingQueue的時候默認創建的是非公平鎖,不過我們可以在它的構造函數里指定.這裡調用ReentrantLock的構造函數創建鎖的時候,調用了:

  public ReentrantLock(boolean fair) {

  sync = (fair)? new FairSync() : new NonfairSync();

  }

  FairSync/ NonfairSync是ReentrantLock的內部類:

  線程按順序請求獲得公平鎖,而一個非公平鎖可以闖入,且當它尚未進入等待隊列,就會和等待隊列head結點的線程發生競爭,如果鎖的狀態可用,請求非公平鎖的線程可在等待隊列中向前跳躍,獲得該鎖.內部鎖synchronized沒有提供確定的公平性保證.

  分三點來講這個類:

  2.1 添加新元素的方法:add/put/offer

  2.2 該類的幾個實例變數:takeIndex/putIndex/count/

  2.3 Condition實現

  1.1 添加新元素的方法:add/put/offer

  ,談到添加元素的方法,得分析以下該類同步機制中用到的鎖:

  Java代碼

  [java]

  lock = new ReentrantLock(fair);

  notEmpty = lock.newCondition();//Condition Variable 1

  notFull = lock.newCondition();//Condition Variable 2

  這三個都是該類的實例變數,只有一個鎖lock,然後lock實例化出兩個Condition,notEmpty/noFull分別用來協調多線程的讀寫操作.

  Java代碼

  [java]

  public boolean offer(E e) {

  if (e == null) throw new NullPointerException();

  final ReentrantLock lock = this.lock;//每個對象對應一個顯示的鎖

  lock.lock();//請求鎖直到獲得鎖(不可以被interrupte)

  try {

  if (count == items.length)//如果隊列已經滿了

  return false;

  else {

  insert(e);

  return true;

  }

  } finally {

  lock.unlock();//

  }

  }

  看insert方法:

  private void insert(E x) {

  items[putIndex] = x;

  //增加全局index的值.

  /*

  Inc方法體內部:

  final int inc(int i) {

  return ( i == items.length)? 0 : i;

  }

  這裡可以看出ArrayBlockingQueue採用從前到後向內部數組插入的方式插入新元素的.如果插完了,putIndex可能重新變為0(在已經執行了移除操作的前提下,否則在之前的判斷中隊列為滿)

  */

  putIndex = inc(putIndex);

   count;

  notEmpty.signal();//wake up one waiting thread

  }

  Java代碼

  [java]

  public void put(E e) throws InterruptedException {

  if (e == null) throw new NullPointerException();

  final E[] items = this.items;

  final ReentrantLock lock = this.lock;

  lock.lockInterruptibly();//請求鎖直到得到鎖或者變為interrupted

  try {

  try {

  while (count == items.length)//如果滿了,當前線程進入noFull對應的等waiting狀態

  notFull.await();

  } catch (InterruptedException ie) {

  notFull.signal(); // propagate to non-interrupted thread

  throw ie;

  }

  insert(e);

  } finally {

  lock.unlock();

  }

  }

  Java代碼

  [java]

  public boolean offer(E e, long timeout, TimeUnit unit)

  throws InterruptedException {

  if (e == null) throw new NullPointerException();

  long nanos = unit.toNanos(timeout);

  final ReentrantLock lock = this.lock;

  lock.lockInterruptibly();

  try {

  for (;;) {

  if (count != items.length) {

  insert(e);

  return true;

  }

  if (nanos <= 0)

  return false;

  try {

  //如果沒有被 signal/interruptes,需要等待nanos時間才返回

  nanos = notFull.awaitNanos(nanos);

  } catch (InterruptedException ie) {

  notFull.signal(); // propagate to non-interrupted thread

  throw ie;

  }

  }

  } finally {

  lock.unlock();

  }

  }

  Java代碼

  [java]

  public boolean add(E e) {

  return super.add(e);

  }

  父類:

  public boolean add(E e) {

  if (offer(e))

  return true;

  else

  throw new IllegalStateException("Queue full");

  }

  1.2 該類的幾個實例變數:takeIndex/putIndex/count

  Java代碼

  [java]

  用三個數字來維護這個隊列中的數據變更:

  /** items index for next take, poll or remove */

  private int takeIndex;

  /** items index for next put, offer, or add. */

  private int putIndex;

  /** Number of items in the queue */

  private int count;

  提取元素的三個方法take/poll/remove內部都調用了這個方法:

  Java代碼

  [java]

  private E extract() {

  final E[] items = this.items;

  E x = items[takeIndex];

  items[takeIndex] = null;//移除已經被提取出的元素

  takeIndex = inc(takeIndex);//策略和添加元素時相同

  --count;

  notFull.signal();//提醒其他在notFull這個Condition上waiting的線程可以嘗試工作了

  return x;

  }

  從這個方法里可見,tabkeIndex維護一個可以提取/移除元素的索引位置,takeIndex是從0遞增的,這個類是FIFO隊列.

  putIndex維護一個可以插入的元素的位置索引.

  count顯然是維護隊列中已經存在的元素總數.

  1.3 Condition實現

  Condition現在的實現只有java.util.concurrent.locks.AbstractQueueSynchoronizer內部的ConditionObject,並且通過ReentranLock的newCondition()方法暴露出來,這是Condition的await()/sinal()一般在lock.lock()與lock.unlock()之間執行,當執行condition.await()方法時,它會釋放掉本線程持有的鎖,然後自己進入等待隊列.直到sinal(),喚醒后又會重新試圖去拿到鎖,拿到后執行await()下的代碼,其中釋放當前鎖和得到當前鎖都需要ReentranLock的tryAcquire(int arg)方法來判定,並且享受ReentranLock的重進入特性.

  Java代碼

  [java]

  public final void await() throws InterruptedException {

  if (Thread.interrupted())

  throw new InterruptedException();

  //加一個新的condition等待節點

  Node node = addConditionWaiter();

  //釋放自己的鎖

  int savedState = fullyRelease(node);

  int interruptMode = 0;

  while (!isOnSyncQueue(node)) {

  //如果當前線程 等待狀態時CONDITION,park住當前線程,等待condition的signal來解除

  LockSupport.park(this);

  if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

  break;

  }

  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

  interruptMode = REINTERRUPT;

  if (node.nextWaiter != null)

  unlinkCancelledWaiters();

  if (interruptMode != 0)

  reportInterruptAfterWait(interruptMode);

  }

  2、SynchronousQueue

  一種阻塞隊列,其中每個 put 必須等待一個 take,反之亦然.同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有.不能在同步隊列上進行 peek,僅在試圖要取得元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)添加元素;也不能迭代隊列,其中沒有元素可用於迭代.隊列的頭 是嘗試添加到隊列中的首個已排隊線程元素;如果沒有已排隊線程,則不添加元素並且頭為 null.對於其他Collection 方法(例如 contains),SynchronousQueue 作為一個空集合.此隊列不允許 null 元素.

  同步隊列類似於 CSP 和 Ada 中使用的 rendezvous 通道.它非常適合於傳遞性設計,在這種設計中,在一個線程中運行的對象要將某些信息、事件或任務傳遞給在另一個線程中運行的對象,它就必須與該對象同步.

  對於正在等待的生產者和使用者線程而言,此類支持可選的公平排序策略.默認情況下不保證這種排序.但是,使用公平設置為 true 所構造的隊列可保證線程以 FIFO 的順序進行訪問.公平通常會降低吞吐量,但是可以減小可變性並避免得不到服務.

  3、LinkedBlockingQueue

  一個基於已鏈接節點的、範圍任意的 blocking queue.此隊列按 FIFO(先進先出)排序元素.隊列的頭部 是在隊列中時間最長的元素.隊列的尾部 是在隊列中時間最短的元素.新元素插入到隊列的尾部,並且隊列檢索操作會獲得位於隊列頭部的元素.鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數併發應用程序中,其可預知的性能要低.

  單向鏈表結構的隊列.如果不指定容量默認為Integer.MAX_VALUE.通過putLock和takeLock兩個鎖進行同步,兩個鎖分別實例化notFull和notEmpty兩個Condtion,用來協調多線程的存取動作.其中某些方法(如remove,toArray,toString,clear等)的同步需要同時獲得這兩個鎖,並且總是先putLock.lock緊接著takeLock.lock(在同一方法fullyLock中),這樣的順序是為了避免可能出現的死鎖情況(我也想不明白為什麼會是這樣?)

  4、PriorityBlockingQueue

  一個無界的阻塞隊列,它使用與類 PriorityQueue 相同的順序規則,並且提供了阻塞檢索的操作.雖然此隊列邏輯上是無界的,但是由於資源被耗盡,試圖執行添加操作可能會失敗(導致 OutOfMemoryError).此類不允許使用 null 元素.依賴自然順序的優先順序隊列也不允許插入不可比較的對象(這樣做會拋出ClassCastException).

  看它的三個屬性,就基本能看懂這個類了:

  Java代碼

  [java]

  private final PriorityQueue q;

  private final ReentrantLock lock = new ReentrantLock(true);

  private final Condition notEmpty = lock.newCondition();

  lock說明本類使用一個lock來同步讀寫等操作.

  notEmpty協調隊列是否有新元素提供,而隊列滿了以後會調用PriorityQueue的grow方法來擴容.

  5、DelayQueue

  Delayed 元素的一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素.該隊列的頭部 是延遲期滿后保存時間最長的 Delayed 元素.如果延遲都還沒有期滿,則隊列沒有頭部,並且 poll 將返回 null.當一個元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一個小於或等於零的值時,則出現期滿.此隊列不允許使用 null 元素.

  Delayed介面繼承自Comparable,我們插入的E元素都要實現這個介面.

  DelayQueue的設計目的間API文檔:

  An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

  DelayQueue構造函數了里限定死不允許傳入comparator(之前的PriorityBlockingQueue中沒有限定死),即只能在compare方法里定義優先順序的比較規則.再看上面這段英文,"The head of the queue is that Delayed element whose delay expired furthest in the past."說明compare方法實現的時候要保證最先加入的元素最早結束延時.而 "Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero."說明getDelay方法的實現必須保證延時到了返回的值變為<=0的int.

  上面這段英文中,還說明了:在poll/take的時候,隊列中元素會判定這個elment有沒有達到超時時間,如果沒有達到,poll返回null,而take進入等待狀態.但是,除了這兩個方法,隊列中的元素會被當做正常的元素來對待.例如,size方法返回所有元素的數量,而不管它們有沒有達到超時時間.而協調的Condition available只對take和poll是有意義的.

  另外需要補充的是,在ScheduledThreadPoolExecutor中工作隊列類型是它的內部類DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue類型,而ScheduledFutureTask作為Delay的實現類作為Runnable的封裝后的Task類.也就是說ScheduledThreadPoolExecutor是通過DelayQueue優先順序判定規則來執行任務的.

  6、BlockingDque LinkedBlockingQueue

  BlockingDque為阻塞雙端隊列介面,實現類有LinkedBlockingDque.雙端隊列特別之處是它首尾都可以操作.LinkedBlockingDque不同於LinkedBlockingQueue,它只用一個lock來維護讀寫操作,並由這個lock實例化出兩個Condition notEmpty及notFull,而LinkedBlockingQueue讀和寫分別維護一個lock.


[火星人 ] Java多線程(五)之BlockingQueue深入分析已經有846次圍觀

http://coctec.com/docs/java/show-post-59782.html