<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-159  評論-114  文章-7  trackbacks-0
    最近在funplus做游戲,進而研究了一個新型架構。

    之前做游戲都是自己使用java搭建架構,經過幾年的積累確實也達到了最初的設想,多進程,進程內多線程,無鎖,0延遲純jdbc寫庫。對于單服架構來說,已經趨近于極致。

    今年小游戲盛行,如海盜來了,瘋狂游戲那家公司,全部使用的都是go+mongodb實現的,因為go的語言級別支援高并發,這點是java無法比擬的。不過java開源項目多,有很多的高手鋪墊了超多的框架,比如vertx,akka都可以更加充分的釋放java的能力。就看使用者的認識水平了。

    本次選擇vertx,主要是其在網絡通訊這塊,對netty的包裝,加上自己的eventloop模型,使得響應web請求速度基本屬于前3的水平。

    netServer = vertx.createHttpServer(httpServerOptions);
            netServer.requestHandler();
            netServer.requestHandler(hs 
    -> {
                
    if (hs.path().equals("/ping")) {
                    hs.response().end(
    "pong");
                    
    return;
                }
                hs.response().close();
                
    return;
            });
            
            netServer.websocketHandler(ws 
    -> {
                
    if (!ws.path().equals("/" + wsname)) {
                    ws.reject();
                    
    return;
                }
                Player player 
    = new Player(ws, ws.binaryHandlerID());
                players.put(player.getConnId(), player);
                player.setServerUUID(gateAdress);
                
    //日志
                if (log.isDebugEnabled()) {
                    SocketAddress addrLocal 
    = ws.localAddress();
                    log.debug(
    "新建一個連接:連接ID={},  本地端口={}, 遠程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
                }
                
    //有連接過來了
                ws.binaryMessageHandler(data -> {
                    
    int readableBytes = data.length();
                    
    if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                        
    return;
                    }
                    
    int len = data.getShort(0);
                    
    if (len > 64 * 1024) {
                        log.error(
    "conn:" + player.getId() + "  發送數據包過大:" + len);
                        
    return;
                    }
                    
    if (readableBytes < len) {
                        
    return;
                    }

                    CGMessage msg 
    = decode(data);
                    
    if (msg == nullreturn;
                    inputHandler(msg, player);
                });
                ws.exceptionHandler(e 
    -> {
                    
    if (e.getMessage().equals("WebSocket is closed")) {
    //                    player.disconnect();
                    }
                    
    //斷連的日志就不打印堆棧了
                    if (e.getMessage().contains("Player reset by peer"|| e.getMessage().contains("遠程主機強迫關閉了一個現有的連接")) {
                        log.error(
    "主動斷開:connId={},cause={}", player.getconnId(), e.getCause());
                    } 
    else {
                        
    //輸出錯誤日志
                        log.error("發生異常:connId={},cause={}", player.getconnId(), e.getCause());
                    }
                });
                ws.closeHandler(t 
    -> {
    //                if (player == null) return;
                    
    //連接狀態
                    
    //日志
                    if (log.isDebugEnabled()) {
                        log.debug(
    "連接關閉:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                    }
                    
    if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                        player.setState(PlayerState.logouted);
                        
    //Remove掉 session connId = Player
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    
    if (player.getUserInfo() == null) {
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    gateService.closePlayer(player.getconnId(), ar 
    -> {
                        
    if (ar.failed()) {
                            Loggers.coreLogger.error(
    "player connId:" + player.getconnId() + " 離線退出異常!!!" + ar.cause().getMessage());
                        }
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                    });

                });
            }).listen(port, host, res 
    -> {
                
    if (res.succeeded()) {
                    
    //啟動日志信息
                    log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                    future.complete();
                }
            });
    vertx能方便的使用eventloop線程池響應玩家發來的請求,并永遠在特定線程進行代碼調用。

    比自己使用hash線程池靠譜很多。ps. 自己造輪子不是不好,主要實現方法不一定測試完整,有意想不到的情況,就要自己來趟坑。

    后面主要是說一下,但如果大規模請求MongoDB,需要更高的MongoDB響應要求。進而想到要加緩存機制,最初想到的是redis+mongodb,自己實現讀通過,寫通過。
    如果redis不存在,則從mongodb讀取,并放入緩存,寫數據先寫緩存,后寫mongodb。

    自己實現的這種存儲機制,比較low。所以繼續尋找緩存方案。

    過程中,發現了一個曝光率不高的框架,也就是Apache Ignite。最新一代數據網格。

    關鍵的一步,就是如果讓vertx與Ignite工作到一起。這是一個必要的條件。

    package cn.empires;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.support.observer.Event;
    import cn.empires.verticle.OnlineVerticle;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Launcher;
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;

    public class MainLaunch extends Launcher {

        
    private JsonObject config;
        
        
    public static void main(String[] args) {
            System.setProperty(
    "logFileName""gateServer");
            
    new MainLaunch().dispatch(args);
        }
        
        @Override
        
    protected String getDefaultCommand() {
            
    return super.getDefaultCommand();
        }
     
        @Override
        
    protected String getMainVerticle() {
            
    return "cn.empires.verticle.GateVerticle";
        }
        
        @Override
        
    public void afterConfigParsed(JsonObject config) {
            
    super.afterConfigParsed(config);
            
    this.config = config;
        }
        
        @Override
        
    public void beforeStartingVertx(VertxOptions options) {
            options.setClustered(
    true);
        }
        
        @Override
        
    public void afterStartingVertx(Vertx vertx) {
            
    super.afterStartingVertx(vertx);
            
    //config.put("redis.password", "123456");
            
    //初始化全局相關信息
            ListenerInit.init(Event.instance);
            Loggers.coreLogger.info(
    "Globals init .");
            Globals.init(vertx, config);
            vertx.deployVerticle(OnlineVerticle.
    classnew DeploymentOptions().setConfig(config));
        }
        
        @Override
        
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
            
    super.beforeDeployingVerticle(deploymentOptions);
        }
        
        @Override
        
    public void beforeStoppingVertx(Vertx vertx) {
            
    super.beforeStoppingVertx(vertx);
        }
        
        @Override
        
    public void afterStoppingVertx() {
            
    super.afterStoppingVertx();
        }
        
        @Override
        
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
            
    super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
        }
        
    }

    如果想使用Ignite的緩存,必須需要Ignite實例對象。否則無法獲取。
    if (ignite == null) {
         ClusterManager clusterManager 
    = ((VertxInternal) vertx).getClusterManager();
         String uuid 
    = clusterManager.getNodeID();
         ignite 
    = Ignition.ignite(UUID.fromString(uuid));
    }

    在classpath中,配置一個ignite.xml,vertx啟動的時候自動會加載ignite.xml,然后使用IgniteManager進行集群管理。
    我只貼一遍ignite.xml配置
    <?xml version="1.0" encoding="UTF-8"?>

    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements.  See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License.  You may obtain a copy of the License at

           http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->

    <!--
        Ignite Spring configuration file to startup Ignite cache.

        This file demonstrates how to configure cache using Spring. Provided cache
        will be created on node startup.

        Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

        When starting a standalone node, you need to execute the following command:
        {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

        When starting Ignite from Java IDE, pass path to this file to Ignition:
        Ignition.start("examples/config/example-cache.xml");
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation
    ="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd"
    >
        
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            
    <property name="dataStorageConfiguration">
                
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                    
    <!-- Set the page size to 4 KB -->
                      
    <property name="pageSize" value="4096"/>
                    
    <!-- Set concurrency level -->
                      
    <property name="concurrencyLevel" value="6"/>
                      
    <property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                      
    <property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                      
    <property name="defaultDataRegionConfiguration">
                        
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <property name="name" value="Default_Region"/>
                            
    <!-- 設置默認內存區最大內存為 512M. -->
                            
    <property name="maxSize" value="#{512L * 1024 * 1024}"/>
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                        
    </bean>
                    
    </property>
                   
    <property name="dataRegionConfigurations">
                        
    <list>
                          
    <!--
                              Defining a data region that will consume up to 500 MB of RAM and 
                              will have eviction and persistence enabled.
                          
    -->
                          
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <!-- Custom region name. -->
                            
    <property name="name" value="500MB_Region"/>
                
                            
    <!-- 100 MB initial size. -->
                            
    <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                
                            
    <!-- 500 MB maximum size. -->
                            
    <property name="maxSize" value="#{500L * 1024 * 1024}"/>
                            
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                          
    </bean>
                        
    </list>
                    
    </property>
                
    </bean>
            
    </property>
            
    <property name="cacheConfiguration">
                
    <list>
                       
    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                               
    <property name="name" value="UserInfo"/>
                               
    <property name="cacheMode" value="PARTITIONED"/>
                               
    <property name="atomicityMode" value="ATOMIC"/>
                               
    <property name="backups" value="0"/>
                               
    <property name="cacheStoreFactory">
                                   
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                       
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                                   
    </bean>
                               
    </property>
                               
    <property name="readThrough" value="true"/>
                               
    <property name="writeThrough" value="true"/>
                               
    <property name="writeBehindEnabled" value="true"/>
                               
    <property name="writeBehindFlushSize" value="1024"/>
                               
    <property name="writeBehindFlushFrequency" value="5"/>
                               
    <property name="writeBehindFlushThreadCount" value="1"/>
                               
    <property name="writeBehindBatchSize" value="512"/>
                               
    <property name="dataRegionName" value="Default_Region"/>
                    
    </bean>
                
    </list>
            
    </property>
            
    <property name="failureDetectionTimeout" value="60000"/>
            
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
            
    <property name="discoverySpi">
                
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    
    <property name="ipFinder">
                        
    <!--
                            Ignite provides several options for automatic discovery that can be used
                            instead os static IP based discovery. For information on all options refer
                            to our documentation: http://apacheignite.readme.io/docs/cluster-config
                        
    -->
                        
    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                        
    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        
    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                            
    <property name="addresses">
                                
    <list>
                                    
    <!-- In distributed environment, replace with actual host IP address. -->
                                    
    <value>127.0.0.1:47500..47509</value>
                                
    </list>
                            
    </property>
                        
    </bean>
                    
    </property>
                
    </bean>
            
    </property>
        
    </bean>
    </beans>

    Ignite 對內存有細致的劃分,可以分多個區域Region,每個區域有自己的配置,比如設置初始大小和最大大小,以及淘汰策略。
    UserInfo對應的CacheConfigurationCache使用進行了配置,比如readThrough writeThrough writeBehindEnabled等等,細致的配置諸如后寫刷新頻率writeBehindFlushFrequency為5,表示5秒才會刷新一次更新數據。

        public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
            CacheConfiguration
    <String, T> cacheCfg = new CacheConfiguration<>(cacheName);
            
    return Globals.ignite().getOrCreateCache(cacheCfg);
        }

    在Globals工具類,提供工具方法獲得IgniteCache對象。

    package cn.empires.gs.player.service.impl;

    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.lang.IgniteFuture;

    import cn.empires.common.Globals;
    import cn.empires.common.cache.UserCacheStore;
    import cn.empires.common.service.ServiceBase;
    import cn.empires.gs.model.UserInfo;
    import cn.empires.gs.player.service.UserService;
    import io.vertx.core.AsyncResult;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.json.JsonObject;

    public class UserServiceImpl extends ServiceBase implements UserService {
        
        
    private final IgniteCache<String, UserInfo> cache;

        
    public UserServiceImpl(Vertx vertx, JsonObject config) {
            
    super(vertx, config);
            cache 
    = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
        }

        @Override
        
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <UserInfo> future = cache.getAsync(id);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(h.get()));
                }
            });        
            
    return this;
        }
        

        @Override
        
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <Void> future = cache.putAsync(userInfo.get_id(), userInfo);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(userInfo));
                }
            });
            
    return this;
        }

    }

    最后一件事,就是同步寫庫,可以讀通過從MongoDB進行讀取。

    package cn.empires.common.cache;

    import java.util.ArrayList;
    import java.util.List;

    import javax.cache.Cache.Entry;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;

    import org.apache.ignite.IgniteException;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lifecycle.LifecycleAware;
    import org.bson.Document;

    import com.mongodb.Block;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.UpdateOptions;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.model.UserInfo;
    import io.vertx.core.json.JsonObject;

    public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {
        
        
    /** Mongo collection. */
        
    private MongoCollection<Document> collection;
        
        @Override
        
    public void start() throws IgniteException {
        }

        @Override
        
    public UserInfo load(String key) throws CacheLoaderException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            FindIterable
    <Document> iter = collection.find(Filters.eq("_id", key));
            
    final List<JsonObject> result = new ArrayList<>(1);
            iter.forEach(
    new Block<Document>() {
                
    public void apply(Document _doc) {
                    result.add(
    new JsonObject(_doc.toJson()));
                }
            });
            
    if(result != null && !result.isEmpty()) {
                Loggers.userLogger.info(
    "CacheStore load UserInfo.");
                JsonObject jsonObj 
    = result.get(0);
                
    return UserInfo.fromDB(jsonObj);
            }
            
    return null;
        }

        @Override
        
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            Document filter 
    = new Document();
            filter.append(
    "_id", entry.getKey());
            
            Document replacement 
    = new Document();
            replacement.append(
    "value", entry.getValue().toString());
            collection.replaceOne(filter, replacement, 
    new UpdateOptions().upsert(true));
            Loggers.userLogger.info(
    "CacheStore saved UserInfo.");
        }

        @Override
        
    public void delete(Object key) throws CacheWriterException {
            
        }



        @Override
        
    public void stop() throws IgniteException {
            
        }

    }

    由于在ignite.xml中進行了配置
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
    </bean>
    ,所以在使用Cache獲取UserInfo的時候,如果不存在對應的信息,就會從MongoDB讀取。

    更多的信息只能下一篇文章再寫了。有問題可以留言。

    posted on 2018-11-13 14:29 北國狼人的BloG 閱讀(1597) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 亚洲XX00视频| 国产午夜亚洲精品不卡| 亚洲人成网站在线观看播放| 免费看www视频| aⅴ在线免费观看| 99久久久国产精品免费蜜臀| 无码AV动漫精品一区二区免费| 亚洲AV成人影视在线观看 | 牛牛在线精品观看免费正 | 777爽死你无码免费看一二区| 72pao国产成视频永久免费| 亚洲AV无码专区在线观看成人| 亚洲乱码中文论理电影| 亚洲欧洲一区二区| 亚洲人成影院在线无码按摩店| 国产成人亚洲精品影院| 亚洲国产成人久久综合一区77| 性做久久久久免费看| 日韩中文无码有码免费视频| 最近最好的中文字幕2019免费 | 亚洲婷婷在线视频| 久久久无码精品亚洲日韩按摩| 亚洲成a人片在线观看无码专区| 亚洲日韩人妻第一页| 亚洲情侣偷拍精品| 亚洲五月午夜免费在线视频| 亚洲国产专区一区| 国产偷窥女洗浴在线观看亚洲 | 一级毛片成人免费看免费不卡| 最近中文字幕大全免费版在线| 韩国免费A级毛片久久| a在线视频免费观看| 国产精品视频白浆免费视频| 一级毛片免费毛片一级毛片免费| 日本卡1卡2卡三卡免费| 免费A级毛片无码A∨免费| 1024免费福利永久观看网站| 三年片在线观看免费观看高清电影| 亚洲第一成年免费网站| 好吊妞视频免费视频| 全部免费毛片在线|