<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆-159  評論-114  文章-7  trackbacks-0
    最近在funplus做游戲,進而研究了一個新型架構。

    之前做游戲都是自己使用java搭建架構,經過幾年的積累確實也達到了最初的設想,多進程,進程內多線程,無鎖,0延遲純jdbc寫庫。對于單服架構來說,已經趨近于極致。

    今年小游戲盛行,如海盜來了,瘋狂游戲那家公司,全部使用的都是go+mongodb實現的,因為go的語言級別支援高并發(fā),這點是java無法比擬的。不過java開源項目多,有很多的高手鋪墊了超多的框架,比如vertx,akka都可以更加充分的釋放java的能力。就看使用者的認識水平了。

    本次選擇vertx,主要是其在網絡通訊這塊,對netty的包裝,加上自己的eventloop模型,使得響應web請求速度基本屬于前3的水平。

    netServer = vertx.createHttpServer(httpServerOptions);
            netServer.requestHandler();
            netServer.requestHandler(hs 
    -> {
                
    if (hs.path().equals("/ping")) {
                    hs.response().end(
    "pong");
                    
    return;
                }
                hs.response().close();
                
    return;
            });
            
            netServer.websocketHandler(ws 
    -> {
                
    if (!ws.path().equals("/" + wsname)) {
                    ws.reject();
                    
    return;
                }
                Player player 
    = new Player(ws, ws.binaryHandlerID());
                players.put(player.getConnId(), player);
                player.setServerUUID(gateAdress);
                
    //日志
                if (log.isDebugEnabled()) {
                    SocketAddress addrLocal 
    = ws.localAddress();
                    log.debug(
    "新建一個連接:連接ID={},  本地端口={}, 遠程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
                }
                
    //有連接過來了
                ws.binaryMessageHandler(data -> {
                    
    int readableBytes = data.length();
                    
    if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                        
    return;
                    }
                    
    int len = data.getShort(0);
                    
    if (len > 64 * 1024) {
                        log.error(
    "conn:" + player.getId() + "  發(fā)送數據包過大:" + len);
                        
    return;
                    }
                    
    if (readableBytes < len) {
                        
    return;
                    }

                    CGMessage msg 
    = decode(data);
                    
    if (msg == nullreturn;
                    inputHandler(msg, player);
                });
                ws.exceptionHandler(e 
    -> {
                    
    if (e.getMessage().equals("WebSocket is closed")) {
    //                    player.disconnect();
                    }
                    
    //斷連的日志就不打印堆棧了
                    if (e.getMessage().contains("Player reset by peer"|| e.getMessage().contains("遠程主機強迫關閉了一個現有的連接")) {
                        log.error(
    "主動斷開:connId={},cause={}", player.getconnId(), e.getCause());
                    } 
    else {
                        
    //輸出錯誤日志
                        log.error("發(fā)生異常:connId={},cause={}", player.getconnId(), e.getCause());
                    }
                });
                ws.closeHandler(t 
    -> {
    //                if (player == null) return;
                    
    //連接狀態(tài)
                    
    //日志
                    if (log.isDebugEnabled()) {
                        log.debug(
    "連接關閉:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                    }
                    
    if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                        player.setState(PlayerState.logouted);
                        
    //Remove掉 session connId = Player
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    
    if (player.getUserInfo() == null) {
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                        
    return;
                    }
                    gateService.closePlayer(player.getconnId(), ar 
    -> {
                        
    if (ar.failed()) {
                            Loggers.coreLogger.error(
    "player connId:" + player.getconnId() + " 離線退出異常!!!" + ar.cause().getMessage());
                        }
                        
    //刪掉連接對應的player
                        players.remove(player.getConnId());
                    });

                });
            }).listen(port, host, res 
    -> {
                
    if (res.succeeded()) {
                    
    //啟動日志信息
                    log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                    future.complete();
                }
            });
    vertx能方便的使用eventloop線程池響應玩家發(fā)來的請求,并永遠在特定線程進行代碼調用。

    比自己使用hash線程池靠譜很多。ps. 自己造輪子不是不好,主要實現方法不一定測試完整,有意想不到的情況,就要自己來趟坑。

    后面主要是說一下,但如果大規(guī)模請求MongoDB,需要更高的MongoDB響應要求。進而想到要加緩存機制,最初想到的是redis+mongodb,自己實現讀通過,寫通過。
    如果redis不存在,則從mongodb讀取,并放入緩存,寫數據先寫緩存,后寫mongodb。

    自己實現的這種存儲機制,比較low。所以繼續(xù)尋找緩存方案。

    過程中,發(fā)現了一個曝光率不高的框架,也就是Apache Ignite。最新一代數據網格。

    關鍵的一步,就是如果讓vertx與Ignite工作到一起。這是一個必要的條件。

    package cn.empires;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.support.observer.Event;
    import cn.empires.verticle.OnlineVerticle;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Launcher;
    import io.vertx.core.Vertx;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;

    public class MainLaunch extends Launcher {

        
    private JsonObject config;
        
        
    public static void main(String[] args) {
            System.setProperty(
    "logFileName""gateServer");
            
    new MainLaunch().dispatch(args);
        }
        
        @Override
        
    protected String getDefaultCommand() {
            
    return super.getDefaultCommand();
        }
     
        @Override
        
    protected String getMainVerticle() {
            
    return "cn.empires.verticle.GateVerticle";
        }
        
        @Override
        
    public void afterConfigParsed(JsonObject config) {
            
    super.afterConfigParsed(config);
            
    this.config = config;
        }
        
        @Override
        
    public void beforeStartingVertx(VertxOptions options) {
            options.setClustered(
    true);
        }
        
        @Override
        
    public void afterStartingVertx(Vertx vertx) {
            
    super.afterStartingVertx(vertx);
            
    //config.put("redis.password", "123456");
            
    //初始化全局相關信息
            ListenerInit.init(Event.instance);
            Loggers.coreLogger.info(
    "Globals init .");
            Globals.init(vertx, config);
            vertx.deployVerticle(OnlineVerticle.
    classnew DeploymentOptions().setConfig(config));
        }
        
        @Override
        
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
            
    super.beforeDeployingVerticle(deploymentOptions);
        }
        
        @Override
        
    public void beforeStoppingVertx(Vertx vertx) {
            
    super.beforeStoppingVertx(vertx);
        }
        
        @Override
        
    public void afterStoppingVertx() {
            
    super.afterStoppingVertx();
        }
        
        @Override
        
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
            
    super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
        }
        
    }

    如果想使用Ignite的緩存,必須需要Ignite實例對象。否則無法獲取。
    if (ignite == null) {
         ClusterManager clusterManager 
    = ((VertxInternal) vertx).getClusterManager();
         String uuid 
    = clusterManager.getNodeID();
         ignite 
    = Ignition.ignite(UUID.fromString(uuid));
    }

    在classpath中,配置一個ignite.xml,vertx啟動的時候自動會加載ignite.xml,然后使用IgniteManager進行集群管理。
    我只貼一遍ignite.xml配置
    <?xml version="1.0" encoding="UTF-8"?>

    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements.  See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License.  You may obtain a copy of the License at

           http://www.apache.org/licenses/LICENSE-2.0

      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->

    <!--
        Ignite Spring configuration file to startup Ignite cache.

        This file demonstrates how to configure cache using Spring. Provided cache
        will be created on node startup.

        Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

        When starting a standalone node, you need to execute the following command:
        {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

        When starting Ignite from Java IDE, pass path to this file to Ignition:
        Ignition.start("examples/config/example-cache.xml");
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi
    ="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation
    ="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd"
    >
        
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
            
    <property name="dataStorageConfiguration">
                
    <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                    
    <!-- Set the page size to 4 KB -->
                      
    <property name="pageSize" value="4096"/>
                    
    <!-- Set concurrency level -->
                      
    <property name="concurrencyLevel" value="6"/>
                      
    <property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                      
    <property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                      
    <property name="defaultDataRegionConfiguration">
                        
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <property name="name" value="Default_Region"/>
                            
    <!-- 設置默認內存區(qū)最大內存為 512M. -->
                            
    <property name="maxSize" value="#{512L * 1024 * 1024}"/>
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                        
    </bean>
                    
    </property>
                   
    <property name="dataRegionConfigurations">
                        
    <list>
                          
    <!--
                              Defining a data region that will consume up to 500 MB of RAM and 
                              will have eviction and persistence enabled.
                          
    -->
                          
    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            
    <!-- Custom region name. -->
                            
    <property name="name" value="500MB_Region"/>
                
                            
    <!-- 100 MB initial size. -->
                            
    <property name="initialSize" value="#{100L * 1024 * 1024}"/>
                
                            
    <!-- 500 MB maximum size. -->
                            
    <property name="maxSize" value="#{500L * 1024 * 1024}"/>
                            
                            
    <!-- Enabling RANDOM_LRU eviction for this region.  -->
                                
    <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                          
    </bean>
                        
    </list>
                    
    </property>
                
    </bean>
            
    </property>
            
    <property name="cacheConfiguration">
                
    <list>
                       
    <bean class="org.apache.ignite.configuration.CacheConfiguration">
                               
    <property name="name" value="UserInfo"/>
                               
    <property name="cacheMode" value="PARTITIONED"/>
                               
    <property name="atomicityMode" value="ATOMIC"/>
                               
    <property name="backups" value="0"/>
                               
    <property name="cacheStoreFactory">
                                   
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                       
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                                   
    </bean>
                               
    </property>
                               
    <property name="readThrough" value="true"/>
                               
    <property name="writeThrough" value="true"/>
                               
    <property name="writeBehindEnabled" value="true"/>
                               
    <property name="writeBehindFlushSize" value="1024"/>
                               
    <property name="writeBehindFlushFrequency" value="5"/>
                               
    <property name="writeBehindFlushThreadCount" value="1"/>
                               
    <property name="writeBehindBatchSize" value="512"/>
                               
    <property name="dataRegionName" value="Default_Region"/>
                    
    </bean>
                
    </list>
            
    </property>
            
    <property name="failureDetectionTimeout" value="60000"/>
            
    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
            
    <property name="discoverySpi">
                
    <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                    
    <property name="ipFinder">
                        
    <!--
                            Ignite provides several options for automatic discovery that can be used
                            instead os static IP based discovery. For information on all options refer
                            to our documentation: http://apacheignite.readme.io/docs/cluster-config
                        
    -->
                        
    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                        
    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        
    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                            
    <property name="addresses">
                                
    <list>
                                    
    <!-- In distributed environment, replace with actual host IP address. -->
                                    
    <value>127.0.0.1:47500..47509</value>
                                
    </list>
                            
    </property>
                        
    </bean>
                    
    </property>
                
    </bean>
            
    </property>
        
    </bean>
    </beans>

    Ignite 對內存有細致的劃分,可以分多個區(qū)域Region,每個區(qū)域有自己的配置,比如設置初始大小和最大大小,以及淘汰策略。
    UserInfo對應的CacheConfigurationCache使用進行了配置,比如readThrough writeThrough writeBehindEnabled等等,細致的配置諸如后寫刷新頻率writeBehindFlushFrequency為5,表示5秒才會刷新一次更新數據。

        public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
            CacheConfiguration
    <String, T> cacheCfg = new CacheConfiguration<>(cacheName);
            
    return Globals.ignite().getOrCreateCache(cacheCfg);
        }

    在Globals工具類,提供工具方法獲得IgniteCache對象。

    package cn.empires.gs.player.service.impl;

    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.lang.IgniteFuture;

    import cn.empires.common.Globals;
    import cn.empires.common.cache.UserCacheStore;
    import cn.empires.common.service.ServiceBase;
    import cn.empires.gs.model.UserInfo;
    import cn.empires.gs.player.service.UserService;
    import io.vertx.core.AsyncResult;
    import io.vertx.core.Future;
    import io.vertx.core.Handler;
    import io.vertx.core.Vertx;
    import io.vertx.core.json.JsonObject;

    public class UserServiceImpl extends ServiceBase implements UserService {
        
        
    private final IgniteCache<String, UserInfo> cache;

        
    public UserServiceImpl(Vertx vertx, JsonObject config) {
            
    super(vertx, config);
            cache 
    = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
        }

        @Override
        
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <UserInfo> future = cache.getAsync(id);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(h.get()));
                }
            });        
            
    return this;
        }
        

        @Override
        
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
            IgniteFuture
    <Void> future = cache.putAsync(userInfo.get_id(), userInfo);
            future.listen(h 
    -> {
                
    if(h.isDone()) {
                    handler.handle(Future.succeededFuture(userInfo));
                }
            });
            
    return this;
        }

    }

    最后一件事,就是同步寫庫,可以讀通過從MongoDB進行讀取。

    package cn.empires.common.cache;

    import java.util.ArrayList;
    import java.util.List;

    import javax.cache.Cache.Entry;
    import javax.cache.integration.CacheLoaderException;
    import javax.cache.integration.CacheWriterException;

    import org.apache.ignite.IgniteException;
    import org.apache.ignite.cache.store.CacheStoreAdapter;
    import org.apache.ignite.lifecycle.LifecycleAware;
    import org.bson.Document;

    import com.mongodb.Block;
    import com.mongodb.client.FindIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.UpdateOptions;

    import cn.empires.common.Globals;
    import cn.empires.common.contants.Loggers;
    import cn.empires.gs.model.UserInfo;
    import io.vertx.core.json.JsonObject;

    public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {
        
        
    /** Mongo collection. */
        
    private MongoCollection<Document> collection;
        
        @Override
        
    public void start() throws IgniteException {
        }

        @Override
        
    public UserInfo load(String key) throws CacheLoaderException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            FindIterable
    <Document> iter = collection.find(Filters.eq("_id", key));
            
    final List<JsonObject> result = new ArrayList<>(1);
            iter.forEach(
    new Block<Document>() {
                
    public void apply(Document _doc) {
                    result.add(
    new JsonObject(_doc.toJson()));
                }
            });
            
    if(result != null && !result.isEmpty()) {
                Loggers.userLogger.info(
    "CacheStore load UserInfo.");
                JsonObject jsonObj 
    = result.get(0);
                
    return UserInfo.fromDB(jsonObj);
            }
            
    return null;
        }

        @Override
        
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
            
    if(collection == null) {
                collection 
    = Globals.mongoDb().getCollection(UserInfo.tableName);
            }
            Document filter 
    = new Document();
            filter.append(
    "_id", entry.getKey());
            
            Document replacement 
    = new Document();
            replacement.append(
    "value", entry.getValue().toString());
            collection.replaceOne(filter, replacement, 
    new UpdateOptions().upsert(true));
            Loggers.userLogger.info(
    "CacheStore saved UserInfo.");
        }

        @Override
        
    public void delete(Object key) throws CacheWriterException {
            
        }



        @Override
        
    public void stop() throws IgniteException {
            
        }

    }

    由于在ignite.xml中進行了配置
    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
        
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
    </bean>
    ,所以在使用Cache獲取UserInfo的時候,如果不存在對應的信息,就會從MongoDB讀取。

    更多的信息只能下一篇文章再寫了。有問題可以留言。

    posted on 2018-11-13 14:29 北國狼人的BloG 閱讀(1598) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發(fā)表評論。


    網站導航:
     
    主站蜘蛛池模板: 黄色大片免费网站| 亚洲小说区图片区另类春色| 日本一道本不卡免费 | 国产精品亚洲综合一区在线观看| 亚洲av成人无码久久精品| 亚洲A∨精品一区二区三区| 国内精自视频品线六区免费| a毛片免费全部在线播放**| 在线播放亚洲精品| 久久亚洲最大成人网4438| 久久亚洲美女精品国产精品| 亚洲中文字幕无码日韩| 亚洲国产精品专区在线观看| 日韩精品视频免费在线观看| 美女视频黄的全免费视频网站| 久久精品人成免费| 99热这里只有精品6免费| a级成人免费毛片完整版| 国产免费久久精品99久久| 久青草国产免费观看| 思思久久99热免费精品6| 看免费毛片天天看| 国产成人高清亚洲一区久久| 亚洲精品人成网线在线播放va | 国产精品免费AV片在线观看| 中文字幕在线免费播放| 皇色在线免费视频| 久久精品无码专区免费| 国产在线精品一区免费香蕉| 国产精品午夜免费观看网站 | 亚洲成AV人片在| 亚洲人成色77777| 亚洲国产另类久久久精品小说| 国产成人A亚洲精V品无码| 亚洲色欲色欲www在线丝| 亚洲理论电影在线观看| 亚洲国产成人一区二区三区| 久久丫精品国产亚洲av| 亚洲日韩在线视频| 亚洲日韩国产二区无码 | 无码国模国产在线观看免费|