<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Dict.CN 在線詞典, 英語學習, 在線翻譯

    都市淘沙者

    荔枝FM Everyone can be host

    統計

    留言簿(23)

    積分與排名

    優秀學習網站

    友情連接

    閱讀排行榜

    評論排行榜

    用NIO實現的一個Chat Demo [轉]

    發現網上找到的許多NIO的用例在跑起來后都有許多問題,最常見的就是沒有對interest event進行合理的registry和unregistry,導致程序一直在loopling,又或者當客戶端或服務器端連接斷開時有死循環的跡象。忍不住自己做了一個demo,我想可以作為一個NIO應用的模板去修改,只要把doRead,doWrite之類的用基于線程的Handler去處理,那就基本可以滿足需求了。
    這個Demo的目的是在Client和Server間建立類似QQ聊天那樣的功能,讓客戶端和服務器端都支持用戶輸入和異步消息顯示(因為服務器端要支持用戶的console輸入,所以不要用多個客戶端進行連接,否則可能會出現難以預測的問題)。
    代碼中用紅色顯示的地方是我認為需要注意的地方,說老實話NIO雖然很強大,但完全用Non-Blocking來編程,有許多需要小心的地方,一不小心還可能造成死循環。就像ReentrantLock之于Synchronized,如果基本的IO能滿足需求,就不必強求應用NIO。
    注意:OP_WRITE應該是在寫入準備就緒的時候才添加到SelectionKey里面去,而且在寫入完成后一定要去除,否則selector.select()方法就不會被blocking而造成死循環。

    MyNioServer.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Set;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.Selector;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class MyNioServer {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioServer(int port) throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(
                    new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            ssc.register(sel, SelectionKey.OP_ACCEPT);
        }

        public void startup() {
            System.out.println("Server start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }
                    Set<SelectionKey> set = sel.selectedKeys();
                    Iterator<SelectionKey> it = set.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            System.out.println("Key isAcceptable");
                            doAccept(key);
                        }
                        if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        }
                        if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    set.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Server stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void doAccept(SelectionKey key) {
            try {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                sc.configureBlocking(false);
                SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
                newkey.attach(new LinkedList<ByteBuffer>());
                new Thread(new UserInteractive(newkey)).start();
            } catch (IOException e) {
                e.printStackTrace();
                System.err.println("Failed to accept new client.");
            }
            System.out.println("end doAccept");
        }

        // TODO buffersize performance testing
        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                int count = 0;
                while ( (count = sc.read(bb)) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                //if client disconnected, read return -1
                if(count == -1){
                    System.out.println("client disconnected");
                    disconnect(key);   
                } else {
                    System.out.println("message received from client:" + sb.toString());
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            System.out.println("end doRead");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        //TODO find out how to shutdown
        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                System.out.println("UserInteractive thread start...");
                BufferedReader br = new BufferedReader(new InputStreamReader(
                        System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        ((LinkedList<ByteBuffer>) key.attachment()).offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        key.selector().wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioServer server = new MyNioServer(10001);
                server.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting…");
            }
        }
    }


    MyNioClient.java

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.Charset;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.SelectionKey;
    import java.text.MessageFormat;
    import java.util.LinkedList;
    import java.util.Set;
    import java.util.Iterator;

    public class MyNioClient {

        private int BUFFERSIZE = 1024*10;
        private String CHARSET = "GBK";
        private Selector sel;

        public MyNioClient(int port) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.configureBlocking(false);    // this operation need to be executed before socket.connnect, for OP_CONNECT event
            sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            sel = Selector.open();
            sc.register(sel, SelectionKey.OP_CONNECT |SelectionKey.OP_READ);
        }

        public void startup() {
            System.out.println("Client start...");
            try {
                while (!Thread.interrupted()) {
                    int keysCount = sel.select();
                    System.out.println("Catched " + keysCount + " SelectionKeys");
                    if (keysCount < 1) {
                        continue;
                    }               
                    Set<SelectionKey> selectedKeys = sel.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        //printKeyInfo(key);
                        if (key.isConnectable()) {
                            System.out.println("Key isConnectable");
                            doConnect(key);
                        } else if (key.isValid() && key.isReadable()) {
                            System.out.println("Key isReadable");
                            doRead(key);
                        } else if (key.isValid() && key.isWritable()) {
                            System.out.println("Key isWritable");
                            doWrite(key);
                        }
                    }
                    selectedKeys.clear();
                }
                System.err.println("Program is interrupted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("Client stop...");
            shutdown();
        }
       
        public void shutdown(){
            Set<SelectionKey> keys = sel.keys();
            for(SelectionKey key:keys){
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            try {
                sel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private void printKeyInfo(SelectionKey key) {
            String keyStr = MessageFormat
                    .format(
                            "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                            key.interestOps(), key.readyOps(), key.isValid(), key
                                    .isAcceptable(), key.isConnectable(), key
                                    .isReadable(), key.isWritable());
            System.out.println(keyStr);
        }

        private void doConnect(SelectionKey key) {
            try {
                boolean flag = ((SocketChannel) key.channel()).finishConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            System.out.println("unregister OP_CONNECT from interestOps");
            key.interestOps(SelectionKey.OP_READ);
            key.attach(new LinkedList<ByteBuffer>());
            new Thread(new UserInteractive(key)).start();
        }

        private void doRead(SelectionKey key) {
            try {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                StringBuffer sb = new StringBuffer();
                while (sc.read(bb) > 0) {
                    bb.flip();
                    sb.append(Charset.forName(CHARSET).decode(bb));
                    bb.flip();
                }
                System.out.println("message received from server:" + sb.toString());
            } catch (IOException e) {
                e.printStackTrace();
                disconnect(key);
                System.exit(1);
            }
            System.out.println("now end readMessage");
        }

        private void doWrite(SelectionKey key) {
            SocketChannel sc = (SocketChannel) key.channel();
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
            ByteBuffer bb = outseq.poll();
            if(bb == null){
                return;
            }
            try {
                while(bb.hasRemaining()){
                    sc.write(bb);
                }           
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
            }
            if (outseq.size() == 0) {
                System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
                key.interestOps(SelectionKey.OP_READ);
            }
            System.out.println("end doWrote");
        }

        private void disconnect(SelectionKey key) {
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private class UserInteractive implements Runnable {

            SelectionKey key;

            public UserInteractive(SelectionKey key) {
                this.key = key;
            }

            public void run() {
                LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                        .attachment();
                 BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    try {
                        String inputLine = br.readLine();
                        if ("quit".equalsIgnoreCase(inputLine)) {
                            key.channel().close();
                            System.exit(1);
                            break;
                        }
                        ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                        bb = ByteBuffer.wrap(inputLine.getBytes());
                        outseq.offer(bb);
                        System.out
                        .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ
                                | SelectionKey.OP_WRITE);
                        sel.wakeup();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * @param args
         */
        public static void main(String[] args) {
            try {
                MyNioClient client = new MyNioClient(10001);
                client.startup();
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("Exception caught, program exiting...");
            }
        }

    }

    posted on 2010-05-29 12:38 都市淘沙者 閱讀(1353) 評論(1)  編輯  收藏 所屬分類: 多線程并發編程

    評論

    # re: 用NIO實現的一個Chat Demo [轉][未登錄] 2015-04-08 23:36 harry

    請教一個問題,為什么UserInteractive里面 SelctionKey.wakeup以后,就成了寫就緒模式呢(key.isWritable()是true)  回復  更多評論   

    主站蜘蛛池模板: 亚洲人成网网址在线看| 亚洲午夜精品一区二区公牛电影院 | 日韩精品无码一区二区三区免费 | 99视频在线精品免费观看6| 中文字幕亚洲综合久久| 久久精品视频免费播放| 久久亚洲精品国产精品| 最近免费中文字幕大全高清大全1 最近免费中文字幕mv在线电影 | h片在线观看免费| 亚洲综合色自拍一区| 黄桃AV无码免费一区二区三区| 亚洲最大AV网站在线观看| 在线视频网址免费播放| 国产V亚洲V天堂A无码| 性xxxxx大片免费视频| 亚洲小说区图片区| 最新中文字幕电影免费观看| 婷婷国产偷v国产偷v亚洲| 亚洲一级特黄大片无码毛片 | 曰曰鲁夜夜免费播放视频 | 亚洲国产日韩在线| 好吊妞在线成人免费| 在线播放免费人成视频网站 | 亚洲区精品久久一区二区三区| 中文字幕无码不卡免费视频| 337P日本欧洲亚洲大胆艺术图| 亚洲福利精品一区二区三区| 国产午夜精品久久久久免费视| 亚洲高清不卡视频| 国产zzjjzzjj视频全免费 | 国产精品永久免费| 亚洲第一香蕉视频| 亚洲Av无码乱码在线znlu| 在线观看特色大片免费网站| 亚洲乱码一区av春药高潮| 国产jizzjizz视频全部免费| 久久国产免费一区| 亚洲a∨无码精品色午夜| 精品亚洲综合在线第一区| A在线观看免费网站大全| 国产精品免费久久久久电影网|