解決并發(fā)大數(shù)據(jù)量阻塞之道的重要思想是:
利用多線程將數(shù)據(jù)接收和數(shù)據(jù)處理兩塊業(yè)務(wù)分開。1、數(shù)據(jù)接收
public class MessageReceiver{
//建立數(shù)據(jù)緩存區(qū)
private List messageList = new ArrayList();
//添加消息至數(shù)據(jù)緩沖區(qū)
public? void addMessage(ESBInput inputMessage) {
??synchronized(messageList) {
??? messageList.add(inputMessage);
??? messageList.notifyAll();
??}
?}
//并發(fā)數(shù)據(jù)接收入口,調(diào)用添加數(shù)據(jù)方法,而不是直接操作inputMessage,對(duì)其解析或者別的操作,不然極可能造成阻塞
public void handleMessage(Input inputMessage) {
?????addMessage(inputMessage);
?}
//從數(shù)據(jù)緩沖區(qū)取數(shù)據(jù),它應(yīng)該被另外一個(gè)線程調(diào)用
public List getMessageList() {
??List _processList = new ArrayList();
??try {
??? synchronized (messageList) {
????? while (messageList.size() == 0) {
???? messageList.wait();
????? }
????? _processList.addAll(messageList);
//清空緩存區(qū)
????? messageList.clear();
??? }
??}
??catch (Exception ex) {
???ex.printStackTrace();
??}
??return _processList;
?}
}
2、數(shù)據(jù)處理
//線程
public class MessageProcessThread extends Thread {
MessageReceiver receiver;
public MessageProcessThread (MessageReceiver receiver){
this.receiver=receiver;
}
public void run() {
?? while (true) {
?? List list = receiver.getMessageList();
?? while (iter.hasNext()) {
??? //do what you want....
? handleMessage((Input)list .next());
????? try {
???sleep (100);
?? }
?? catch (Exception ex) {
??? ?}
?? }
?}
}
public handleMessage(Input messsage){
?? //處理消息
}
posted on 2009-01-06 10:28
蔣家狂潮 閱讀(963)
評(píng)論(0) 編輯 收藏 所屬分類:
Basic