在一些特殊的場合,我們可能需要使用異步的IO來大幅提高性能.
如日志信息收集.
而rapid-framework提供的異步IO類,使用生產者/消費者的多線程同步模式及Decorator模式,如同使用正常的IO一樣,只需多套一層AsyncWriter/AsyncOutputStream,即可將普通IO轉換為異步IO來使用.打開一個異步IO後,將會在後臺開啟一個異步的線程來寫數據.
異步的Writer使用:
1  BufferedWriter writer = new BufferedWriter(new AsyncWriter(new FileWriter("c:/debug.log")));
2  writer.write("xxxxx");
異步的OutputStream使用:
1  BufferedOutputStream output = new BufferedOutputStream(new AsyncOutputStream(new FileOutputStream("c:/debug.log")));
2  output.write("foo".getBytes());
?
在output使用完後,務必確保output被close,因為在close時,會強制異步線程將數據全部寫入最終的targetOutput. 而調用flush()方法則是空操作,不會寫數據.
異步IO使用tip(1):
可以將BufferedWriter/BufferedOutputStream的緩衝區加大,以減少寫入次數.
異步IO使用tip(2):
close異步IO的操作也建議放在一個單獨的線程中執行,因為在實際應用場景中,close異步IO可能是十分耗時的操作.
AsyncWriter源碼:
??1?public?class?AsyncWriter?extends?Writer?{
??2?
??3?????private?static?Log?log?=?LogFactory.getLog(AsyncWriter.class);
??4?????
??5?????private?static?final?int?DEFAULT_QUEUE_CAPACITY?=?50000;
??6?????private?final?static?char[]?CLOSED_SIGNEL?=?new?char[0];
??7?????
??8?????private?Writer?out;
??9?????private?DataProcessorThread?dataProcessor;
?10?????private?boolean?isClosed?=?false;
?11?????private?BlockingQueue<char[]>?queue?;
?12?????
?13?????private?AsyncExceptinHandler?asyncExceptinHandler?=?new?DefaultAsyncExceptinHandler();
?14?????private?static?long?threadSeqNumber;
?15?????private?static?synchronized?long?nextThreadID()?{
?16?????????return?++threadSeqNumber;
?17?????}
?18?????
?19?????private?class?DataProcessorThread?extends?Thread?{
?20?????????
?21?????????private?boolean?enabled?=?true;
?22?????????private?boolean?hasRuned?=?false;
?23?????????DataProcessorThread()?{
?24?????????????super("AsyncWriter.DataProcessorThread-"+nextThreadID());
?25?????????????setDaemon(true);
?26?????????}
?27?
?28?????????public?void?run()?{
?29?????????????hasRuned?=?true;
?30?????????????while?(this.enabled?||?!queue.isEmpty())?{
?31?????????????????
?32?????????????????char[]?buf;
?33?????????????????try?{
?34?????????????????????buf?=?queue.take();
?35?????????????????}?catch?(InterruptedException?e)?{
?36?//????????????????????e.printStackTrace();
?37?????????????????????continue;
?38?????????????????}
?39?????????????????
?40?????????????????if(buf?==?CLOSED_SIGNEL)?{
?41?????????????????????return;
?42?????????????????}
?43?????????????????
?44?????????????????try?{
?45?????????????????????out.write(buf);
?46?????????????????}?catch?(IOException?e)?{
?47??????????????????????asyncExceptinHandler.handle(e);
?48?????????????????}
?49?????????????}
?50?????????}
?51?????}
?52?
?53?????public?AsyncWriter(Writer?out)?{
?54?????????this(out,DEFAULT_QUEUE_CAPACITY,Thread.NORM_PRIORITY?+?1);
?55?????}
?56?????
?57?????public?AsyncWriter(Writer?out,int?queueCapacity)?{
?58?????????this(out,queueCapacity,Thread.NORM_PRIORITY?+?1);
?59?????}
?60?????
?61?????public?AsyncWriter(Writer?out,int?queueCapacity,int?dataProcesserThreadPriority)?{
?62?????????this(out,new?ArrayBlockingQueue(queueCapacity),dataProcesserThreadPriority);
?63?????}
?64?????
?65?????public?AsyncWriter(Writer?out,BlockingQueue?queue,int?dataProcesserThreadPriority)?{
?66?????????if(out?==?null)?throw?new?NullPointerException();
?67?????????if(queue?==?null)?throw?new?NullPointerException();
?68?????????
?69?????????this.queue?=?queue;
?70?????????this.dataProcessor?=?new?DataProcessorThread();
?71?????????if(dataProcesserThreadPriority?!=?Thread.NORM_PRIORITY)?{
?72?????????????this.dataProcessor.setPriority(dataProcesserThreadPriority);
?73?????????}
?74?????????this.dataProcessor.start();
?75?????????this.out?=?out;
?76?????}
?77?????
?78?????public?AsyncWriter(Writer?out,AsyncExceptinHandler?handler)?{
?79?????????this(out);
?80?????????setAsyncExceptinHandler(handler);
?81?????}
?82?
?83?????public?void?write(char[]?buf,?int?offset,?int?length)?throws?IOException?{
?84?????????synchronized?(lock)?{
?85?????????????if(isClosed)?throw?new?IOException("already?closed");
?86?????????????try?{
?87?????????????????queue.put(BufferCopyUtils.copyBuffer(buf,?offset,?length));
?88?????????????}?catch?(InterruptedException?e)?{
?89?????????????????throw?new?IOException("AsyncWriter?occer?error",e);
?90?????????????}
?91?????????}
?92?????}
?93?
?94?????public?void?close()?throws?IOException?{
?95?????????synchronized?(lock)?{
?96?????????????try?{
?97?????????????????isClosed?=?true;
?98?????????????????dataProcessor.enabled?=?false;
?99?????????????????if(queue.isEmpty())?{
100?????????????????????queue.offer(CLOSED_SIGNEL);
101?????????????????}
102?????????????????
103?????????????????try?{
104?????????????????????dataProcessor.join();
105?????????????????}?catch?(InterruptedException?e)?{
106?????????????????????//ignore
107?????????????????}
108?????????????????
109?????????????????if(!dataProcessor.hasRuned)?{
110?????????????????????dataProcessor.run();
111?????????????????}
112?????????????}finally?{
113?????????????????out.close();
114?????????????}
115?????????}
116?????}
117?????
118?????public?void?flush()?throws?IOException?{
119?????}
120?
121?????protected?void?finalize()?throws?Throwable?{
122?????????super.finalize();
123?????????if(!isClosed)?{
124?????????????log.warn("AsyncWriter?not?close:"+this);
125?????????????close();
126?????????}
127?????}
128?
129?????public?void?setAsyncExceptinHandler(AsyncExceptinHandler?asyncExceptinHandler)?{
130?????????if(asyncExceptinHandler?==?null)?throw?new?NullPointerException();
131?????????this.asyncExceptinHandler?=?asyncExceptinHandler;
132?????}
133?
134?}
?
?
rapid-framework網站:
http://code.google.com/p/rapid-framework
?
在線javadoc:
http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/
?