com.taobao.remoting.impl.DefaultConnection.java // 用來存放請求和回調(diào)的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents; //發(fā)送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于記錄日志 queue[idx++] = connRequest; //用于記錄日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 記錄響應(yīng)隊列 write(connRequest); // 埋點記錄等待響應(yīng)的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()發(fā)送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注冊FutureListener,當(dāng)請求發(fā)送失敗后,能夠立即做出響應(yīng) writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); } /** * 在得到響應(yīng)后,刪除對應(yīng)的請求隊列,并執(zhí)行回調(diào) * 調(diào)用者:MINA線程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回調(diào)任務(wù)交給業(yè)務(wù)提供的線程池執(zhí)行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(new CallbackExecutorTask(connResp, callback)); long duration = System.currentTimeMillis() - waitBegin; // 實際讀響應(yīng)時間 logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |