<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    E81086713E446D36F62B2AA2A3502B5EB155

    Java雜家

    雜七雜八。。。一家之言

    BlogJava 首頁 新隨筆 聯系 聚合 管理
      40 Posts :: 1 Stories :: 174 Comments :: 0 Trackbacks
    前一篇博客,我簡單提了下怎么為NIO2增加TransmitFile支持,文件傳送吞吐量是一個性能關注點,此外,并發連接數也是重要的關注點。

    不過JDK7中又一次做了簡單的實現,不支持同時投遞多個AcceptEx請求,只支持一次一個,返回后再投遞。這樣,客戶端連接的接受速度必然大打折扣。不知道為什么sun會做這樣的實現,WSASend()/WSAReceive()一次只允許一個還是可以理解,畢竟簡化了編程,不用考慮封包亂序問題。
    也降低了內存耗盡的風險。AcceptEx卻沒有這樣的理由了。

    于是再一次為了性能,我增加了同時投遞多個的支持。

    另外,在JDK7的默認實現中,AcceptEx返回后,為了設置遠程和本地InetSocketAddress也采用了效率很低的方法。4次通過JNI調用getsockname,2次為了取sockaddr,2次為了取port. 這些操作本人采用GetAcceptExSockaddrs一次完成,進一步提高效率。


    先看Java部分的代碼,框架跟JDK7的一樣,細節處理不一樣:

    /**
     * 
     
    */
    package sun.nio.ch;

    import java.io.IOException;
    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.channels.AcceptPendingException;
    import java.nio.channels.AsynchronousCloseException;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.CompletionHandler;
    import java.nio.channels.NotYetBoundException;
    import java.nio.channels.ShutdownChannelGroupException;
    import java.security.AccessControlContext;
    import java.security.AccessController;
    import java.security.PrivilegedAction;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    import sun.misc.Unsafe;

    /**
     * This class enable multiple 'AcceptEx' post on the completion port, hence improve the concurrent connection number.
     * 
    @author Yvon
     *
     
    */
    public class WindowsMultiAcceptSupport {

        WindowsAsynchronousServerSocketChannelImpl schannel;

        
    private static final Unsafe unsafe = Unsafe.getUnsafe();

        
    // 2 * (sizeof(SOCKET_ADDRESS) + 16)
        private static final int ONE_DATA_BUFFER_SIZE = 88;

        
    private long handle;
        
    private Iocp iocp;

        
    // typically there will be zero, or one I/O operations pending. In rare
        
    // cases there may be more. These rare cases arise when a sequence of accept
        
    // operations complete immediately and handled by the initiating thread.
        
    // The corresponding OVERLAPPED cannot be reused/released until the completion
        
    // event has been posted.
        private PendingIoCache ioCache;

        
    private Queue<Long> dataBuffers;
        
    // the data buffer to receive the local/remote socket address
        
    //        private final long dataBuffer;

        
    private AtomicInteger pendingAccept;
        
    private int maxPending;

        Method updateAcceptContextM;
        Method acceptM;

        WindowsMultiAcceptSupport() {
            
    //dummy for JNI code.
        }

        
    public void close() throws IOException {

            schannel.close();

            
    for (int i = 0; i < maxPending + 1; i++)//assert there is maxPending+1 buffer in the queue
            {
                
    long addr = dataBuffers.poll();
                
    // release  resources
                unsafe.freeMemory(addr);
            }

        }

        
    /**
         * 
         
    */
        
    public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
            
    if (maxPost <= 0 || maxPost > 1024)
                
    throw new IllegalStateException("maxPost can't less than 1 and greater than 1024");
            
    this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
            maxPending 
    = maxPost;
            dataBuffers 
    = new ConcurrentLinkedQueue<Long>();
            
    for (int i = 0; i < maxPending + 1; i++) {
                dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
            }

            pendingAccept 
    = new AtomicInteger(0);
            
    try {
                Field f 
    = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
                f.setAccessible(
    true);
                handle 
    = f.getLong(schannel);


                f 
    = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
                f.setAccessible(
    true);
                iocp 
    = (Iocp) f.get(schannel);

                f 
    = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
                f.setAccessible(
    true);
                ioCache 
    = (PendingIoCache) f.get(schannel);

                f 
    = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
                f.setAccessible(
    true);
                AtomicBoolean accepting 
    = (AtomicBoolean) f.get(schannel);

                accepting.set(
    true);//disable accepting by origin channel.

            } 
    catch (Exception e) {
                e.printStackTrace();
            }

        }

        @SuppressWarnings(
    "unchecked")
        
    public final <A> void accept(A attachment,
            CompletionHandler
    <AsynchronousSocketChannel, ? super A> handler) {
            
    if (handler == null)
                
    throw new NullPointerException("'handler' is null");
            implAccept(attachment, (CompletionHandler
    <AsynchronousSocketChannel, Object>) handler);
        }

        
    /**
         * Task to initiate accept operation and to handle result.
         
    */
        
    private class AcceptTask implements Runnable, Iocp.ResultHandler {

            
    private final WindowsAsynchronousSocketChannelImpl channel;
            
    private final AccessControlContext acc;
            
    private final PendingFuture<AsynchronousSocketChannel, Object> result;
            
    private final long dataBuffer;

            AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
                
    long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
                
    this.channel = channel;
                
    this.acc = acc;
                
    this.result = result;
                
    this.dataBuffer = dataBuffer;
            }

            
    void enableAccept() {
                pendingAccept.decrementAndGet();
                dataBuffers.add(dataBuffer);
            }

            
    void closeChildChannel() {
                
    try {
                    channel.close();
                } 
    catch (IOException ignore) {
                }
            }

            
    // caller must have acquired read lock for the listener and child channel.
            void finishAccept() throws IOException {
                
    /**
                 * JDK7 use 4 calls to getsockname  to setup
                 * local& remote address, this is very inefficient.
                 * 
                 * I change this to use GetAcceptExSockaddrs
                 
    */

                InetAddress[] socks 
    = new InetAddress[2];
                
    int[] ports = new int[2];
                updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
                InetSocketAddress local 
    = new InetSocketAddress(socks[0], ports[0]);
                
    final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
                channel.setConnected(local, remote);

                
    // permission check (in context of initiating thread)
                if (acc != null) {
                    AccessController.doPrivileged(
    new PrivilegedAction<Void>() {

                        
    public Void run() {
                            SecurityManager sm 
    = System.getSecurityManager();
                            sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());

                            
    return null;
                        }
                    }, acc);
                }
            }

            
    /**
             * Initiates the accept operation.
             
    */
            @Override
            
    public void run() {
                
    long overlapped = 0L;

                
    try {
                    
    // begin usage of listener socket
                    schannel.begin();
                    
    try {
                        
    // begin usage of child socket (as it is registered with
                        
    // completion port and so may be closed in the event that
                        
    // the group is forcefully closed).
                        channel.begin();

                        
    synchronized (result) {
                            overlapped 
    = ioCache.add(result);

                          
                            
    int n = accept0(handle, channel.handle(), overlapped, dataBuffer);//Be careful for the buffer address
                            if (n == IOStatus.UNAVAILABLE) {
                                
    return;
                            }

                            
    // connection accepted immediately
                            finishAccept();

                            
    // allow another accept before the result is set
                            enableAccept();
                            result.setResult(channel);
                        }
                    } 
    finally {
                        
    // end usage on child socket
                        channel.end();
                    }
                } 
    catch (Throwable x) {
                    
    // failed to initiate accept so release resources
                    if (overlapped != 0L)
                        ioCache.remove(overlapped);
                    closeChildChannel();
                    
    if (x instanceof ClosedChannelException)
                        x 
    = new AsynchronousCloseException();
                    
    if (!(x instanceof IOException) && !(x instanceof SecurityException))
                        x 
    = new IOException(x);
                    enableAccept();
                    result.setFailure(x);
                } 
    finally {
                    
    // end of usage of listener socket
                    schannel.end();
                }

                
    // accept completed immediately but may not have executed on
                
    // initiating thread in which case the operation may have been
                
    // cancelled.
                if (result.isCancelled()) {
                    closeChildChannel();
                }

                
    // invoke completion handler
                Invoker.invokeIndirectly(result);
            }

            
    /**
             * Executed when the I/O has completed
             
    */
            @Override
            
    public void completed(int bytesTransferred, boolean canInvokeDirect) {
                
    try {
                    
    // connection accept after group has shutdown
                    if (iocp.isShutdown()) {
                        
    throw new IOException(new ShutdownChannelGroupException());
                    }

                    
    // finish the accept
                    try {
                        schannel.begin();
                        
    try {
                            channel.begin();
                            finishAccept();
                        } 
    finally {
                            channel.end();
                        }
                    } 
    finally {
                        schannel.end();
                    }

                    
    // allow another accept before the result is set
                    enableAccept();
                    result.setResult(channel);
                } 
    catch (Throwable x) {
                    enableAccept();
                    closeChildChannel();
                    
    if (x instanceof ClosedChannelException)
                        x 
    = new AsynchronousCloseException();
                    
    if (!(x instanceof IOException) && !(x instanceof SecurityException))
                        x 
    = new IOException(x);
                    result.setFailure(x);
                }

                
    // if an async cancel has already cancelled the operation then
                
    // close the new channel so as to free resources
                if (result.isCancelled()) {
                    closeChildChannel();
                }

                
    // invoke handler (but not directly)
                Invoker.invokeIndirectly(result);
            }

            @Override
            
    public void failed(int error, IOException x) {
                enableAccept();
                closeChildChannel();

                
    // release waiters
                if (schannel.isOpen()) {
                    result.setFailure(x);
                } 
    else {
                    result.setFailure(
    new AsynchronousCloseException());
                }
                Invoker.invokeIndirectly(result);
            }
        }

        Future
    <AsynchronousSocketChannel> implAccept(Object attachment,
            
    final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
            
    if (!schannel.isOpen()) {
                Throwable exc 
    = new ClosedChannelException();
                
    if (handler == null)
                    
    return CompletedFuture.withFailure(exc);
                Invoker.invokeIndirectly(schannel, handler, attachment, 
    null, exc);
                
    return null;
            }
            
    if (schannel.isAcceptKilled())
                
    throw new RuntimeException("Accept not allowed due to cancellation");

            
    // ensure channel is bound to local address
            if (schannel.localAddress == null)
                
    throw new NotYetBoundException();

            
    // create the socket that will be accepted. The creation of the socket
            
    // is enclosed by a begin/end for the listener socket to ensure that
            
    // we check that the listener is open and also to prevent the I/O
            
    // port from being closed as the new socket is registered.
            WindowsAsynchronousSocketChannelImpl ch = null;
            IOException ioe 
    = null;
            
    try {
                schannel.begin();
                ch 
    = new WindowsAsynchronousSocketChannelImpl(iocp, false);
            } 
    catch (IOException x) {
                ioe 
    = x;
            } 
    finally {
                schannel.end();
            }
            
    if (ioe != null) {
                
    if (handler == null)
                    
    return CompletedFuture.withFailure(ioe);
                Invoker.invokeIndirectly(
    this.schannel, handler, attachment, null, ioe);
                
    return null;
            }

            
    // need calling context when there is security manager as
            
    // permission check may be done in a different thread without
            
    // any application call frames on the stack
            AccessControlContext acc =
                (System.getSecurityManager() 
    == null? null : AccessController.getContext();

            PendingFuture
    <AsynchronousSocketChannel, Object> result =
                
    new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);

            
    // check and set flag to prevent concurrent accepting
            if (pendingAccept.get() >= maxPending)
                
    throw new AcceptPendingException();
            pendingAccept.incrementAndGet();
            AcceptTask task 
    = new AcceptTask(ch, acc, dataBuffers.poll(), result);
            result.setContext(task);

            
    // initiate I/O
            if (Iocp.supportsThreadAgnosticIo()) {
                task.run();
            } 
    else {
                Invoker.invokeOnThreadInThreadPool(
    this.schannel, task);
            }
            
    return result;
        }

        
    //    //reimplements for performance
        static native void updateAcceptContext(long listenSocket, long acceptSocket,
            InetAddress[] addresses, 
    int[] ports, long dataBuffer) throws IOException;

        
    static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);

    }


    對應的CPP代碼如下:


    /*
     * Class:     sun_nio_ch_WindowsMultiAcceptSupport
     * Method:    updateAcceptContext
     * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
     
    */
    JNIEXPORT 
    void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
    (JNIEnv 
    *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
    {
        SOCKET s1 
    = (SOCKET)jlong_to_ptr(listenSocket);
        SOCKET s2 
    = (SOCKET)jlong_to_ptr(acceptSocket);
        PVOID outputBuffer 
    = (PVOID)jlong_to_ptr(buf);
        INT iLocalAddrLen
    =0;
        INT iRemoteAddrLen
    =0;
        SOCKETADDRESS
    * lpLocalAddr;
        SOCKETADDRESS
    * lpRemoteAddr;
        jobject localAddr;
        jobject remoteAddr;
        jint ports[
    2]={0};

        

        setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (
    char *)&s1, sizeof(s1));

        (lpGetAcceptExSockaddrs)(outputBuffer,
            
    0,
            
    sizeof(SOCKETADDRESS)+16,
            
    sizeof(SOCKETADDRESS)+16,
            (LPSOCKADDR
    *)&lpLocalAddr,
            
    &iLocalAddrLen,
            (LPSOCKADDR
    *)&lpRemoteAddr,
            
    &iRemoteAddrLen);

        localAddr
    =lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);
        remoteAddr
    =lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));

        env
    ->SetObjectArrayElement(sockArray,0,localAddr);
        env
    ->SetObjectArrayElement(sockArray,1,remoteAddr);
        env
    ->SetIntArrayRegion(portArray,0,2,ports);

    }

    /*
     * Class:     sun_nio_ch_WindowsMultiAcceptSupport
     * Method:    accept0
     * Signature: (JJJJ)I
     
    */
    jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
      (JNIEnv 
    *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
    {

        BOOL res;
        SOCKET s1 
    = (SOCKET)jlong_to_ptr(listenSocket);
        SOCKET s2 
    = (SOCKET)jlong_to_ptr(acceptSocket);
        PVOID outputBuffer 
    = (PVOID)jlong_to_ptr(buf);

        DWORD nread 
    = 0;
        OVERLAPPED
    * lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
        ZeroMemory((PVOID)lpOverlapped, 
    sizeof(OVERLAPPED));

        

        
    //why use SOCKETADDRESS?
        
    //because client may use IPv6 to connect to server.
        res = (lpAcceptEx)(s1,
            s2,
            outputBuffer,
            
    0,
            
    sizeof(SOCKETADDRESS)+16,
            
    sizeof(SOCKETADDRESS)+16,
            
    &nread,
            lpOverlapped);

        
        
    if (res == 0) {
            
    int error = WSAGetLastError();
            
            
    if (error == ERROR_IO_PENDING) {
                
                
    return NIO2_IOS_UNAVAILABLE;
            }
        
        
            
    return NIO2_THROWN;
        }



        
        
    return 0;

    }

    這里用到的lpNET_SockaddrToInetAddress是JDK7中NET.DLL暴露的方法,從DLL里加載。相應代碼如下:

    *
     
    * Class:     com_yovn_jabhttpd_utilities_SunPackageFixer
     
    * Method:    initFds
     
    * Signature: ()V
     
    */
    JNIEXPORT 
    void JNICALL Java_com_yovn_jabhttpd_utilities_SunPackageFixer_initFds
      (JNIEnv 
    *env, jclass clazz)
    {


        GUID GuidAcceptEx 
    = WSAID_ACCEPTEX;
        GUID GuidTransmitFile 
    = WSAID_TRANSMITFILE;
        GUID GuidGetAcceptExSockAddrs 
    = WSAID_GETACCEPTEXSOCKADDRS;
        SOCKET s;
        
    int rv;
        DWORD dwBytes;
        HMODULE hModule;


        s 
    = socket(AF_INET, SOCK_STREAM, 0);
        
    if (s == INVALID_SOCKET) {
            JNU_ThrowByName(env,
    "java/io/IOException""socket failed");
            
    return;
        }
        rv 
    = WSAIoctl(s,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            (LPVOID)
    &GuidAcceptEx,
            
    sizeof(GuidAcceptEx),
            
    &lpAcceptEx,
            
    sizeof(lpAcceptEx),
            
    &dwBytes,
            NULL,
            NULL);
        
    if (rv != 0)
        {
            JNU_ThrowByName(env, 
    "java/io/IOException","WSAIoctl failed on get AcceptEx ");
            
    goto _ret;
        }
        rv 
    = WSAIoctl(s,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            (LPVOID)
    &GuidTransmitFile,
            
    sizeof(GuidTransmitFile),
            
    &lpTransmitFile,
            
    sizeof(lpTransmitFile),
            
    &dwBytes,
            NULL,
            NULL);
        
    if (rv != 0)
        {
            JNU_ThrowByName(env, 
    "java/io/IOException","WSAIoctl failed on get TransmitFile");
            
    goto _ret;
        }
        rv 
    = WSAIoctl(s,
            SIO_GET_EXTENSION_FUNCTION_POINTER,
            (LPVOID)
    &GuidGetAcceptExSockAddrs,
            
    sizeof(GuidGetAcceptExSockAddrs),
            
    &lpGetAcceptExSockaddrs,
            
    sizeof(lpGetAcceptExSockaddrs),
            
    &dwBytes,
            NULL,
            NULL);
        
    if (rv != 0)
        {
            JNU_ThrowByName(env, 
    "java/io/IOException","WSAIoctl failed on get GetAcceptExSockaddrs");
            
    goto _ret;
        }

        hModule
    =LoadLibrary("net.dll");
        
    if(hModule==NULL)
        {
            JNU_ThrowByName(env, 
    "java/io/IOException","can't load java net.dll");
            
    goto _ret;
        }


        lpNET_SockaddrToInetAddress
    =(NET_SockaddrToInetAddress_t)GetProcAddress(hModule,"_NET_SockaddrToInetAddress@12");

        
    if(lpNET_SockaddrToInetAddress==NULL)
        {
            JNU_ThrowByName(env, 
    "java/io/IOException","can't resolve _NET_SockaddrToInetAddress function ");
            
            
        }

    _ret:
        closesocket(s);
        
    return;


    }

    細心的同學可能會發現,在創建socket之前沒有初始化WinSock庫,因為在這段代碼前,我初始化了一個InetSocketAddress對象,這樣JVM會加載NET.DLL并初始化WinSock庫了。

    OK,現在,你可以在支持類上同時發起多個AcceptEx請求了。

    PS:基于這個我簡單測試了下我的服務器,同時開5000個線程,每個下載3M多點的文件,一分鐘內能夠全部正確完成。
    服務器正在開發中,有興趣的請加入:http://code.google.com/p/jabhttpd


    posted on 2009-12-04 17:57 DoubleH 閱讀(3898) 評論(6)  編輯  收藏

    Feedback

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-04-23 20:54 java
    很想看看牛人的實現,但報403,說我沒有權限,可否傳一份給我.郵箱wahel30615571@hotmail.com  回復  更多評論
      

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-11-16 16:37 隨意
    哥們,代碼能發到zhouchn8@163.com一份嗎,謝謝  回復  更多評論
      

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二[未登錄] 2011-12-19 10:47 VV
    老師 代碼給一份吧,,,, vanlin (AT) 139 dot com  回復  更多評論
      

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-02-05 14:39 夢想在飛
    兄弟:給我一份,
    544286609@qq.com 謝謝  回復  更多評論
      

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-03-12 21:14 為夢想奔跑
    老師,能否發份代碼asshp1790@126.com,謝謝  回復  更多評論
      

    # re: 基于JDK7 NIO2的高性能web服務器實踐之二 2012-05-08 17:05 xingye
    老師,你好,可以給我一份代碼嗎?liangxingye@163.com,謝謝  回復  更多評論
      


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲AV日韩AV永久无码绿巨人| 精品特级一级毛片免费观看| 午夜高清免费在线观看| 特级毛片免费播放| 久久久久亚洲精品影视| 91嫩草国产在线观看免费| 有色视频在线观看免费高清在线直播| 亚洲精品乱码久久久久久| 16女性下面扒开无遮挡免费| 国产亚洲一卡2卡3卡4卡新区 | 亚洲人成无码网WWW| 免费无码成人AV在线播放不卡| 亚洲国产成人99精品激情在线| 亚洲不卡AV影片在线播放| 日韩免费的视频在线观看香蕉| 中国china体内裑精亚洲日本| 亚洲色偷偷综合亚洲AV伊人| 久草免费在线观看视频| 视频免费1区二区三区| 色五月五月丁香亚洲综合网| 无码精品人妻一区二区三区免费| 久久久亚洲欧洲日产国码农村| 天天干在线免费视频| 在线看片免费人成视频福利| 亚洲中文字幕无码中文字| 亚洲日韩激情无码一区| 青青草国产免费久久久下载| 久久99青青精品免费观看| 羞羞视频在线免费观看| 亚洲一区二区三区国产精品无码| 亚洲乱码国产乱码精品精| 成人免费无码精品国产电影| 在线免费中文字幕| 中文字幕一区二区三区免费视频| 亚洲人成电影网站色www| 久久精品九九亚洲精品| 亚洲中文字幕在线乱码| 免费一级特黄特色大片在线| 最近中文字幕无免费视频| 美丽姑娘免费观看在线观看中文版| 亚洲精品视频免费|