<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    jinfeng_wang

    G-G-S,D-D-U!

    BlogJava 首頁 新隨筆 聯(lián)系 聚合 管理
      400 Posts :: 0 Stories :: 296 Comments :: 0 Trackbacks

    ZooKeeper.class

     1 public String create(final String path, byte data[], List<ACL> acl,
     2             CreateMode createMode)
     3         throws KeeperException, InterruptedException
     4     {
     5         final String clientPath = path;
     6         PathUtils.validatePath(clientPath, createMode.isSequential());
     7 
     8         final String serverPath = prependChroot(clientPath);
     9 
    10         RequestHeader h = new RequestHeader();
    11         h.setType(ZooDefs.OpCode.create);
    12         CreateRequest request = new CreateRequest();
    13         CreateResponse response = new CreateResponse();
    14         request.setData(data);
    15         request.setFlags(createMode.toFlag());
    16         request.setPath(serverPath);
    17         if (acl != null && acl.size() == 0) {
    18             throw new KeeperException.InvalidACLException();
    19         }
    20         request.setAcl(acl);
    21         ReplyHeader r = cnxn.submitRequest(h, request, response, null);
    22         if (r.getErr() != 0) {
    23             throw KeeperException.create(KeeperException.Code.get(r.getErr()),
    24                     clientPath);
    25         }
    26         if (cnxn.chrootPath == null) {
    27             return response.getPath();
    28         } else {
    29             return response.getPath().substring(cnxn.chrootPath.length());
    30         }
    31     }


    ClientCnxn.class, 放到隊列中,循環(huán)等到packet標識位finished。

     1 public ReplyHeader submitRequest(RequestHeader h, Record request,
     2             Record response, WatchRegistration watchRegistration)
     3             throws InterruptedException {
     4         ReplyHeader r = new ReplyHeader();
     5         Packet packet = queuePacket(h, r, request, response, nullnullnull,
     6                     null, watchRegistration);
     7         synchronized (packet) {
     8             while (!packet.finished) {
     9                 packet.wait();
    10             }
    11         }
    12         return r;
    13     }


     1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
     2             Record response, AsyncCallback cb, String clientPath,
     3             String serverPath, Object ctx, WatchRegistration watchRegistration)
     4     {
     5         Packet packet = null;
     6 
     7         // Note that we do not generate the Xid for the packet yet. It is
     8         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
     9         // where the packet is actually sent.
    10         synchronized (outgoingQueue) {
    11             packet = new Packet(h, r, request, response, watchRegistration);
    12             packet.cb = cb;
    13             packet.ctx = ctx;
    14             packet.clientPath = clientPath;
    15             packet.serverPath = serverPath;
    16             if (!state.isAlive() || closing) {
    17                 conLossPacket(packet);
    18             } else {
    19                 // If the client is asking to close the session then
    20                 // mark as closing
    21                 if (h.getType() == OpCode.closeSession) {
    22                     closing = true;
    23                 }
    24                 outgoingQueue.add(packet);
    25             }
    26         }
    27         sendThread.getClientCnxnSocket().wakeupCnxn();
    28         return packet;
    29     }


    ClientCnxnSocket.class

     1 @Override
     2     void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
     3                      ClientCnxn cnxn)
     4             throws IOException, InterruptedException {
     5         selector.select(waitTimeOut);
     6         Set<SelectionKey> selected;
     7         synchronized (this) {
     8             selected = selector.selectedKeys();
     9         }
    10         // Everything below and until we get back to the select is
    11         // non blocking, so time is effectively a constant. That is
    12         // Why we just have to do this once, here
    13         updateNow();
    14         for (SelectionKey k : selected) {
    15             SocketChannel sc = ((SocketChannel) k.channel());
    16             if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
    17                 if (sc.finishConnect()) {
    18                     updateLastSendAndHeard();
    19                     sendThread.primeConnection();
    20                 }
    21             } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
    22                 doIO(pendingQueue, outgoingQueue, cnxn);
    23             }
    24         }
    25         if (sendThread.getZkState().isConnected()) {
    26             synchronized(outgoingQueue) {
    27                 if (findSendablePacket(outgoingQueue,
    28                         cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
    29                     enableWrite();
    30                 }
    31             }
    32         }
    33         selected.clear();
    34     }



     1 void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
     2       throws InterruptedException, IOException {
     3         SocketChannel sock = (SocketChannel) sockKey.channel();
     4         if (sock == null) {
     5             throw new IOException("Socket is null!");
     6         }
     7         if (sockKey.isReadable()) {
     8             int rc = sock.read(incomingBuffer);
     9             if (rc < 0) {
    10                 throw new EndOfStreamException(
    11                         "Unable to read additional data from server sessionid 0x"
    12                                 + Long.toHexString(sessionId)
    13                                 + ", likely server has closed socket");
    14             }
    15             if (!incomingBuffer.hasRemaining()) {
    16                 incomingBuffer.flip();
    17                 if (incomingBuffer == lenBuffer) {
    18                     recvCount++;
    19                     readLength();
    20                 } else if (!initialized) {
    21                     readConnectResult();
    22                     enableRead();
    23                     if (findSendablePacket(outgoingQueue,
    24                             cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
    25                         // Since SASL authentication has completed (if client is configured to do so),
    26                         // outgoing packets waiting in the outgoingQueue can now be sent.
    27                         enableWrite();
    28                     }
    29                     lenBuffer.clear();
    30                     incomingBuffer = lenBuffer;
    31                     updateLastHeard();
    32                     initialized = true;
    33                 } else {
    34                     sendThread.readResponse(incomingBuffer);
    35                     lenBuffer.clear();
    36                     incomingBuffer = lenBuffer;
    37                     updateLastHeard();
    38                 }
    39             }
    40         }
    41         if (sockKey.isWritable()) {
    42             synchronized(outgoingQueue) {
    43                 Packet p = findSendablePacket(outgoingQueue,
    44                         cnxn.sendThread.clientTunneledAuthenticationInProgress());
    45 
    46                 if (p != null) {
    47                     updateLastSend();
    48                     // If we already started writing p, p.bb will already exist
    49                     if (p.bb == null) {
    50                         if ((p.requestHeader != null) &&
    51                                 (p.requestHeader.getType() != OpCode.ping) &&
    52                                 (p.requestHeader.getType() != OpCode.auth)) {
    53                             p.requestHeader.setXid(cnxn.getXid());
    54                         }
    55                         p.createBB();
    56                     }
    57                     sock.write(p.bb);
    58                     if (!p.bb.hasRemaining()) {
    59                         sentCount++;
    60                         outgoingQueue.removeFirstOccurrence(p);
    61                         if (p.requestHeader != null
    62                                 && p.requestHeader.getType() != OpCode.ping
    63                                 && p.requestHeader.getType() != OpCode.auth) {
    64                             synchronized (pendingQueue) {
    65                                 pendingQueue.add(p);
    66                             }
    67                         }
    68                     }
    69                 }
    70                 if (outgoingQueue.isEmpty()) {
    71                     // No more packets to send: turn off write interest flag.
    72                     // Will be turned on later by a later call to enableWrite(),
    73                     // from within ZooKeeperSaslClient (if client is configured
    74                     // to attempt SASL authentication), or in either doIO() or
    75                     // in doTransport() if not.
    76                     disableWrite();
    77                 } else if (!initialized && p != null && !p.bb.hasRemaining()) {
    78                     // On initial connection, write the complete connect request
    79                     // packet, but then disable further writes until after
    80                     // receiving a successful connection response.  If the
    81                     // session is expired, then the server sends the expiration
    82                     // response and immediately closes its end of the socket.  If
    83                     // the client is simultaneously writing on its end, then the
    84                     // TCP stack may choose to abort with RST, in which case the
    85                     // client would never receive the session expired event.  See
    86                     // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
    87                     disableWrite();
    88                 } else {
    89                     // Just in case
    90                     enableWrite();
    91                 }
    92             }
    93         }
    94     }



    SendThread.class

      1 void readResponse(ByteBuffer incomingBuffer) throws IOException {
      2             ByteBufferInputStream bbis = new ByteBufferInputStream(
      3                     incomingBuffer);
      4             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
      5             ReplyHeader replyHdr = new ReplyHeader();
      6 
      7             replyHdr.deserialize(bbia, "header");
      8             if (replyHdr.getXid() == -2) {
      9                 // -2 is the xid for pings
     10                 if (LOG.isDebugEnabled()) {
     11                     LOG.debug("Got ping response for sessionid: 0x"
     12                             + Long.toHexString(sessionId)
     13                             + " after "
     14                             + ((System.nanoTime() - lastPingSentNs) / 1000000)
     15                             + "ms");
     16                 }
     17                 return;
     18             }
     19             if (replyHdr.getXid() == -4) {
     20                 // -4 is the xid for AuthPacket               
     21                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
     22                     state = States.AUTH_FAILED;                    
     23                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
     24                             Watcher.Event.KeeperState.AuthFailed, null) );                                        
     25                 }
     26                 if (LOG.isDebugEnabled()) {
     27                     LOG.debug("Got auth sessionid:0x"
     28                             + Long.toHexString(sessionId));
     29                 }
     30                 return;
     31             }
     32             if (replyHdr.getXid() == -1) {
     33                 // -1 means notification
     34                 if (LOG.isDebugEnabled()) {
     35                     LOG.debug("Got notification sessionid:0x"
     36                         + Long.toHexString(sessionId));
     37                 }
     38                 WatcherEvent event = new WatcherEvent();
     39                 event.deserialize(bbia, "response");
     40 
     41                 // convert from a server path to a client path
     42                 if (chrootPath != null) {
     43                     String serverPath = event.getPath();
     44                     if(serverPath.compareTo(chrootPath)==0)
     45                         event.setPath("/");
     46                     else if (serverPath.length() > chrootPath.length())
     47                         event.setPath(serverPath.substring(chrootPath.length()));
     48                     else {
     49                         LOG.warn("Got server path " + event.getPath()
     50                                 + " which is too short for chroot path "
     51                                 + chrootPath);
     52                     }
     53                 }
     54 
     55                 WatchedEvent we = new WatchedEvent(event);
     56                 if (LOG.isDebugEnabled()) {
     57                     LOG.debug("Got " + we + " for sessionid 0x"
     58                             + Long.toHexString(sessionId));
     59                 }
     60 
     61                 eventThread.queueEvent( we );
     62                 return;
     63             }
     64 
     65             // If SASL authentication is currently in progress, construct and
     66             // send a response packet immediately, rather than queuing a
     67             // response as with other packets.
     68             if (clientTunneledAuthenticationInProgress()) {
     69                 GetSASLRequest request = new GetSASLRequest();
     70                 request.deserialize(bbia,"token");
     71                 zooKeeperSaslClient.respondToServer(request.getToken(),
     72                   ClientCnxn.this);
     73                 return;
     74             }
     75 
     76             Packet packet;
     77             synchronized (pendingQueue) {
     78                 if (pendingQueue.size() == 0) {
     79                     throw new IOException("Nothing in the queue, but got "
     80                             + replyHdr.getXid());
     81                 }
     82                 packet = pendingQueue.remove();
     83             }
     84             /*
     85              * Since requests are processed in order, we better get a response
     86              * to the first request!
     87              */
     88             try {
     89                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
     90                     packet.replyHeader.setErr(
     91                             KeeperException.Code.CONNECTIONLOSS.intValue());
     92                     throw new IOException("Xid out of order. Got Xid "
     93                             + replyHdr.getXid() + " with err " +
     94                             + replyHdr.getErr() +
     95                             " expected Xid "
     96                             + packet.requestHeader.getXid()
     97                             + " for a packet with details: "
     98                             + packet );
     99                 }
    100 
    101                 packet.replyHeader.setXid(replyHdr.getXid());
    102                 packet.replyHeader.setErr(replyHdr.getErr());
    103                 packet.replyHeader.setZxid(replyHdr.getZxid());
    104                 if (replyHdr.getZxid() > 0) {
    105                     lastZxid = replyHdr.getZxid();
    106                 }
    107                 if (packet.response != null && replyHdr.getErr() == 0) {
    108                     packet.response.deserialize(bbia, "response");
    109                 }
    110 
    111                 if (LOG.isDebugEnabled()) {
    112                     LOG.debug("Reading reply sessionid:0x"
    113                             + Long.toHexString(sessionId) + ", packet:: " + packet);
    114                 }
    115             } finally {
    116                 finishPacket(packet);
    117             }
    118         }


    posted on 2016-12-27 13:51 jinfeng_wang 閱讀(504) 評論(0)  編輯  收藏 所屬分類: 2016-zookeeper
    主站蜘蛛池模板: 免费网站观看WWW在线观看| 日本高清免费不卡在线| mm1313亚洲国产精品无码试看| 国产亚洲成归v人片在线观看 | 久久国产色AV免费观看| 全部一级一级毛片免费看| 亚洲综合色丁香婷婷六月图片| 亚洲av无码一区二区三区乱子伦| 亚洲AV无码一区二三区| 在线成人a毛片免费播放| 91九色老熟女免费资源站| 中文无码成人免费视频在线观看| 无套内射无矿码免费看黄| 亚洲乱色伦图片区小说| 97se亚洲国产综合自在线| 亚洲美女激情视频| 亚洲精品人成在线观看| 国产成人无码综合亚洲日韩| 亚洲中文字幕视频国产| 亚洲AV伊人久久青青草原| 免费中文字幕在线| 免费真实播放国产乱子伦| 韩国免费三片在线视频| 夫妻免费无码V看片| 成人免费无码大片A毛片抽搐色欲| 亚洲精品视频在线观看免费| 日韩精品极品视频在线观看免费| 精品四虎免费观看国产高清午夜| 在线免费观看h片| 精品国产一区二区三区免费| 光棍天堂免费手机观看在线观看| 最新亚洲成av人免费看| 免费无码一区二区三区蜜桃| 99久久免费国产精品热| 国产永久免费高清在线| 国产精成人品日日拍夜夜免费 | 亚洲日韩中文无码久久| 国产亚洲成AV人片在线观黄桃 | 久久国产精品一区免费下载| 先锋影音资源片午夜在线观看视频免费播放| 中文在线免费不卡视频|