<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Jack Jiang

    我的最新工程MobileIMSDK:http://git.oschina.net/jackjiang/MobileIMSDK
    posts - 494, comments - 13, trackbacks - 0, articles - 1

    本文由竹子愛熊貓分享,原題“(十一)Netty實戰篇:基于Netty框架打造一款高性能的IM即時通訊程序”,本文有修訂和改動。

    1、引言

    關于Netty網絡框架的內容,前面已經講了兩個章節,但總歸來說難以真正掌握,畢竟只是對其中一個個組件進行講解,很難讓諸位將其串起來形成一條線,所以本章中則會結合實戰案例,對Netty進行更深層次的學習與掌握,實戰案例也并不難,一個非常樸素的IM聊天程序。

    原本打算做個多人斗地主練習程序,但那需要織入過多的業務邏輯,因此一方面會帶來不必要的理解難度,讓案例更為復雜化,另一方面代碼量也會偏多,所以最終依舊選擇實現基本的IM聊天程序,既簡單,又能加深對Netty的理解。

    技術交流:

    (本文已同步發布于:http://www.52im.net/thread-4530-1-1.html

    2、配套源碼

    本文配套源碼的開源托管地址是:

    3、知識準備

    關于 Netty 是什么,這里簡單介紹下:

    Netty 是一個 Java 開源框架。Netty 提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

    也就是說,Netty 是一個基于 NIO 的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。

    Netty 相當簡化和流線化了網絡應用的編程開發過程,例如,TCP 和 UDP 的 Socket 服務開發。

    有關Netty的入門文章:

    如果你連Java NIO都不知道,下面的文章建議優先讀:

    Netty源碼和API 在線查閱地址:

    4、基于Netty設計通信協議

    協議,這玩意兒相信大家肯定不陌生了,簡單回顧一下協議的概念:網絡協議是指一種通信雙方都必須遵守的約定,兩個不同的端,按照一定的格式對數據進行“編碼”,同時按照相同的規則進行“解碼”,從而實現兩者之間的數據傳輸與通信。

    當自己想要打造一款IM通信程序時,對于消息的封裝、拆分也同樣需要設計一個協議,通信的兩端都必須遵守該協議工作,這也是實現通信程序的前提。

    但為什么需要通信協議呢?

    因為TCP/IP中是基于流的方式傳輸消息,消息與消息之間沒有邊界,而協議的目的則在于約定消息的樣式、邊界等。

    5、Redis通信的RESP協議參考學習

    不知大家是否還記得之前我聊到的RESP客戶端協議,這是Redis提供的一種客戶端通信協議。如果想要操作Redis,就必須遵守該協議的格式發送數據。

    這個協議特別簡單,如下:

    • 1)首先要求所有命令,都以*開頭,后面跟著具體的子命令數量,接著用換行符分割;
    • 2)接著需要先用$符號聲明每個子命令的長度,然后再用換行符分割;
    • 3)最后再拼接上具體的子命令,同樣用換行符分割。

    這樣描述有些令人難懂,那就直接看個案例,例如一條簡單set命令。

    如下:

    客戶端命令:

        setname ZhuZi

    轉變為RESP指令:

        *3

        $3

        set

        $4

        name

        $5

        ZhuZi

    按照Redis的規定,但凡滿足RESP協議的客戶端,都可以直接連接并操作Redis服務端,這也就意味著咱們可以直接通過Netty來手寫一個Redis客戶端。

    代碼如下:

    // 基于Netty、RESP協議實現的Redis客戶端

    publicclassRedisClient {

        // 換行符的ASCII碼

        staticfinalbyte[] LINE = {13, 10};

     

        publicstaticvoidmain(String[] args) {

            EventLoopGroup worker = newNioEventLoopGroup();

            Bootstrap client = newBootstrap();

     

            try{

                client.group(worker);

                client.channel(NioSocketChannel.class);

                client.handler(newChannelInitializer<SocketChannel>() {

                    @Override

                    protectedvoidinitChannel(SocketChannel socketChannel)

                                                            throwsException {

                        ChannelPipeline pipeline = socketChannel.pipeline();

     

                        pipeline.addLast(newChannelInboundHandlerAdapter(){

     

                            // 通道建立成功后調用:向Redis發送一條set命令

                            @Override

                            publicvoidchannelActive(ChannelHandlerContext ctx)

                                                                throwsException {

                                String command = "set name ZhuZi";

                                ByteBuf buffer = respCommand(command);

                                ctx.channel().writeAndFlush(buffer);

                            }

     

                            // Redis響應數據時觸發:打印Redis的響應結果

                            @Override

                            publicvoidchannelRead(ChannelHandlerContext ctx,

                                                    Object msg) throwsException {

                                // 接受Redis服務端執行指令后的結果

                                ByteBuf buffer = (ByteBuf) msg;

                                System.out.println(buffer.toString(CharsetUtil.UTF_8));

                            }

                        });

                    }

                });

     

                // 根據IP、端口連接Redis服務端

                client.connect("192.168.12.129", 6379).sync();

            } catch(Exception e){

                e.printStackTrace();

            }

        }

     

        privatestaticByteBuf respCommand(String command){

            // 先對傳入的命令以空格進行分割

            String[] commands = command.split(" ");

            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

     

            // 遵循RESP協議:先寫入指令的個數

            buffer.writeBytes(("*"+ commands.length).getBytes());

            buffer.writeBytes(LINE);

     

            // 接著分別寫入每個指令的長度以及具體值

            for(String s : commands) {

                buffer.writeBytes(("$"+ s.length()).getBytes());

                buffer.writeBytes(LINE);

                buffer.writeBytes(s.getBytes());

                buffer.writeBytes(LINE);

            }

            // 把轉換成RESP格式的命令返回

            returnbuffer;

        }

    }

    在上述這個案例中,也僅僅只是通過respCommand()這個方法,對用戶輸入的指令進行了轉換。同時在上面通過Netty,與Redis的地址、端口建立了連接。在連接建立成功后,就會向Redis發送一條轉換成RESP指令的set命令。接著等待Redis的響應結果并輸出,如下:

    +OK

    因為這是一條寫指令,所以當Redis收到執行完成后,最終就會返回一個OK,大家也可直接去Redis中查詢,也依舊能夠查詢到剛剛寫入的name這個鍵值。

    6、HTTP超文本傳輸協議參考學習

    前面咱們自己針對于Redis的RESP協議,對用戶指令進行了封裝,然后發往Redis執行。

    但對于這些常用的協議,Netty早已提供好了現成的處理器,想要使用時無需從頭開發,可以直接使用現成的處理器來實現。

    比如現在咱們可以基于Netty提供的處理器,實現一個簡單的HTTP服務器。

    代碼如下:

    // 基于Netty提供的處理器實現HTTP服務器

    publicclassHttpServer {

        publicstaticvoidmain(String[] args) throwsInterruptedException {

            EventLoopGroup boss = newNioEventLoopGroup();

            EventLoopGroup worker = newNioEventLoopGroup();

            ServerBootstrap server = newServerBootstrap();

            server

                .group(boss,worker)

                .channel(NioServerSocketChannel.class)

                .childHandler(newChannelInitializer<NioSocketChannel>() {

                    @Override

                    protectedvoidinitChannel(NioSocketChannel ch) {

                        ChannelPipeline pipeline = ch.pipeline();

     

                        // 添加一個Netty提供的HTTP處理器

                        pipeline.addLast(newHttpServerCodec());

                        pipeline.addLast(newChannelInboundHandlerAdapter() {

                            @Override

                            publicvoidchannelRead(ChannelHandlerContext ctx,

                                                    Object msg) throwsException {

                                // 在這里輸出一下消息的類型

                                System.out.println("消息類型:"+ msg.getClass());

                                super.channelRead(ctx, msg);

                            }

                        });

                        pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {

                            @Override

                            protectedvoidchannelRead0(ChannelHandlerContext ctx,

                                                        HttpRequest msg) throwsException {

                                System.out.println("客戶端的請求路徑:"+ msg.uri());

     

                                // 創建一個響應對象,版本號與客戶端保持一致,狀態碼為OK/200

                                DefaultFullHttpResponse response =

                                        newDefaultFullHttpResponse(

                                                msg.protocolVersion(),

                                                HttpResponseStatus.OK);

     

                                // 構造響應內容

                                byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

     

                                // 設置響應頭:告訴客戶端本次響應的數據長度

                                response.headers().setInt(

                                    HttpHeaderNames.CONTENT_LENGTH,content.length);

                                // 設置響應主體

                                response.content().writeBytes(content);

     

                                // 向客戶端寫入響應數據

                                ctx.writeAndFlush(response);

                            }

                        });

                    }

                })

                .bind("127.0.0.1",8888)

                .sync();

        }

    }

    在該案例中,咱們就未曾手動對HTTP的數據包進行拆包處理了,而是在服務端的pipeline上添加了一個HttpServerCodec處理器,這個處理器是Netty官方提供的。

    其類繼承關系如下:

    publicfinalclassHttpServerCodec

        extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>

        implementsSourceCodec {

        // ......

    }

    觀察會發現,該類繼承自CombinedChannelDuplexHandler這個組合類,它組合了編碼器、解碼器。

    這也就意味著HttpServerCodec即可以對客戶端的數據做解碼,也可以對服務端響應的數據做編碼。

    同時除開添加了這個處理器外,在第二個處理器中打印了一下客戶端的消息類型,最后一個處理器中,對客戶端的請求做出了響應,其實也就是返回了一句話而已。

    此時在瀏覽器輸入http://127.0.0.1:8888/index.html,結果如下:

    消息類型:classio.netty.handler.codec.http.DefaultHttpRequest

    消息類型:classio.netty.handler.codec.http.LastHttpContent$1

    客戶端的請求路徑:/index.html

    此時來看結果,客戶端的請求會被解析成兩個部分:

    • 1)第一個是請求信息;
    • 2)第二個是主體信息。

    但按理來說瀏覽器發出的請求,屬于GET類型的請求,GET請求是沒有請求體信息的,但Netty依舊會解析成兩部分~,只不過GET請求的第二部分是空的。

    在第三個處理器中,咱們直接向客戶端返回了一個h1標簽,同時也要記得在響應頭里面,加上響應內容的長度信息,否則瀏覽器的加載圈,會一直不同的轉動,畢竟瀏覽器也不知道內容有多長,就會一直反復加載,嘗試等待更多的數據。

    7、自定義消息傳輸協議

    7.1概述

    Netty除開提供了HTTP協議的處理器外,還提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列協議的實現,具體定義位于io.netty.handler.codec這個包下,當然,咱們也可以自己實現自定義協議,按照自己的邏輯對數據進行編解碼處理。

    很多基于Netty開發的中間件/組件,其內部基本上都開發了專屬的通信協議,以此來作為不同節點間通信的基礎,所以解下來咱們基于Netty也來自己設計一款通信協議,這也會作為后續實現聊天程序時的基礎。

    所謂的協議設計,其實僅僅只需要按照一定約束,實現編碼器與解碼器即可,發送方在發出數據之前,會經過編碼器對數據進行處理,而接收方在收到數據之前,則會由解碼器對數據進行處理。

    7.2自定義協議的要素

    在自定義傳輸協議時,咱們必然需要考慮幾個因素,如下:

    • 1)魔數:用來第一時間判斷是否為自己需要的數據包;
    • 2)版本號:提高協議的拓展性,方便后續對協議進行升級;
    • 3)序列化算法:消息正文具體該使用哪種方式進行序列化傳輸,例如Json、ProtoBuf、JDK...;
    • 4)消息類型:第一時間判斷出當前消息的類型;
    • 5)消息序號:為了實現雙工通信,客戶端和服務端之間收/發消息不會相互阻塞;
    • 6)正文長度:提供給LTC解碼器使用,防止解碼時出現粘包、半包的現象;
    • 7)消息正文:本次消息要傳輸的具體數據。

    在設計協議時,一個完整的協議應該涵蓋上述所說的幾方面,這樣才能提供雙方通信時的基礎。

    基于上述幾個字段,能夠在第一時間內判斷出:

    • 1)消息是否可用;
    • 2)當前協議版本;
    • 3)消息的具體類型;
    • 4)消息的長度等各類信息。

    從而給后續處理器使用(自定義的協議規則本身就是一個編解碼處理器而已)。

    7.3自定義協議實戰

    前面簡單聊到過,所謂的自定義協議就是自己規定消息格式,以及自己實現編/解碼器對消息實現封裝/拆解,所以這里想要自定義一個消息協議,就只需要滿足前面兩個條件即可。

    因此實現如下:

    @ChannelHandler.Sharable

    publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {

     

        // 消息出站時會經過的編碼方法(將原生消息對象封裝成自定義協議的消息格式)

        @Override

        protectedvoidencode(ChannelHandlerContext ctx, Message msg,

                              List<Object> list) throwsException {

            ByteBuf outMsg = ctx.alloc().buffer();

            // 前五個字節作為魔數

            byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};

            outMsg.writeBytes(magicNumber);

            // 一個字節作為版本號

            outMsg.writeByte(1);

            // 一個字節表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....

            outMsg.writeByte(0);

            // 一個字節用于表示消息類型

            outMsg.writeByte(msg.getMessageType());

            // 四個字節表示消息序號

            outMsg.writeInt(msg.getSequenceId());

     

            // 使用Java-Serializable的方式對消息對象進行序列化

            ByteArrayOutputStream bos = newByteArrayOutputStream();

            ObjectOutputStream oos = newObjectOutputStream(bos);

            oos.writeObject(msg);

            byte[] msgBytes = bos.toByteArray();

     

            // 使用四個字節描述消息正文的長度

            outMsg.writeInt(msgBytes.length);

            // 將序列化后的消息對象作為消息正文

            outMsg.writeBytes(msgBytes);

     

            // 將封裝好的數據傳遞給下一個處理器

            list.add(outMsg);

        }

     

        // 消息入站時會經過的解碼方法(將自定義格式的消息轉變為具體的消息對象)

        @Override

        protectedvoiddecode(ChannelHandlerContext ctx,

                              ByteBuf inMsg, List<Object> list) throwsException {

            // 讀取前五個字節得到魔數

            byte[] magicNumber = newbyte[5];

            inMsg.readBytes(magicNumber,0,5);

            // 再讀取一個字節得到版本號

            byteversion = inMsg.readByte();

            // 再讀取一個字節得到序列化方式

            byteserializableType = inMsg.readByte();

            // 再讀取一個字節得到消息類型

            bytemessageType = inMsg.readByte();

            // 再讀取四個字節得到消息序號

            intsequenceId = inMsg.readInt();

            // 再讀取四個字節得到消息正文長度

            intmessageLength = inMsg.readInt();

     

            // 再根據正文長度讀取序列化后的字節正文數據

            byte[] msgBytes = newbyte[messageLength];

            inMsg.readBytes(msgBytes,0,messageLength);

     

            // 對于讀取到的消息正文進行反序列化,最終得到具體的消息對象

            ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);

            ObjectInputStream ois = newObjectInputStream(bis);

            Message message = (Message) ois.readObject();

     

            // 最終把反序列化得到的消息對象傳遞給后續的處理器

            list.add(message);

        }

    }

    上面自定義的處理器中,繼承了MessageToMessageCodec類,主要負責將數據在原生ByteBuf與Message之間進行相互轉換,而Message對象是自定義的消息對象,這里暫且無需過多關心。

    其中主要實現了兩個方法:

    • 1)encode():出站時會經過的編碼方法,會將原生消息對象按自定義的協議封裝成對應的字節數據;
    • 2)decode():入站時會經過的解碼方法,會將協議格式的字節數據,轉變為具體的消息對象。

    上述自定義的協議,也就是一定規則的字節數據,每條消息數據的組成如下:

    • 1)魔數:使用第1~5個字節來描述,這個魔數值可以按自己的想法自定義;
    • 2)版本號:使用第6個字節來描述,不同數字表示不同版本;
    • 3)序列化算法:使用第7個字節來描述,不同數字表示不同序列化方式;
    • 4)消息類型:使用第8個字節來描述,不同的消息類型使用不同數字表示;
    • 5)消息序號:使用第9~12個字節來描述,其實就是一個四字節的整數;
    • 6)正文長度:使用第13~16個字節來描述,也是一個四字節的整數;
    • 7)消息正文:長度不固定,根據每次具體發送的數據來決定。

    在其中,為了實現簡單,這里的序列化方式,則采用的是JDK默認的Serializable接口方式,但這種方式生成的對象字節較大,實際情況中最好還是選擇谷歌的ProtoBuf方式,這種算法屬于序列化算法中,性能最佳的一種落地實現。

    當然,這個自定義的協議是提供給后續的聊天業務使用的,但這種實戰型的內容分享,基本上代碼量較高,所以大家看起來會有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑馬Netty視頻教程》二次改良的,因此如若感覺文字描述較為枯燥,可直接點擊前面給出的鏈接,觀看P101~P121視頻進行學習。

    最后來觀察一下,大家會發現,在咱們定義的這個協議編解碼處理器上,存在著一個@ChannelHandler.Sharable注解,這個注解的作用是干嗎的呢?其實很簡單,用來標識當前處理器是否可在多線程環境下使用,如果帶有該注解的處理器,則表示可以在多個通道間共用,因此只需要創建一個即可,反之同理,如果不帶有該注解的處理器,則每個通道需要單獨創建使用。

    PS:如果你想系統學習Protobuf,可以從以下文章入手:

    如何選擇即時通訊應用的數據傳輸格式

    強列建議將Protobuf作為你的即時通訊應用數據傳輸格式

    IM通訊協議專題學習(一):Protobuf從入門到精通,一篇就夠!

    IM通訊協議專題學習(二):快速理解Protobuf的背景、原理、使用、優缺點

    IM通訊協議專題學習(三):由淺入深,從根上理解Protobuf的編解碼原理

    IM通訊協議專題學習(四):從Base64到Protobuf,詳解Protobuf的數據編碼原理

    IM通訊協議專題學習(八):金蝶隨手記團隊的Protobuf應用實踐(原理篇)

    8、實戰要點1:IM程序的用戶模塊

    8.1概述

    聊天、聊天,自然是得先有人,然后才能進行聊天溝通。與QQ、微信類似,如果你想要使用某款聊天程序時,前提都得是先具備一個對應的賬戶才行。

    因此在咱們設計IM系統之處,那也需要對應的用戶功能實現。但這里為了簡單,同樣不再結合數據庫實現完整的用戶模塊了,而是基于內存實現用戶的管理。

    如下:

    publicinterfaceUserService {

        booleanlogin(String username, String password);

    }

    這是用戶模塊的頂層接口,僅僅只提供了一個登錄接口,關于注冊、鑒權、等級.....等一系列功能,大家感興趣的可在后續進行拓展實現,接著來看看該接口的實現類。

    如下:

    publicclassUserServiceMemoryImpl implementsUserService {

        privateMap<String, String> allUserMap = newConcurrentHashMap<>();

     

        {

            // 在代碼塊中對用戶列表進行初始化,向其中添加了兩個用戶信息

            allUserMap.put("ZhuZi", "123");

            allUserMap.put("XiongMao", "123");

        }

     

        @Override

        publicbooleanlogin(String username, String password) {

            String pass = allUserMap.get(username);

            if(pass == null) {

                returnfalse;

            }

            returnpass.equals(password);

        }

    }

    這個實現類并未結合數據庫來實現,而是僅僅在程序啟動時,通過代碼塊的方式,加載了ZhuZi、XiongMao兩個用戶信息并放入內存的Map容器中,這里有興趣的小伙伴,可自行將Map容器換成數據庫的表即可。

    其中實現的login()登錄接口尤為簡單,僅僅只是判斷了一下有沒有對應用戶,如果有的話則看看密碼是否正確,正確返回true,密碼錯誤則返回false。是的,我所寫的登錄功能就是這么簡單,走個簡單的過場,哈哈哈~

    8.2服務端、客戶端的基礎架構

    基本的用戶模塊有了,但這里還未曾套入具體實現,因此先簡單的搭建出服務端、客戶端的架構,然后再基于構建好的架構實現基礎的用戶登錄功能。

    服務端的基礎搭建如下:

    publicclassChatServer {

        publicstaticvoidmain(String[] args) {

            NioEventLoopGroup boss = newNioEventLoopGroup();

            NioEventLoopGroup worker = newNioEventLoopGroup();

     

            ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

     

            try{

                ServerBootstrap serverBootstrap = newServerBootstrap();

                serverBootstrap.channel(NioServerSocketChannel.class);

                serverBootstrap.group(boss, worker);

                serverBootstrap.childHandler(newChannelInitializer<SocketChannel>() {

                    @Override

                    protectedvoidinitChannel(SocketChannel ch) throwsException {

                        ch.pipeline().addLast(MESSAGE_CODEC);

                    }

                });

     

                Channel channel = serverBootstrap.bind(8888).sync().channel();

                channel.closeFuture().sync();

            } catch(InterruptedException e) {

                System.out.println("服務端出現錯誤:"+ e);

            } finally{

                boss.shutdownGracefully();

                worker.shutdownGracefully();

            }

        }

    }

    服務端的代碼目前很簡單,僅僅只是裝載了一個自己的協議編/解碼處理器,然后就是一些老步驟,不再過多的重復贅述,接著再來搭建一個簡單的客戶端。

    代碼實現如下:

    publicclassChatClient {

        publicstaticvoidmain(String[] args) {

            NioEventLoopGroup group = newNioEventLoopGroup();

     

            ChatMessageCodec MESSAGE_CODEC = newChatMessageCodec();

     

            try{

                Bootstrap bootstrap = newBootstrap();

                bootstrap.channel(NioSocketChannel.class);

                bootstrap.group(group);

                bootstrap.handler(newChannelInitializer<SocketChannel>() {

                    @Override

                    protectedvoidinitChannel(SocketChannel ch) throwsException {

                        ch.pipeline().addLast(MESSAGE_CODEC);

                    }

                });

                Channel channel = bootstrap.connect("localhost", 8888).sync().channel();

                channel.closeFuture().sync();

            } catch(Exception e) {

                System.out.println("客戶端出現錯誤:"+ e);

            } finally{

                group.shutdownGracefully();

            }

        }

    }

    目前僅僅只是與服務端建立了連接,然后裝載了一個自定義的編解碼器,到這里就搭建了最基本的服務端、客戶端的基礎架構,接著來基于它實現簡單的登錄功能。

    8.3用戶登錄功能的實現

    對于登錄功能,由于需要在服務端與客戶端之間傳輸數據,因此咱們可以設計一個消息對象,但由于后續單聊、群聊都需要發送不同的消息格式,因此先設計出一個父類。

    如下:

    publicabstractclassMessage implementsSerializable {

     

        privateintsequenceId;

        privateintmessageType;

     

     

        @Override

        publicString toString() {

            return"Message{"+

                    "sequenceId="+ sequenceId +

                    ", messageType="+ messageType +

                    '}';

        }

     

        publicintgetSequenceId() {

            returnsequenceId;

        }

     

        publicvoidsetSequenceId(intsequenceId) {

            this.sequenceId = sequenceId;

        }

     

        publicvoidsetMessageType(intmessageType) {

            this.messageType = messageType;

        }

     

        publicabstractintgetMessageType();

     

        publicstaticfinalintLoginRequestMessage = 0;

        publicstaticfinalintLoginResponseMessage = 1;

        publicstaticfinalintChatRequestMessage = 2;

        publicstaticfinalintChatResponseMessage = 3;

        publicstaticfinalintGroupCreateRequestMessage = 4;

        publicstaticfinalintGroupCreateResponseMessage = 5;

        publicstaticfinalintGroupJoinRequestMessage = 6;

        publicstaticfinalintGroupJoinResponseMessage = 7;

        publicstaticfinalintGroupQuitRequestMessage = 8;

        publicstaticfinalintGroupQuitResponseMessage = 9;

        publicstaticfinalintGroupChatRequestMessage = 10;

        publicstaticfinalintGroupChatResponseMessage = 11;

        publicstaticfinalintGroupMembersRequestMessage = 12;

        publicstaticfinalintGroupMembersResponseMessage = 13;

        publicstaticfinalintPingMessage = 14;

        publicstaticfinalintPongMessage = 15;

    }

    在這個消息父類中,定義了多種消息類型的狀態碼,不同的消息類型對應不同數字,同時其中還設計了一個抽象方法,即getMessageType(),該方法交給具體的子類實現,每個子類返回各自的消息類型,為了方便后續拓展,這里又創建了一個抽象類作為中間類。

    如下:

    publicabstractclassAbstractResponseMessage extendsMessage {

        privatebooleansuccess;

        privateString reason;

     

        publicAbstractResponseMessage() {

        }

     

        publicAbstractResponseMessage(booleansuccess, String reason) {

            this.success = success;

            this.reason = reason;

        }

     

        @Override

        publicString toString() {

            return"AbstractResponseMessage{"+

                    "success="+ success +

                    ", reason='"+ reason + '\''+

                    '}';

        }

     

        publicbooleanisSuccess() {

            returnsuccess;

        }

     

        publicvoidsetSuccess(booleansuccess) {

            this.success = success;

        }

     

        publicString getReason() {

            returnreason;

        }

     

        publicvoidsetReason(String reason) {

            this.reason = reason;

        }

    }

    這個類主要是提供給響應時使用的,其中包含了響應狀態以及響應信息,接著再設計兩個登錄時會用到的消息對象。

    如下:

    publicclassLoginRequestMessage extendsMessage {

        privateString username;

        privateString password;

     

        publicLoginRequestMessage() {

        }

     

        @Override

        publicString toString() {

            return"LoginRequestMessage{"+

                    "username='"+ username + '\''+

                    ", password='"+ password + '\''+

                    '}';

        }

     

        publicString getUsername() {

            returnusername;

        }

     

        publicvoidsetUsername(String username) {

            this.username = username;

        }

     

        publicString getPassword() {

            returnpassword;

        }

     

        publicvoidsetPassword(String password) {

            this.password = password;

        }

     

        publicLoginRequestMessage(String username, String password) {

            this.username = username;

            this.password = password;

        }

     

        @Override

        publicintgetMessageType() {

            returnLoginRequestMessage;

        }

    }

    上述這個消息類,主要是提供給客戶端登錄時使用,本質上也就是一個涵蓋用戶名、用戶密碼的對象而已,同時還有一個用來給服務端響應時的響應類。

    如下:

    publicclassLoginResponseMessage extendsAbstractResponseMessage {

        publicLoginResponseMessage(booleansuccess, String reason) {

            super(success, reason);

        }

     

        @Override

        publicintgetMessageType() {

            returnLoginResponseMessage;

        }

    }

    登錄響應類的實現十分簡單,由登錄狀態和登錄消息組成,OK,接著來看看登錄的具體實現。

    首先在客戶端中,再通過pipeline添加一個處理器,如下:

    CountDownLatch WAIT_FOR_LOGIN = newCountDownLatch(1);

    AtomicBoolean LOGIN = newAtomicBoolean(false);

    AtomicBoolean EXIT = newAtomicBoolean(false);

    Scanner scanner = newScanner(System.in);

     

    ch.pipeline().addLast("client handler", newChannelInboundHandlerAdapter() {

        @Override

        publicvoidchannelActive(ChannelHandlerContext ctx) throwsException {

            // 負責接收用戶在控制臺的輸入,負責向服務器發送各種消息

            newThread(() -> {

                System.out.println("請輸入用戶名:");

                String username = scanner.nextLine();

                if(EXIT.get()){

                    return;

                }

                System.out.println("請輸入密碼:");

                String password = scanner.nextLine();

                if(EXIT.get()){

                    return;

                }

                // 構造消息對象

                LoginRequestMessage message = newLoginRequestMessage(username, password);

                System.out.println(message);

                // 發送消息

                ctx.writeAndFlush(message);

                System.out.println("等待后續操作...");

                try{

                    WAIT_FOR_LOGIN.await();

                } catch(InterruptedException e) {

                    e.printStackTrace();

                }

                // 如果登錄失敗

                if(!LOGIN.get()) {

                    ctx.channel().close();

                    return;

                }

        }).start();

    }

    在與服務端建立連接成功之后,就提示用戶需要登錄,接著接收用戶輸入的用戶名、密碼,然后構建出一個LoginRequestMessage消息對象,接著將其發送給服務端,由于前面裝載了自定義的協議編解碼器,所以消息在出站時,這個Message對象會被序列化成字節碼,接著再服務端入站時,又會被反序列化成消息對象,接著來看看服務端的實現。

    如下:

    @ChannelHandler.Sharable

    publicclassLoginRequestMessageHandler

                extendsSimpleChannelInboundHandler<LoginRequestMessage> {

        @Override

        protectedvoidchannelRead0(ChannelHandlerContext ctx,

                    LoginRequestMessage msg) throwsException {

            String username = msg.getUsername();

            String password = msg.getPassword();

            booleanlogin = UserServiceFactory.getUserService().login(username, password);

            LoginResponseMessage message;

            if(login) {

                SessionFactory.getSession().bind(ctx.channel(), username);

                message = newLoginResponseMessage(true, "登錄成功");

            } else{

                message = newLoginResponseMessage(false, "用戶名或密碼不正確");

            }

            ctx.writeAndFlush(message);

        }

    }

    在服務端中,新增了一個處理器類,繼承自SimpleChannelInboundHandler這個處理器,其中指定的泛型為LoginRequestMessage,這表示當前處理器只關注這個類型的消息,當出現登錄類型的消息時,會進入該處理器并觸發內部的channelRead0()方法。

    在該方法中,獲取了登錄消息中的用戶名、密碼,接著對其做了基本的登錄效驗,如果用戶名存在并且密碼正確,就會返回登錄成功,否則會返回登錄失敗,最終登錄后的狀態會被封裝成一個LoginResponseMessage對象,然后寫回客戶端的通道中。

    當然,為了該處理器能夠成功生效,這里需要將其裝載到服務端的pipeline上。

    如下:

    LoginRequestMessageHandler LOGIN_HANDLER = newLoginRequestMessageHandler();

    ch.pipeline().addLast(LOGIN_HANDLER);

    裝載好登錄處理器后,接著分別啟動服務端、客戶端,測試結果如下:

    從圖中的效果來看,這里實現了最基本的登錄功能,估計有些小伙伴看到這里就有些暈了,但其實非常簡單,僅僅只是通過Netty在做數據交互而已,客戶端則提供輸入用戶名、密碼的功能,然后將用戶輸入的名稱、密碼發送給服務端,服務端提供登錄判斷的功能,最終根據判斷結果再向客戶端返回數據罷了。

    9、實戰要點2:實現點對點單聊

    9.1概述

    有了基本的用戶登錄功能后,接著來看看如何實現點對點的單聊功能呢?

    首先我定義了一個會話接口,如下:

    publicinterfaceSession {

        voidbind(Channel channel, String username);

        voidunbind(Channel channel);

        Channel getChannel(String username);

    }

    這個接口中依舊只有三個方法,釋義如下:

    • 1)bind():傳入一個用戶名和Socket通道,讓兩者之間的產生綁定關系;
    • 2)unbind():取消一個用戶與某個Socket通道的綁定關系;
    • 3)getChannel():根據一個用戶名,獲取與其存在綁定關系的通道。

    該接口的實現類如下:

    publicclassSessionMemoryImpl implementsSession {

     

        privatefinalMap<String, Channel> usernameChannelMap = newConcurrentHashMap<>();

        privatefinalMap<Channel, String> channelUsernameMap = newConcurrentHashMap<>();

     

        @Override

        publicvoidbind(Channel channel, String username) {

            usernameChannelMap.put(username, channel);

            channelUsernameMap.put(channel, username);

            channelAttributesMap.put(channel, newConcurrentHashMap<>());

        }

     

        @Override

        publicvoidunbind(Channel channel) {

            String username = channelUsernameMap.remove(channel);

            usernameChannelMap.remove(username);

            channelAttributesMap.remove(channel);

        }

     

        @Override

        publicChannel getChannel(String username) {

            returnusernameChannelMap.get(username);

        }

     

        @Override

        publicString toString() {

            returnusernameChannelMap.toString();

        }

    }

    該實現類最關鍵的是其中的兩個Map容器,usernameChannelMap用來存儲所有用戶名與Socket通道的綁定關系,而channelUsernameMap則是反過來的順序,這主要是為了方便,即可以通過用戶名獲得對應通道,也可以通過通道判斷出用戶名,實際上一個Map也能搞定,但還是那句話,主要為了簡單嘛~

    有了上述這個最簡單的會話管理功能后,就要著手實現具體的功能了,其實在前面實現登錄功能的時候,就用過這其中的bind()方法,也就是當登錄成功之后,就會將當前發送登錄消息的通道,與正在登錄的用戶名產生綁定關系,這樣就方便后續實現單聊、群聊的功能。

    9.2定義單聊的消息對象

    與登錄時相同,由于需要在服務端和客戶端之間實現數據的轉發,因此這里也需要兩個消息對象,用來作為數據交互的消息格式。

    如下:

    publicclassChatRequestMessage extendsMessage {

        privateString content;

        privateString to;

        privateString from;

     

        publicChatRequestMessage() {

        }

     

        publicChatRequestMessage(String from, String to, String content) {

            this.from = from;

            this.to = to;

            this.content = content;

        }

        // 省略Get/Setting、toString()方法.....

    }

    上述這個類,是提供給客戶端用來發送消息數據的,其中主要包含了三個值,聊天的消息內容、發送人與接收人。因為這里是需要實現一個IM聊天程序,所以并不是客戶端與服務端進行數據交互,而是客戶端與客戶端之間進行數據交互,服務端僅僅只提供消息轉發的功能,接著再構建一個消息類。

    如下:

    publicclassChatResponseMessage extendsAbstractResponseMessage {

     

        privateString from;

        privateString content;

     

        @Override

        publicString toString() {

            return"ChatResponseMessage{"+

                    "from='"+ from + '\''+

                    ", content='"+ content + '\''+

                    '}';

        }

     

        publicChatResponseMessage(booleansuccess, String reason) {

            super(success, reason);

        }

     

        publicChatResponseMessage(String from, String content) {

            this.from = from;

            this.content = content;

        }

     

        @Override

        publicintgetMessageType() {

            returnChatResponseMessage;

        }

        // 省略Get/Setting、toString()方法.....

    }

    這個類是提供給服務端用來轉發的,當服務端收到一個聊天消息后,因為聊天消息中包含了接收人,所以可以先根據接收人的用戶名,找到對應的客戶端通道,然后再封裝成一個響應消息,轉發給對應的客戶端即可,下面來做具體實現。

    9.3實現點對點單聊功能

    由于聊天功能是提供給客戶端使用的,所以當一個客戶端登錄成功之后,應該暴露給用戶一個操作菜單,所以直接在原本客戶端的channelActive()方法中,登錄成功之后繼續加代碼即可。

    代碼如下:

    while(true) {

        System.out.println("==================================");

        System.out.println("\t1、發送單聊消息");

        System.out.println("\t2、發送群聊消息");

        System.out.println("\t3、創建一個群聊");

        System.out.println("\t4、獲取群聊成員");

        System.out.println("\t5、加入一個群聊");

        System.out.println("\t6、退出一個群聊");

        System.out.println("\t7、退出聊天系統");

        System.out.println("==================================");

        String command = scanner.nextLine();

    }

    首先會開啟一個死循環,然后不斷接收用戶的操作,接著使用switch語法來對具體的菜單功能進行實現,先實現單聊功能。

    如下:

    switch(command){

        case"1":

            System.out.print("請選擇你要發送消息給誰:");

            String toUserName = scanner.nextLine();

            System.out.print("請輸入你要發送的消息內容:");

            String content = scanner.nextLine();

            ctx.writeAndFlush(newChatRequestMessage(username, toUserName, content));

            break;

    }

    如果用戶選擇了單聊,接著會提示用戶選擇要發送消息給誰,這里也就是讓用戶輸入對方的用戶名,實際上如果有界面的話,這一步是并不需要用戶自己輸入的,而是提供窗口讓用戶點擊,比如QQ、微信一樣,想要給某個人發送消息時,只需要點擊“他”的頭像私聊即可。

    等用戶選擇了聊天目標,并且輸入了消息內容后,接著會構建一個ChatRequestMessage消息對象,然后會發送給服務端,但這里先不看服務端的實現,客戶端這邊還需要重寫一個方法。

    如下:

    @Override

    publicvoidchannelRead(ChannelHandlerContext ctx, Object msg) throwsException {

        System.out.println("收到消息:"+ msg);

        if((msg instanceofLoginResponseMessage)) {

            LoginResponseMessage response = (LoginResponseMessage) msg;

            if(response.isSuccess()) {

                // 如果登錄成功

                LOGIN.set(true);

            }

            // 喚醒 system in 線程

            WAIT_FOR_LOGIN.countDown();

        }

    }

    前面的邏輯是在channelActive()方法中完成的,也就是連接建立成功后,就會讓用戶登錄,接著登錄成功之后會給用戶一個菜單欄,提供給用戶進行操作,但前面的邏輯中一直沒有對服務端響應的消息進行處理,因此channelRead()方法中會對服務端響應的數據進行處理。

    channelRead()方法會在有數據可讀時被觸發,所以當服務端響應數據時,首先會判斷一下:目前服務端響應的是不是登錄消息,如果是的話,則需要根據登錄的結果來喚醒前面channelActive()方法中的線程。如果目前服務端響應的不是登錄消息,這也就意味著客戶端前面已經登錄成功了,所以接著會直接打印一下收到的數據。

    OK,有了上述客戶端的代碼實現后,接著再來服務端多創建一個處理器。

    如下:

    @ChannelHandler.Sharable

    publicclassChatRequestMessageHandler

                extendsSimpleChannelInboundHandler<ChatRequestMessage> {

        @Override

        protectedvoidchannelRead0(ChannelHandlerContext ctx,

                        ChatRequestMessage msg) throwsException {

            String to = msg.getTo();

            Channel channel = SessionFactory.getSession().getChannel(to);

            // 在線

            if(channel != null) {

                channel.writeAndFlush(newChatResponseMessage(

                            msg.getFrom(), msg.getContent()));

            }

            // 不在線

            else{

                ctx.writeAndFlush(newChatResponseMessage(

                            false, "對方用戶不存在或者不在線"));

            }

        }

    }

    這里依舊通過繼承SimpleChannelInboundHandler類的形式,來特別關注ChatRequestMessage單聊類型的消息,如果目前服務端收到的是單聊消息,則會進入觸發該處理器的channelRead0()方法。

    該處理器內部的邏輯也并不復雜,首先根據單聊消息的接收人,去找一下與之對應的通道:

    • 1)如果根據用戶名查到了通道,表示接收人目前是登錄在線狀態;
    • 2)反之,如果無法根據用戶名找到通道,表示對應的用戶不存在或者沒有登錄。

    接著會根據上面的查詢結果,進行對應的結果返回:

    • 1)如果在線:把要發送的單聊消息,直接寫入至找到的通道中;
    • 2)如果不在線:向發送單聊消息的客戶端,返回用戶不存在或用戶不在線。

    有了這個處理器之后,接著還需要把該處理器裝載到服務端上,如下:

    ChatRequestMessageHandler CHAT_HANDLER = newChatRequestMessageHandler();

    ch.pipeline().addLast(CHAT_HANDLER);

    裝載好單聊處理器后,接著分別啟動一個服務端、兩個客戶端,測試結果如下:

    從測試結果中可以明顯看出效果,其中的單聊功能的確已經實現,可以實現A→B用戶之間的單聊功能,兩者之間借助服務器轉發,可以實現兩人私聊的功能。

    10、實戰要點3:打造多人聊天室

    10.1概述

    前面實現了兩個用戶之間的私聊功能,接著再來實現一個多人聊天室的功能,畢竟像QQ、微信、釘釘....等任何通訊軟件,都支持多人建立群聊的功能。

    但多人聊天室的功能,實現之前還需要先完成建群的功能,畢竟如果群都沒建立,自然無法向某個群內發送數據。

    實現拉群也好,群聊也罷,其實現步驟依舊和前面相同,如下:

    • 1)先定義對應的消息對象;
    • 2)實現客戶端發送對應消息數據的功能;
    • 3)再寫一個服務端的群聊處理器,然后裝載到服務端上。

    10.2定義拉群的消息體

    首先來定義兩個拉群時用的消息體,如下:

    publicclassGroupCreateRequestMessage extendsMessage {

        privateString groupName;

        privateSet<String> members;

     

        publicGroupCreateRequestMessage(String groupName, Set<String> members) {

            this.groupName = groupName;

            this.members = members;

        }

     

        @Override

        publicintgetMessageType() {

            returnGroupCreateRequestMessage;

        }

     

        // 省略其他Get/Settings、toString()方法.....

    }

    上述這個消息體是提供給客戶端使用的,其中主要存在兩個成員,也就是群名稱與群成員列表,存放所有群成員的容器選用了Set集合,因為Set集合具備不可重復性,因此可以有效的避免同一用戶多次進群,接著再來看看服務端響應時用的消息體。

    如下:

    publicclassGroupCreateResponseMessage extendsAbstractResponseMessage {

        publicGroupCreateResponseMessage(booleansuccess, String reason) {

            super(success, reason);

        }

     

        @Override

        publicintgetMessageType() {

            returnGroupCreateResponseMessage;

        }

    }

    這個消息體的實現尤為簡單,僅僅只是給客戶端返回了拉群狀態以及拉群的附加信息。

    10.3定義群聊會話管理

    前面單聊有單聊的會話管理機制,而實現多人群聊時,依舊需要有群聊的會話管理機制,首先封裝了一個群聊實體類。

    如下:

    publicclassGroup {

        // 聊天室名稱

        privateString name;

        // 聊天室成員

        privateSet<String> members;

     

        publicstaticfinalGroup EMPTY_GROUP = newGroup("empty", Collections.emptySet());

     

        publicGroup(String name, Set<String> members) {

            this.name = name;

            this.members = members;

        }

     

        // 省略其他Get/Settings、toString()方法.....

    }

    接著定義了一個群聊會話的頂級接口,如下:

    publicinterfaceGroupSession {

        // 創建一個群聊

        Group createGroup(String name, Set<String> members);

        // 加入某個群聊

        Group joinMember(String name, String member);

        // 移除群聊中的某個成員

        Group removeMember(String name, String member);

        // 解散一個群聊

        Group removeGroup(String name);

        // 獲取一個群聊的成員列表

        Set<String> getMembers(String name);

        // 獲取一個群聊所有在線用戶的Channel通道

        List<Channel> getMembersChannel(String name);

    }

    上述接口中,提供了幾個接口方法,其實也主要是群聊系統中的一些日常操作,如創群、加群、踢人、解散群、查看群成員....等功能,接著來看看該接口的實現者。

    如下:

    publicclassGroupSessionMemoryImpl implementsGroupSession {

        privatefinalMap<String, Group> groupMap = newConcurrentHashMap<>();

     

        @Override

        publicGroup createGroup(String name, Set<String> members) {

            Group group = newGroup(name, members);

            returngroupMap.putIfAbsent(name, group);

        }

     

        @Override

        publicGroup joinMember(String name, String member) {

            returngroupMap.computeIfPresent(name, (key, value) -> {

                value.getMembers().add(member);

                returnvalue;

            });

        }

     

        @Override

        publicGroup removeMember(String name, String member) {

            returngroupMap.computeIfPresent(name, (key, value) -> {

                value.getMembers().remove(member);

                returnvalue;

            });

        }

     

        @Override

        publicGroup removeGroup(String name) {

            returngroupMap.remove(name);

        }

     

        @Override

        publicSet<String> getMembers(String name) {

            returngroupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();

        }

     

        @Override

        publicList<Channel> getMembersChannel(String name) {

            returngetMembers(name).stream()

                    .map(member -> SessionFactory.getSession().getChannel(member))

                    .filter(Objects::nonNull)

                    .collect(Collectors.toList());

        }

    }

    這個實現類沒啥好說的,重點記住里面有個Map容器即可,這個容器主要負責存儲所有群名稱與Group群聊對象的關系,后續可以通過群聊名稱,在這個容器中找到一個對應群聊對象。同時為了方便后續調用這些接口,還提供了一個工具類。

    如下:

    publicabstractclassGroupSessionFactory {

        privatestaticGroupSession session = newGroupSessionMemoryImpl();

     

        publicstaticGroupSession getGroupSession() {

            returnsession;

        }

    }

    很簡單,僅僅只實例化了一個群聊會話管理的實現類,因為這里沒有結合Spring來實現,所以并不能依靠IOC技術來自動管理Bean,因此咱們需要手動創建出一個實例,以供于后續使用。

    10.4實現拉群功能

    前面客戶端的功能菜單中,3對應著拉群功能,所以咱們需要對3做具體的功能實現。

    邏輯如下:

    case"3":

        System.out.print("請輸入你要創建的群聊昵稱:");

        String newGroupName = scanner.nextLine();

        System.out.print("請選擇你要邀請的群成員(不同成員用、分割):");

        String members = scanner.nextLine();

        Set<String> memberSet = newHashSet<>(Arrays.asList(members.split("、")));

        memberSet.add(username); // 加入自己

        ctx.writeAndFlush(newGroupCreateRequestMessage(newGroupName, memberSet));

        break;

    在該分支實現中,首先會要求用戶輸入一個群聊昵稱,接著需要輸入需要拉入群聊的用戶名稱,多個用戶之間使用、分割,接著會把用戶輸入的群成員以及自己,全部放入到一個Set集合中,最終組裝成一個拉群消息體,發送給服務端處理。

    服務端的處理器如下:

    @ChannelHandler.Sharable

    publicclassGroupCreateRequestMessageHandler

            extendsSimpleChannelInboundHandler<GroupCreateRequestMessage> {

        @Override

        protectedvoidchannelRead0(ChannelHandlerContext ctx,

                    GroupCreateRequestMessage msg) throwsException {

            String groupName = msg.getGroupName();

            Set<String> members = msg.getMembers();

            // 群管理器

            GroupSession groupSession = GroupSessionFactory.getGroupSession();

            Group group = groupSession.createGroup(groupName, members);

            if(group == null) {

                // 發生成功消息

                ctx.writeAndFlush(newGroupCreateResponseMessage(true,

                                    groupName + "創建成功"));

                // 發送拉群消息

                List<Channel> channels = groupSession.getMembersChannel(groupName);

                for(Channel channel : channels) {

                    channel.writeAndFlush(newGroupCreateResponseMessage(

                                        true, "您已被拉入"+ groupName));

                }

            } else{

                ctx.writeAndFlush(newGroupCreateResponseMessage(

                                    false, groupName + "已經存在"));

            }

        }

    }

    這里依舊繼承了SimpleChannelInboundHandler類,只關心拉群的消息,當客戶端出現拉群消息時,首先會獲取用戶輸入的群昵稱和群成員,接著通過前面提供的創群接口,嘗試創建一個群聊,如果群聊已經存在,則會創建失敗,反之則會創建成功,在創建群聊成功的情況下,會給所有的群成員發送一條“你已被拉入[XXX]”的消息。

    最后,同樣需要將該處理器裝載到服務端上,如下:

    GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =

                        newGroupCreateRequestMessageHandler();

    ch.pipeline().addLast(GROUP_CREATE_HANDLER);

    最后分別啟動一個服務端、兩個客戶端進行效果測試,如下:

    從上圖的測試結果來看,的確實現了咱們的拉群效果,一個用戶拉群之后,被邀請的成員都會收到來自于服務端的拉群提醒,這也就為后續群聊功能奠定了基礎。

    10.5定義群聊的消息體

    這里就不重復贅述了,還是之前的套路,定義一個客戶端用的消息體,如下:

    publicclassGroupChatRequestMessage extendsMessage {

        privateString content;

        privateString groupName;

        privateString from;

     

        publicGroupChatRequestMessage(String from, String groupName, String content) {

            this.content = content;

            this.groupName = groupName;

            this.from = from;

        }

     

        @Override

        publicintgetMessageType() {

            returnGroupChatRequestMessage;

        }

        // 省略其他Get/Settings、toString()方法.....

    }

    這個是客戶端用來發送群聊消息的消息體,其中存在三個成員,發送人、群聊昵稱、消息內容,通過這三個成員,可以描述清楚任何一條群聊記錄,接著來看看服務端響應時用的消息體。

    如下:

    publicclassGroupChatResponseMessage extendsAbstractResponseMessage {

        privateString from;

        privateString content;

     

        publicGroupChatResponseMessage(booleansuccess, String reason) {

            super(success, reason);

        }

     

        publicGroupChatResponseMessage(String from, String content) {

            this.from = from;

            this.content = content;

        }

        @Override

        publicintgetMessageType() {

            returnGroupChatResponseMessage;

        }

        // 省略其他Get/Settings、toString()方法.....

    }

    在這個消息體中,就省去了群聊昵稱這個成員,因為這個消息體的用處,主要是給服務端轉發給客戶端時使用的,因此不需要群聊昵稱,當然,要也可以,我這里就直接省去了。

    10.6實現群聊功能

    依舊先來做客戶端的實現,實現了客戶端之后再去完成服務端的實現,客戶端實現如下:

    case"2":

        System.out.print("請選擇你要發送消息的群聊:");

        String groupName = scanner.nextLine();

        System.out.print("請輸入你要發送的消息內容:");

        String groupContent = scanner.nextLine();

        ctx.writeAndFlush(newGroupChatRequestMessage(username, groupName, groupContent));

        break;

    因為發送群聊消息對應著之前菜單中的2,所以這里對該分支進行實現,當用戶選擇發送群聊消息時,首先會讓用戶自己先選擇一個群聊,接著輸入要發送的消息內容,接著組裝成一個群聊消息對象,發送給服務端處理。

    服務端的實現如下:

    @ChannelHandler.Sharable

    publicclassGroupChatRequestMessageHandler

            extendsSimpleChannelInboundHandler<GroupChatRequestMessage> {

        @Override

        protectedvoidchannelRead0(ChannelHandlerContext ctx,

                    GroupChatRequestMessage msg) throwsException {

            List<Channel> channels = GroupSessionFactory.getGroupSession()

                    .getMembersChannel(msg.getGroupName());

     

            for(Channel channel : channels) {

                channel.writeAndFlush(newGroupChatResponseMessage(

                                msg.getFrom(), msg.getContent()));

            }

        }

    }

    這里依舊定義了一個處理器,關于原因就不再重復啰嗦了,服務端對于群聊消息的實現額外簡單,也就是先根據用戶選擇的群昵稱,找到該群所有的群成員,然后依次遍歷成員列表,獲取對應的Socket通道,轉發消息即可。

    接著將該處理器裝載到服務端pipeline上,然后分別啟動一個服務端、兩個客戶端,進行效果測試,如下:

    效果如上圖的注釋,基于上述的代碼測試,效果確實達到了咱們需要的群聊效果~

    10.7聊天室的其他功能實現

    到這里為止,實現了最基本的建群、群聊的功能,但對于踢人、加群、解散群....等一系列群聊功能還未曾實現,但我這里就不繼續重復了。

    畢竟還是那個套路:

    • 1)定義對應功能的消息體;
    • 2)客戶端向服務端發送對應格式的消息;
    • 3)服務端編寫處理器,對特定的消息進行處理。

    所以大家感興趣的情況下,可以根據上述步驟繼續進行實現,實現的過程沒有任何難度,重點就是時間問題罷了。

    11、本文小結

    看到這里,其實Netty實戰篇的內容也就大致結束了,個人對于實戰篇的內容并不怎么滿意,因為與最初設想的實現存在很大偏差,這是由于近期工作、生活狀態不對,所以內容輸出也沒那么夯實,對于這篇中的完整代碼實現,也包括前面兩篇中的一些代碼實現(詳見“2、配套源碼”),大家感興趣可以自行Down下去玩玩。

    在我所撰寫的案例中,自定義協議可以繼續優化,選擇性能更強的序列化方式,而聊天室也可以進一步拓展,比如將用戶信息、群聊信息、聯系人信息都結合數據庫實現,進一步實現離線消息功能,但由于該案例的設計之初就有問題,所以是存在性能問題的,想要打造一款真正高性能的IM程序,那諸位可參考本系列前面的文章即可。

    12、系列文章

    跟著源碼學IM(一):手把手教你用Netty實現心跳機制、斷線重連機制

    跟著源碼學IM(二):自已開發IM很難?手把手教你擼一個Andriod版IM

    跟著源碼學IM(三):基于Netty,從零開發一個IM服務端

    跟著源碼學IM(四):拿起鍵盤就是干,教你徒手開發一套分布式IM系統

    跟著源碼學IM(五):正確理解IM長連接、心跳及重連機制,并動手實現

    跟著源碼學IM(六):手把手教你用Go快速搭建高性能、可擴展的IM系統

    跟著源碼學IM(七):手把手教你用WebSocket打造Web端IM聊天

    跟著源碼學IM(八):萬字長文,手把手教你用Netty打造IM聊天

    跟著源碼學IM(九):基于Netty實現一套分布式IM系統

    跟著源碼學IM(十):基于Netty,搭建高性能IM集群(含技術思路+源碼)

    跟著源碼學IM(十一):一套基于Netty的分布式高可用IM詳細設計與實現(有源碼)

    跟著源碼學IM(十二):基于Netty打造一款高性能的IM即時通訊程序》(* 本文)

    SpringBoot集成開源IM框架MobileIMSDK,實現即時通訊IM聊天功能

    13、參考資料

    [1] 淺談IM系統的架構設計

    [2] 簡述移動端IM開發的那些坑:架構設計、通信協議和客戶端

    [3] 一套海量在線用戶的移動端IM架構設計實踐分享(含詳細圖文)

    [4] 一套原創分布式即時通訊(IM)系統理論架構方案

    [5] 一套億級用戶的IM架構技術干貨(上篇):整體架構、服務拆分等

    [6] 一套億級用戶的IM架構技術干貨(下篇):可靠性、有序性、弱網優化等

    [7] 史上最通俗Netty框架入門長文:基本介紹、環境搭建、動手實戰

    [8] 強列建議將Protobuf作為你的即時通訊應用數據傳輸格式

    [9] IM通訊協議專題學習(一):Protobuf從入門到精通,一篇就夠!

    [10] 融云技術分享:全面揭秘億級IM消息的可靠投遞機制

    [11] IM群聊消息如此復雜,如何保證不丟不重?

    [12] 零基礎IM開發入門(四):什么是IM系統的消息時序一致性?

    [13] 如何保證IM實時消息的“時序性”與“一致性”?

    [14] 微信的海量IM聊天消息序列號生成實踐(算法原理篇)

    [15] 網易云信技術分享:IM中的萬人群聊技術方案實踐總結

    [16] 融云IM技術分享:萬人群聊消息投遞方案的思考和實踐

    [17] 為何基于TCP協議的移動端IM仍然需要心跳保活機制?

    [18] 一文讀懂即時通訊應用中的網絡心跳包機制:作用、原理、實現思路等

    [19] 微信團隊原創分享:Android版微信后臺保活實戰分享(網絡保活篇)

    [20] 融云技術分享:融云安卓端IM產品的網絡鏈路保活技術實踐

    [21] 徹底搞懂TCP協議層的KeepAlive保活機制

    [22] 深度解密釘釘即時消息服務DTIM的技術設計

    (本文已同步發布于:http://www.52im.net/thread-4530-1-1.html



    作者:Jack Jiang (點擊作者姓名進入Github)
    出處:http://www.52im.net/space-uid-1.html
    交流:歡迎加入即時通訊開發交流群 215891622
    討論:http://www.52im.net/
    Jack Jiang同時是【原創Java Swing外觀工程BeautyEye】【輕量級移動端即時通訊框架MobileIMSDK】的作者,可前往下載交流。
    本博文 歡迎轉載,轉載請注明出處(也可前往 我的52im.net 找到我)。


    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    Jack Jiang的 Mail: jb2011@163.com, 聯系QQ: 413980957, 微信: hellojackjiang
    主站蜘蛛池模板: 久草免费手机视频| 亚洲一久久久久久久久| 亚洲精品成人片在线播放| 国产成人亚洲综合无码| 亚洲精品国产综合久久一线| 亚洲精品国产高清不卡在线| 亚洲精品视频免费| 亚洲精品乱码久久久久久| 亚洲V无码一区二区三区四区观看 亚洲αv久久久噜噜噜噜噜 | 久久久久亚洲AV无码专区体验| 亚洲激情在线视频| 亚洲综合久久1区2区3区| 亚洲伊人久久大香线蕉| 亚洲一区二区三区精品视频| 亚洲中文字幕乱码一区| 亚洲AV女人18毛片水真多| 免费福利资源站在线视频| 国产性生大片免费观看性| 无人在线观看免费高清| 国产四虎免费精品视频| 大学生高清一级毛片免费| 亚洲av无码成人精品区在线播放| 国产亚洲精品福利在线无卡一| 亚洲av午夜福利精品一区人妖| 亚洲欧洲国产精品久久| 亚洲国产精品无码久久| 国产免费区在线观看十分钟| 99re6热视频精品免费观看| 久久久久久国产精品免费免费| 免费国产成人午夜私人影视| 亚洲综合在线另类色区奇米| 久久99亚洲网美利坚合众国| 亚洲欧美综合精品成人导航| 一级免费黄色大片| 精品无码免费专区毛片| 日韩精品免费一区二区三区| 国产亚洲精品福利在线无卡一| 亚洲国产综合第一精品小说| 阿v免费在线观看| 午夜免费福利小电影| 永久中文字幕免费视频网站|