由于Dubbo底層采用Socket進(jìn)行通信,自己對通信理理論也不是很清楚,所以順便把通信的知識也學(xué)習(xí)一下。

n  通信理論

計算機與外界的信息交換稱為通信。基本的通信方法有并行通信和串行通信兩種。

1.一組信息(通常是字節(jié))的各位數(shù)據(jù)被同時傳送的通信方法稱為并行通信。并行通信依靠并行I/O接口實現(xiàn)。并行通信速度快,但傳輸線根數(shù)多,只適用于近距離(相距數(shù)公尺)的通信。

2.一組信息的各位數(shù)據(jù)被逐位順序傳送的通信方式稱為串行通信。串行通信可通過串行接口來實現(xiàn)。串行通信速度慢,但傳輸線少,適宜長距離通信。

串行通信按信息傳送方向分為以下3種:

1)   單工

只能一個方向傳輸數(shù)據(jù)

【原創(chuàng)】Alibaba Dubbo框架同步調(diào)用原理分析-1 - sun - 學(xué)無止境

2)   半雙工

信息能雙向傳輸,但不能同時雙向傳輸

【原創(chuàng)】Alibaba Dubbo框架同步調(diào)用原理分析-1 - sun - 學(xué)無止境

3)   全雙工

能雙向傳輸并且可以同時雙向傳輸

【原創(chuàng)】Alibaba Dubbo框架同步調(diào)用原理分析-1 - sun - 學(xué)無止境 

n  Socket

Socket 是一種應(yīng)用接口, TCP/IP 是網(wǎng)絡(luò)傳輸協(xié)議,雖然接口相同, 但是不同的協(xié)議會有不同的服務(wù)性質(zhì)。創(chuàng)建Socket 連接時,可以指定使用的傳輸層協(xié)議,Socket 可以支持不同的傳輸層協(xié)議(TCP 或UDP ),當(dāng)使用TCP 協(xié)議進(jìn)行連接時,該Socket 連接就是一個TCP 連接。Soket 跟TCP/IP 并沒有必然的聯(lián)系。Socket 編程接口在設(shè)計的時候,就希望也能適應(yīng)其他的網(wǎng)絡(luò)協(xié)議。所以,socket 的出現(xiàn)只是可以更方便的使用TCP/IP 協(xié)議棧而已。

引自:http://hi.baidu.com/lewutian/blog/item/b28e27fd446d641d09244d08.html

上一個通信理論其實是想說Socket(TCP)通信是全雙工的方式

n  Dubbo遠(yuǎn)程同步調(diào)用原理分析

從Dubbo開源文檔上了解到一個調(diào)用過程如下圖

http://code.alibabatech.com/wiki/display/dubbo/User+Guide#UserGuide-APIReference

另外文檔里有說明:Dubbo缺省協(xié)議采用單一長連接和NIO異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費者機器數(shù)遠(yuǎn)大于服務(wù)提供者機器數(shù)的情況。

【原創(chuàng)】Alibaba Dubbo框架同步調(diào)用原理分析-1 - sun - 學(xué)無止境

Dubbo缺省協(xié)議,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。

  • 連接個數(shù):單連接
  • 連接方式:長連接
  • 傳輸協(xié)議:TCP
  • 傳輸方式:NIO異步傳輸
  • 序列化:Hessian二進(jìn)制序列化
  • 適用范圍:傳入傳出參數(shù)數(shù)據(jù)包較小(建議小于100K),消費者比提供者個數(shù)多,單一消費者無法壓滿提供者,盡量不要用dubbo協(xié)議傳輸大文件或超大字符串
  • 適用場景:常規(guī)遠(yuǎn)程服務(wù)方法調(diào)用

 通常,一個典型的同步遠(yuǎn)程調(diào)用應(yīng)該是這樣的:

【原創(chuàng)】Alibaba Dubbo框架同步調(diào)用原理分析-1 - sun - 學(xué)無止境

1, 客戶端線程調(diào)用遠(yuǎn)程接口,向服務(wù)端發(fā)送請求,同時當(dāng)前線程應(yīng)該處于“暫停“狀態(tài),即線程不能向后執(zhí)行了,必需要拿到服務(wù)端給自己的結(jié)果后才能向后執(zhí)行

2, 服務(wù)端接到客戶端請求后,處理請求,將結(jié)果給客戶端
3, 客戶端收到結(jié)果,然后當(dāng)前線程繼續(xù)往后執(zhí)行

Dubbo里使用到了Socket(采用apache mina框架做底層調(diào)用)來建立長連接,發(fā)送、接收數(shù)據(jù),底層使用apache mina框架的IoSession進(jìn)行發(fā)送消息。

查看Dubbo文檔及源代碼可知,Dubbo底層使用Socket發(fā)送消息的形式進(jìn)行數(shù)據(jù)傳遞,結(jié)合了mina框架,使用IoSession.write()方法,這個方法調(diào)用后對于整個遠(yuǎn)程調(diào)用(從發(fā)出請求到接收到結(jié)果)來說是一個異步的,即對于當(dāng)前線程來說,將請求發(fā)送出來,線程就可以往后執(zhí)行了,至于服務(wù)端的結(jié)果,是服務(wù)端處理完成后,再以消息的形式發(fā)送給客戶端的。于是這里出現(xiàn)了2個問題:
  • 當(dāng)前線程怎么讓它“暫停”,等結(jié)果回來后,再向后執(zhí)行?
  • 正如前面所說,Socket通信是一個全雙工的方式,如果有多個線程同時進(jìn)行遠(yuǎn)程方法調(diào)用,這時建立在client server之間的socket連接上會有很多雙方發(fā)送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結(jié)果后,將結(jié)果消息發(fā)送給client,client收到很多消息,怎么知道哪個消息結(jié)果是原先哪個線程調(diào)用的?

分析源代碼,基本原理如下:
  1. client一個線程調(diào)用遠(yuǎn)程接口,生成一個唯一的ID(比如一段隨機字符串,UUID等),Dubbo是使用AtomicLong從0開始累計數(shù)字的
  2. 將打包的方法調(diào)用信息(如調(diào)用的接口名稱,方法名稱,參數(shù)值列表等),和處理結(jié)果的回調(diào)對象callback,全部封裝在一起,組成一個對象object
  3. 向?qū)iT存放調(diào)用信息的全局ConcurrentHashMap里面put(ID, object)
  4. 將ID和打包的方法調(diào)用信息封裝成一對象connRequest,使用IoSession.write(connRequest)異步發(fā)送出去
  5. 當(dāng)前線程再使用callback的get()方法試圖獲取遠(yuǎn)程返回的結(jié)果,在get()內(nèi)部,則使用synchronized獲取回調(diào)對象callback的鎖, 再先檢測是否已經(jīng)獲取到結(jié)果,如果沒有,然后調(diào)用callback的wait()方法,釋放callback上的鎖,讓當(dāng)前線程處于等待狀態(tài)。
  6. 服務(wù)端接收到請求并處理后,將結(jié)果(此結(jié)果中包含了前面的ID,即回傳)發(fā)送給客戶端,客戶端socket連接上專門監(jiān)聽消息的線程收到消息,分析結(jié)果,取到ID,再從前面的ConcurrentHashMap里面get(ID),從而找到callback,將方法調(diào)用結(jié)果設(shè)置到callback對象里。
  7. 監(jiān)聽線程接著使用synchronized獲取回調(diào)對象callback的鎖(因為前面調(diào)用過wait(),那個線程已釋放callback的鎖了),再notifyAll(),喚醒前面處于等待狀態(tài)的線程繼續(xù)執(zhí)行(callback的get()方法繼續(xù)執(zhí)行就能拿到調(diào)用結(jié)果了),至此,整個過程結(jié)束。
這里還需要畫一個大圖來描述,后面再補了
需要注意的是,這里的callback對象是每次調(diào)用產(chǎn)生一個新的,不能共享,否則會有問題;另外ID必需至少保證在一個Socket連接里面是唯一的。

現(xiàn)在,前面兩個問題已經(jīng)有答案了,
  • 當(dāng)前線程怎么讓它“暫停”,等結(jié)果回來后,再向后執(zhí)行?
     答:先生成一個對象obj,在一個全局map里put(ID,obj)存放起來,再用synchronized獲取obj鎖,再調(diào)用obj.wait()讓當(dāng)前線程處于等待狀態(tài),然后另一消息監(jiān)聽線程等到服務(wù)端結(jié)果來了后,再map.get(ID)找到obj,再用synchronized獲取obj鎖,再調(diào)用obj.notifyAll()喚醒前面處于等待狀態(tài)的線程。
  • 正如前面所說,Socket通信是一個全雙工的方式,如果有多個線程同時進(jìn)行遠(yuǎn)程方法調(diào)用,這時建立在client server之間的socket連接上會有很多雙方發(fā)送的消息傳遞,前后順序也可能是亂七八糟的,server處理完結(jié)果后,將結(jié)果消息發(fā)送給client,client收到很多消息,怎么知道哪個消息結(jié)果是原先哪個線程調(diào)用的?
     答:使用一個ID,讓其唯一,然后傳遞給服務(wù)端,再服務(wù)端又回傳回來,這樣就知道結(jié)果是原先哪個線程的了。

這種做法不是第一次見了,10年在上一公司里,也是遠(yuǎn)程接口調(diào)用,不過走的消息中間件rabbitmq,同步調(diào)用的原理跟這類似,詳見:rabbitmq 學(xué)習(xí)-9- RpcClient發(fā)送消息和同步接收消息原理

關(guān)鍵代碼:

com.taobao.remoting.impl.DefaultClient.java

//同步調(diào)用遠(yuǎn)程接口

public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {

        byte protocol = getProtocol(control);

        if (!TRConstants.isValidProtocol(protocol)) {

            throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");

        }

        ResponseFuture future = invokeWithFuture(appRequest, control);

        return future.get();  //獲取結(jié)果時讓當(dāng)前線程等待,ResponseFuture其實就是前面說的callback

}

public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {

         byte protocol = getProtocol(control);

         long timeout = getTimeout(control);

         ConnectionRequest request = new ConnectionRequest(appRequest);

         request.setSerializeProtocol(protocol);

         Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);

         connection.sendRequestWithCallback(request, adapter, timeout);

         return adapter;

}

 

Callback2FutureAdapter implements ResponseFuture

public Object get() throws RemotingException, InterruptedException {

synchronized (this) {  // 旋鎖

   while (!isDone) {  // 是否有結(jié)果了

wait(); //沒結(jié)果是釋放鎖,讓當(dāng)前線程處于等待狀態(tài)

   }

}

if (errorCode == TRConstants.RESULT_TIMEOUT) {

   throw new TimeoutException("Wait response timeout, request["

   + connectionRequest.getAppRequest() + "].");

}

else if (errorCode > 0) {

   throw new RemotingException(errorMsg);

}

else {

   return appResp;

}

}

客戶端收到服務(wù)端結(jié)果后,回調(diào)時相關(guān)方法,即設(shè)置isDone = true并notifyAll()

public void handleResponse(Object _appResponse) {

         appResp = _appResponse; //將遠(yuǎn)程調(diào)用結(jié)果設(shè)置到callback中來

         setDone();

}

public void onRemotingException(int _errorType, String _errorMsg) {

         errorCode = _errorType;

         errorMsg = _errorMsg;

         setDone();

}

private void setDone() {

         isDone = true;

         synchronized (this) { //獲取鎖,因為前面wait()已經(jīng)釋放了callback的鎖了

             notifyAll(); // 喚醒處于等待的線程

         }

}

 

com.taobao.remoting.impl.DefaultConnection.java

 

// 用來存放請求和回調(diào)的MAP

private final ConcurrentHashMap<Long, Object[]> requestResidents;

 

//發(fā)送消息出去

void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {

         long requestId = connRequest.getId();

         long waitBegin = System.currentTimeMillis();

         long waitEnd = waitBegin + timeoutMs;

         Object[] queue = new Object[4];

         int idx = 0;

         queue[idx++] = waitEnd;

         queue[idx++] = waitBegin;   //用于記錄日志

         queue[idx++] = connRequest; //用于記錄日志

         queue[idx++] = callback;

         requestResidents.put(requestId, queue); // 記錄響應(yīng)隊列

         write(connRequest);

 

         // 埋點記錄等待響應(yīng)的Map的大小

         StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),

                   1L);

}

public void write(final Object connectionMsg) {

//mina里的IoSession.write()發(fā)送消息

         WriteFuture writeFuture = ioSession.write(connectionMsg);

         // 注冊FutureListener,當(dāng)請求發(fā)送失敗后,能夠立即做出響應(yīng)

         writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));

}

 

/**

* 在得到響應(yīng)后,刪除對應(yīng)的請求隊列,并執(zhí)行回調(diào)

* 調(diào)用者:MINA線程

*/

public void putResponse(final ConnectionResponse connResp) {

         final long requestId = connResp.getRequestId();

         Object[] queue = requestResidents.remove(requestId);

         if (null == queue) {

             Object appResp = connResp.getAppResponse();

             String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();

             StringBuilder sb = new StringBuilder();

             sb.append("Not found response receiver for requestId=[").append(requestId).append("],");

             sb.append("from [").append(connResp.getHost()).append("],");

             sb.append("response type [").append(appRespClazz).append("].");

             LOGGER.warn(sb.toString());

             return;

         }

         int idx = 0;

         idx++;

         long waitBegin = (Long) queue[idx++];

         ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];

         ResponseCallback callback = (ResponseCallback) queue[idx++];

         // ** 把回調(diào)任務(wù)交給業(yè)務(wù)提供的線程池執(zhí)行 **

         Executor callbackExecutor = callback.getExecutor();

         callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));

 

         long duration = System.currentTimeMillis() - waitBegin; // 實際讀響應(yīng)時間

         logIfResponseError(connResp, duration, connRequest.getAppRequest());

}

 

CallbackExecutorTask

static private class CallbackExecutorTask implements Runnable {

         final ConnectionResponse resp;

         final ResponseCallback callback;

         final Thread createThread;

 

         CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) {

             resp = _resp;

             callback = _cb;

             createThread = Thread.currentThread();

         }

 

         public void run() {

             // 預(yù)防這種情況:業(yè)務(wù)提供的Executor,讓調(diào)用者線程來執(zhí)行任務(wù)

             if (createThread == Thread.currentThread()

                       && callback.getExecutor() != DIYExecutor.getInstance()) {

                   StringBuilder sb = new StringBuilder();

                   sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:");

                   sb.append("Can not callback task on the network io thhread.");

                   LOGGER.warn(sb.toString());

                   return;

             }

 

             if (TRConstants.RESULT_SUCCESS == resp.getResult()) {

                   callback.handleResponse(resp.getAppResponse()); //設(shè)置調(diào)用結(jié)果

             }

             else {

                   callback.onRemotingException(resp.getResult(), resp

                            .getErrorMsg());  //處理調(diào)用異常

             }

         }

}

 

另外:

1, 服務(wù)端在處理客戶端的消息,然后再處理時,使用了線程池來并行處理,不用一個一個消息的處理

同樣,客戶端接收到服務(wù)端的消息,也是使用線程池來處理消息,再回調(diào)

 

轉(zhuǎn)載自:http://sunjun041640.blog.163.com/blog/static/256268322011111882453405/