Java 1.5版本最終提供了對編程中最基礎數據結構之一-Queue的內在支持。本文章將探究新添加到java.util包中的Queue接口,演示如何去使用這個新特性去使你的數據處理流式化。
by Kulvir Singh Bhogal (Translated by Victor Jan 2004-09-26 )
(英文原文見http://www.devx.com/Java/Article/21983/1954?pf=true )
在 計算機學科中,基礎數據結構之一 — Queue。你會想起Queue是一種數據結構,在它里邊的元素可以按照添加它們的相同順序被移除。在以前的Java版本中,這中FIFO(先進先出)數 據結構很不幸被忽略了。隨著Java1.5(也叫Tiger)的出現,對Queue支持第一次成為固有特性。
過去在沒有Queue的情況下如何管理?
在Java 1.5以前,通常的實現方式是使用java.util.List 集合來模仿Queue。Queue的概念通過把對象添加(稱為enqueuing的操作)到List的尾部(即Queue的后部)并通過從List的頭部 (即Queue的前部)提取對象而從List中移除(稱為dequeuing的操作)來模擬。下面代碼顯示了你以前可能做法。
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.Vector;
public class QueueEmulate
{
private LinkedList queueMembers;
public Object enqueue(Object element)
{
queueMembers.add (element);
return element;
}
public Object dequeue()
{
return queueMembers.removeFirst();
}
}
現在我們有了Queue
Java 1.5在java.util包中 新添加了Queue接口,下面代碼中使用到它(從 sample code 的SimpleQueueUsageExample.java 截取)。注意,在Java 1.5中,java.util.LinkedList被用來實現java.util.Queue,如同java.util.List接口。也要注意我是如 何顯式地聲明我的只包含字符串的Queue---使用<String> 泛型(如果需要深入了解泛型,請參閱"J2SE 1.5: Java's Evolution Continues ")。這樣使我省卻了當從 Queue 中提取它們時不得不進行對象造型的痛苦。
Queue<String> myQueue = new LinkedList<String>();
myQueue.add("Add Me");
myQueue.add("Add Me Too");
String enqueued = myQueue.remove();
你可以看到LinkedList類的add和remove方法被分別用于執行enqueuing和dequeuing操作。實際上沒有更好的可用方法;Queue接口提供了新的offer和poll方法,如下顯示(截取自SimpleQueueUsageExamplePreferred ):
Queue<String> myQueue = new LinkedList<String>();
boolean offerSuccess;
// offer method tries to enqueue.
// A boolean is returned telling you if the
// attempt to add to the queue was successful or not
offerSuccess=myQueue.offer("Add Me");
offerSuccess=myQueue.offer("Add Me Too");
// peek at the head of the queue, but don't grab it
Object pull = myQueue.peek();
String enqueued = null;
// grab the head of the queue
if (pull!=null) enqueued = myQueue.poll();
System.out.println(enqueued);
如果你的Queue有固定長度限制(常常是這種情形),使用add方法向Queue中添加內容,將導致拋出一個非檢查異常。當你編譯SimpleQueueUsageExample的代碼時,編譯器將會就此問題發出警告。相反,當新的offer方法試圖添加時,如果一切正常則會返回TRUE,否則,返回FALSE。 同樣地, 如果你試圖使用remove方法對一個空Queue操作時 ,也將導致一個非檢查異常。
你 也可以使用poll方法去從Queue中提取元素。如果在Queue中沒有元素存在,poll方法將會返回一個null(即不會拋出異常)。在某些情況 下,你可能不想提取頭部的元素而只是想看一下。Queue接口提供了一個peek方法來做這樣的事情。如果你正在處理一個空Queue,peek方法返回 null。象add-offer和remove-poll關系一樣,還有peek-element關系。在Queue為空的情形下,element方法拋 出一個非檢查異常。但如果在Queue中有元素,peek方法允許你查看一下第一個元素而無需真的把他從Queue中取出來。peek方法的用法在SimpleQueueUsageExamplePreferred類中有示例。
AbstractQueue類
如你所見,java.util.LinkedList類實現了java.util.Queue接口,同樣,AbstractQueue也是這樣。 AbstractQueue 類實現了java.util接口的一些方法(因此在它的名字中包含abstract)。而AbstractQueue將重點放在了實現offer,poll和peek方法上。另外使用一些已經提供的具體實現。
PriorityQueue類
在 PriorityQueue中,當你添加元素到Queue中時,實現了自動排序。根據你使用的PriorityQueue的不同構造器,Queue元素的 順序要么基于他們的自然順序要么通過PriorirtyQueue構造器傳入的Comparator來確定。下面的代碼示例了PirorityQueue 類的使用方法。在Queue的前邊是字符串"Alabama"-由于元素在PriorityQueue中是按自然順序排列的(此例中是按字母表順序)。
PriorityQueue<String> priorityQueue = new PriorityQueue<String>();
priorityQueue.offer("Texas");
priorityQueue.offer("Alabama");
priorityQueue.offer("California");
priorityQueue.offer("Rhode Island");
int queueSize = priorityQueue.size();
for (int i =0; i< queueSize; i++)
{
System.out.println(priorityQueue.poll());
}
執行結果如下:
Alabama
California
Rhode Island
Texas
Queue各項按照自然順序-字母順序-來排列。
如上提到的,你可以創建你自己的Comparator類并提供給PirorityQueue。如此,你可以定義你自己的排序方式。在PriorityQueueComparatorUsageExample 類中可找到此方式,在其中使用了一個名為State的助手類。如你在下邊看到的,在類定義中,State只簡單地包含了一個名字和人口。
private String name;
private int population;
public State(String name, int population)
{
super();
this.name = name;
this.population = population;
}
public String getName()
{
return this.name;
}
public int getPopulation()
{
return this.population;
}
public String toString()
{
return getName() + " - " + getPopulation();
}
在PriorityQueueComparatorUsageExample中,Queue使用了java.util.Comparator的自定義實現來定義排列順序(如下)。
PriorityQueue<State> priorityQueue =
new PriorityQueue(6, new Comparator<State>()
{
public int compare(State a, State b)
{
System.out.println("Comparing Populations");
int populationA = a.getPopulation();
int populationB = b.getPopulation();
if (populationB>populationA)
return 1;
else if (populationB<populationA)
return -1;
else
return 0;
}
}
);
執行PriorityQueueComparatorUsageExample 類后,添加到Queue中的State對象將按人口數量排放(從低到高)。
阻塞Queue
Queue通常限定于給定大小。迄今為止,通過Queue的實現你已經看到,使用offer或add方法enqueue Queue(并用remove或poll來dequeue Queue)都是假設如果Queue不能提供添加或移除操作,那么你不需要等待程序執行。java.util.concurrent.BlockingQueue接口實現阻塞。它添加了put和take方法。舉一個例子可能更有用。
使 用原來的producer/consumer關系來假定你的producer寫一個Queue(更特定是一個BlockingQueue)。你有一些 consumer正從Queue中讀取,在一個有序的方式下,哪種方式是你希望看到的。基本上,每個consumer需要等待先于它并獲準從Queue中 提取項目的前一個consumer。用程序構建此結構,先生成一個producer線程用于向一個Queue中寫數據,然后生成一些consumer線程 從同一Queue中讀取數據。注意,線程會阻塞另一線程直到當前線程做完從Queue中提取一項的操作。
下 面的代碼展示了類Producer寫BlockingQueue的過程。注意run方法中的對象(你有責任實現,因為你繼承了Thread)在等待了隨機 數量的時間(范圍從100到500毫秒)后,被放進了BlockingQueue。放到Queue中的對象只是一些包含消息產生時的時間的字符串。
添加對象的實際工作是由如下語句實現的:
blockingQueue.put("Enqueued at: " + time)
put方法會拋出InterruptedException,因此,put操作需要被try...catch塊包圍,用來捕獲被拋出的異常 (見Listing 1 )。
從producer中提取消息的是Consumer對象,它也繼承自Thread對象并因此要實現run方法(見Listing 2 )。
Consumer 類在設計上是類似于Producer類的。Consumer類使用take方法去從Queue中取出(即dequeue)消息,而不是將消息放到 BlockingQueue中。如前所述,這需要等待到有什么內容確實存在于Queue中時才發生。如果producer線程停止放置(即 enqueue)對象到Queue中,那么consumer將等待到Queue的項目有效為止。下面所示的TestBlockingQueue類,產生四 個consumer線程,它們從BlockingQueue中嘗試提取對象。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestBlockingQueue
{
public static void main(String args[])
{
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
Producer producer = new Producer(blockingQueue, System.out);
Consumer consumerA = new Consumer("ConsumerA", blockingQueue, System.out);
Consumer consumerB = new Consumer("ConsumerB", blockingQueue, System.out);
Consumer consumerC = new Consumer("ConsumerC", blockingQueue, System.out);
Consumer consumerD = new Consumer("ConsumerD", blockingQueue, System.out);
producer.start();
consumerA.start();
consumerB.start();
consumerC.start();
consumerD.start();
}
}
Figure 1. Consumer Threads: These threads dequeue messages from the BlockingQueue in the order that you spawned them.
下面一行創建BlockingQueue:
BlockingQueue<String> blockingQueue
= new LinkedBlockingQueue<String>();
注意,它使用BlockingQueue的LinkedBlockingQueue實現。 這是因為BlockingQueue是一個抽象類,你不能直接實例化它。你也可以使用ArrayBlockingQueueQueue類型。 ArrayBlockingQueue使用一個數組作為它的存儲設備,而LinkedBlockingQueue使用一個LinkedList。 ArrayBlockingQueue的容量是固定的。對于LinkedBlockingQueue,最大值可以指定;默認是無邊界的。本示例代碼采用無 邊界方式。
在類的執行期間,從Queue中讀取對象以順序方式執行(見下面例子的執行)。實際上,一個consumer線程阻塞其他訪問BlockingQueue的線程直到它可以從Queue中取出一個對象。
DelayQueue-我是/不是不完整的
在某些情況下,存放在Queue中的對象,在它們準備被取出之前,會需要被放在另一Queue中一段時間。這時你可使用java.util.concurrent.DelayQueue類,他實現類BlockingQueue接口。DelayQueue需要Queue對象被駐留在Queue上一段指定時間。
我想用來證實它的現實例子(這可能是你非常渴望的)是關于松餅(muffins)。噢,Muffin對象(象我們正在談論的Java-沒有coffee雙關意圖)。假定你有一個DelayQueue并在其中放了一些Muffin對象。Muffin對象(如下所示)必須實現java.util.concurrent.Delayed 接口,以便可被放在DelayQueue中。這個接口需要Muffin對象實現getDelay方法(如下所示)。getDelay方法,實際上聲明給多 長時間讓對象保存在DelayQueue中。當該方法返回的值變為0或小于0時,對象就準備完畢(或在本例子中,是烤制完畢)并允許被取出(見 Listing 3 )。
Muffin類也實現compareTo(java.util.concurrent.Delayed)方法。由于Delayed接口繼承自 java.lang.Comparable 類,這通過約定限制你要實現Muffin對象的bakeCompletion時間。
由于你不是真想去吃沒有完全烤熟的Muffin,因此,需要將Muffin放在DelayQueue中存放推薦的烤制時間。Listing 4 ,取自DelayQueueUsageExample類,展示了從DelayQueue中enqueue和dequeue Muffin對象。
如你所見,對Muffin對象的烤制時間是使用它的構造器設置的(構造器期望烤制時間是以秒計)。
如 前所講,Muffin對象放到DelayQueue中是不允許被取出的,直到他的延時時間(又叫烤制時間)超期。元素被從Queue中取出基于最早的延時 時間。在本例中,如果你有一些已經烤過的Muffin對象,他們將按他們已經等待多久而被取出(換句話說,最早被烤制的Muffin會在新烤制的 Muffin之前被取出)。
SynchronousQueue
在Java 1.5中,另外一種阻塞Queue實現是SynchronousQueue。相當有趣的是,該Queue沒有內在容量。這是故意的,因為Queue意在用 于傳遞目的。這意味著,在一個同步Queue結構中,put請求必須等待來自另一線程的給SynchronousQueue的take請求。同時,一個 take請求必須等待一個來自另一線程的給SynchronousQueue的put請求。用程序來示例此概念,可參見示例代碼。類似于前邊的 LinkedBlockingQueue例子,它包含一個consumer(SynchConsumer),見Listing 5 。
Listing 5 中的代碼使用SynchronousQueue類的 poll(long timeout,TimeUnit unit)方法。此方法允許poll過程在厭倦等待另一消費線程寫SynchronousQueue之前等待一個指定時間(本例中是20秒)。
在Listing 6 中的producer(SynchProducer )使用相似的offer(E o,long timeout, TimeUnit unit)方法去放置對象到SynchronousQueue中。使用此方法允許在 厭倦等待另一線程去讀取SynchronousQueue之前等待一段時間(本例中為10秒) 。
TestSynchQueue 展示了producer和consumer的動作:
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class TestSynchQueue
{
public static void main(String args[])
{
SynchronousQueue<String> synchQueue = new SynchronousQueue<String>();
SynchProducer producer = new SynchProducer("ProducerA",synchQueue, System.out);
SynchConsumer consumerA = new SynchConsumer("ConsumerA", synchQueue, System.out);
consumerA.start();
producer.start();
}
}
當試圖明白隱藏在SynchronousQueue后面的概念時,要牢記這些Queue通常被使用在什么地方。JavaDoc中關于同步Queue指出:
"它們[同步Queue]是適合于傳遞設計,在那里運行在一個線程中的對象必須與運行在另外一個線程中的對象同步以便于交給它一些信息,時間或任務。"