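Recently I have been building games at FunPlus, which led me to explore a new architecture.
Previously I always built game servers on my own Java architecture. After several years of refinement it did reach what I originally had in mind: multiple processes, multiple threads within each process, lock-free, and pure JDBC database writes with zero added latency. For a single-server architecture it is close to the limit.
This year mini games took off, for example 海盜來了 and the titles from the company behind 瘋狂游戲. All of them are built with Go + MongoDB, because Go supports high concurrency at the language level, which Java cannot match. Java, however, has a large open-source ecosystem, and many experts have laid the groundwork with frameworks such as Vert.x and Akka that unlock far more of Java's potential. How much you get out of them depends on how well you understand them.
This time I chose Vert.x, mainly for its networking layer: it wraps Netty and adds its own event-loop model, which in my estimation puts its web request response speed roughly in the top three.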
netServer = vertx.createHttpServer(httpServerOptions);
// Plain HTTP requests: answer the /ping health check and close everything else.
netServer.requestHandler(hs -> {
    if (hs.path().equals("/ping")) {
        hs.response().end("pong");
        return;
    }
    hs.response().close();
});
netServer.websocketHandler(ws -> {
    // Only accept WebSocket upgrades on the configured path.
    if (!ws.path().equals("/" + wsname)) {
        ws.reject();
        return;
    }
    Player player = new Player(ws, ws.binaryHandlerID());
    players.put(player.getConnId(), player);
    player.setServerUUID(gateAdress);
    // Log the new connection.
    if (log.isDebugEnabled()) {
        SocketAddress addrLocal = ws.localAddress();
        log.debug("New connection: connId={}, local port={}, remote address={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
    }
    // Handle binary frames arriving on this connection.
    ws.binaryMessageHandler(data -> {
        int readableBytes = data.length();
        if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
            return;
        }
        // Read the 2-byte length prefix as unsigned: getShort() is signed and
        // would turn any length above 32767 into a negative number.
        int len = data.getUnsignedShort(0);
        if (len > 64 * 1024) {
            log.error("conn:" + player.getId() + " packet too large: " + len);
            return;
        }
        if (readableBytes < len) {
            return;
        }
        CGMessage msg = decode(data);
        if (msg == null) return;
        inputHandler(msg, player);
    });
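    ws.exceptionHandler(e -> {
        // getMessage() may be null, so normalize it before matching.
        String message = e.getMessage() == null ? "" : e.getMessage();
        if (message.equals("WebSocket is closed")) {
            // player.disconnect();
        }
        // Peer disconnects are expected, so they are logged without a stack trace.
        // The second literal is the Chinese-locale Windows message for a connection
        // forcibly closed by the remote host; it is matched verbatim and must stay as-is.
        if (message.contains("Connection reset by peer") || message.contains("遠程主機強迫關閉了一個現有的連接")) {
            log.error("Peer disconnected: connId={}, cause={}", player.getconnId(), message);
        } else {
            // Any other failure: log it with the stack trace.
            log.error("Exception: connId={}", player.getconnId(), e);
        }
    });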
    ws.closeHandler(t -> {
        // Log the close together with the player's current state.
        if (log.isDebugEnabled()) {
            log.debug("Connection closed: connId={}, status={}", player.getconnId(), player);
        }
        if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
            player.setState(PlayerState.logouted);
            // Remove the session mapping connId -> Player.
            players.remove(player.getConnId());
            return;
        }
        if (player.getUserInfo() == null) {
            // No user data attached: just remove the player bound to this connection.
            players.remove(player.getConnId());
            return;
        }
        gateService.closePlayer(player.getconnId(), ar -> {
            if (ar.failed()) {
                Loggers.coreLogger.error("player connId:" + player.getconnId() + " offline logout failed: " + ar.cause().getMessage());
            }
            // Remove the player bound to this connection.
            players.remove(player.getConnId());
        });
    });
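}).listen(port, host, res -> {
    if (res.succeeded()) {
        // Startup log line.
        log.info(" started. Listen: " + port + " vert:" + vertx.hashCode());
        future.complete();
    } else {
        // Propagate the bind failure; otherwise the start future never completes.
        future.fail(res.cause());
    }
});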
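The length checks in the binary message handler above can be isolated into a small, testable helper. The following is only a sketch under stated assumptions: `FrameCheck`, its `Result` enum and the 32 KB limit are hypothetical names and values chosen for illustration, and it assumes a 2-byte big-endian length prefix that counts the whole frame. It uses `java.nio.ByteBuffer` rather than the Vert.x `Buffer` so that it runs without any dependency.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: validates a length-prefixed frame before it is decoded. */
final class FrameCheck {
    static final int HEADER_BYTES = 2;            // assumed: 2-byte big-endian length prefix
    static final int MAX_FRAME_BYTES = 32 * 1024; // assumed limit, for illustration only

    enum Result { TOO_SHORT, TOO_LARGE, INCOMPLETE, OK }

    static Result check(ByteBuffer data) {
        int readable = data.remaining();
        if (readable < HEADER_BYTES) {
            return Result.TOO_SHORT;
        }
        // Mask to read the prefix as unsigned; a plain getShort() would
        // turn 40000 into a negative number and slip past the size check.
        int len = data.getShort(data.position()) & 0xFFFF;
        if (len > MAX_FRAME_BYTES) {
            return Result.TOO_LARGE;
        }
        if (readable < len) {
            return Result.INCOMPLETE;
        }
        return Result.OK;
    }
}
```

Keeping the check free of networking types makes it easy to unit test the edge cases (short header, oversized length, partial frame) separately from the WebSocket handler.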
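Vert.x makes it easy to serve player requests from its event-loop thread pool, and the handlers for a connection are always invoked on the same specific thread.
That is far more dependable than a hand-rolled hash thread pool. P.S. Reinventing the wheel is not bad in itself, but a home-grown implementation is rarely tested exhaustively, and when something unexpected happens you are the one who has to wade through the pitfalls.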
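For comparison, the kind of hand-rolled hash thread pool mentioned above might look roughly like this. It is a sketch only, not how Vert.x implements its event loops: `HashedExecutor` is a hypothetical class, and it simply routes every task for a given key to one of several single-thread executors so that tasks for the same key never run concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical hash thread pool: tasks with the same key always run on the same worker. */
final class HashedExecutor {
    private final ExecutorService[] workers;

    HashedExecutor(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Runs the task on the worker selected by the key's hash; completes with that worker's thread name. */
    CompletableFuture<String> submit(Object key, Runnable task) {
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(key.hashCode(), workers.length);
        return CompletableFuture.supplyAsync(() -> {
            task.run();
            return Thread.currentThread().getName();
        }, workers[index]);
    }

    void shutdown() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
    }
}
```

Even this small version leaves open questions such as back-pressure, uneven key distribution and error handling inside tasks, which is the kind of untested corner the post warns about.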
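The rest of this post is mainly about storage. Sending requests to MongoDB at scale places much higher demands on its response time, which led me to add a caching layer. My first idea was Redis + MongoDB with hand-written read-through and write-through.
If a key is not in Redis, read it from MongoDB and put it into the cache; on a write, update the cache first and then MongoDB.
A hand-rolled storage scheme like this is rather crude, so I kept looking for a caching solution.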
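For reference, the hand-rolled read-through and write-through scheme described above could be sketched as follows. This is an illustration only: `ReadWriteThroughCache` is a hypothetical class, and plain in-memory maps stand in for Redis (the cache) and MongoDB (the backing store), so no real client API is shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical read-through / write-through cache over a backing store. */
final class ReadWriteThroughCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>(); // stands in for Redis
    private final Map<K, V> store;                             // stands in for MongoDB

    ReadWriteThroughCache(Map<K, V> store) {
        this.store = store;
    }

    /** Read-through: on a cache miss, load from the store and populate the cache. */
    V get(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        V loaded = store.get(key);
        if (loaded != null) {
            cache.put(key, loaded);
        }
        return loaded;
    }

    /** Write-through: update the cache first, then the store, in the same call. */
    void put(K key, V value) {
        cache.put(key, value);
        store.put(key, value);
    }

    boolean isCached(K key) {
        return cache.containsKey(key);
    }
}
```

The sketch ignores expiry, partial failures between the two writes and concurrent loaders for the same key, which are the parts that make a home-grown version hard to get right.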
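Along the way I came across a framework that gets little exposure: Apache Ignite, a latest-generation in-memory data grid.
The key step is getting Vert.x and Ignite to work together. That is a prerequisite for everything else.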
package cn.empires;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.support.observer.Event;
import cn.empires.verticle.OnlineVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;

public class MainLaunch extends Launcher {

    private JsonObject config;

    public static void main(String[] args) {
        System.setProperty("logFileName", "gateServer");
        new MainLaunch().dispatch(args);
    }

    @Override
    protected String getDefaultCommand() {
        return super.getDefaultCommand();
    }

    @Override
    protected String getMainVerticle() {
        return "cn.empires.verticle.GateVerticle";
    }

    @Override
    public void afterConfigParsed(JsonObject config) {
        super.afterConfigParsed(config);
        this.config = config;
    }

    @Override
    public void beforeStartingVertx(VertxOptions options) {
        // Start Vert.x in clustered mode so that a cluster manager is used.
        options.setClustered(true);
    }

    @Override
    public void afterStartingVertx(Vertx vertx) {
        super.afterStartingVertx(vertx);
        //config.put("redis.password", "123456");
        // Initialize global state.
        ListenerInit.init(Event.instance);
        Loggers.coreLogger.info("Globals init.");
        Globals.init(vertx, config);
        vertx.deployVerticle(OnlineVerticle.class, new DeploymentOptions().setConfig(config));
    }

    @Override
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
        super.beforeDeployingVerticle(deploymentOptions);
    }

    @Override
    public void beforeStoppingVertx(Vertx vertx) {
        super.beforeStoppingVertx(vertx);
    }

    @Override
    public void afterStoppingVertx() {
        super.afterStoppingVertx();
    }

    @Override
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
        super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
    }
}
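To use Ignite's caches you first need the Ignite instance; without it no cache can be obtained.
if (ignite == null) {
    // The cluster manager's node ID is used to look up the Ignite instance.
    ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
    String uuid = clusterManager.getNodeID();
    ignite = Ignition.ignite(UUID.fromString(uuid));
}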
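Put an ignite.xml on the classpath. It is loaded automatically when Vert.x starts, and the Ignite cluster manager (IgniteClusterManager) then handles cluster management.
I will paste the ignite.xml configuration only once.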
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Ignite Spring configuration file to startup Ignite cache.
This file demonstrates how to configure cache using Spring. Provided cache
will be created on node startup.
Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).
When starting a standalone node, you need to execute the following command:
{IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml
When starting Ignite from Java IDE, pass path to this file to Ignition:
Ignition.start("examples/config/example-cache.xml");
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<!-- Set the page size to 4 KB -->
<property name="pageSize" value="4096"/>
<!-- Set concurrency level -->
<property name="concurrencyLevel" value="6"/>
<property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
<property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<!-- Cap the default data region at 512 MB. -->
<property name="maxSize" value="#{512L * 1024 * 1024}"/>
<!-- Enabling RANDOM_2_LRU eviction for this region. -->
<property name="pageEvictionMode" value="RANDOM_2_LRU"/>
</bean>
</property>
<property name="dataRegionConfigurations">
<list>
<!--
Defining a data region that will consume up to 500 MB of RAM and
will have page eviction enabled (persistence is not configured here).
-->
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<!-- Custom region name. -->
<property name="name" value="500MB_Region"/>
<!-- 100 MB initial size. -->
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
<!-- 500 MB maximum size. -->
<property name="maxSize" value="#{500L * 1024 * 1024}"/>
<!-- Enabling RANDOM_2_LRU eviction for this region. -->
<property name="pageEvictionMode" value="RANDOM_2_LRU"/>
</bean>
</list>
</property>
</bean>
</property>
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="UserInfo"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="ATOMIC"/>
<property name="backups" value="0"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
<property name="writeBehindEnabled" value="true"/>
<property name="writeBehindFlushSize" value="1024"/>
<property name="writeBehindFlushFrequency" value="5"/>
<property name="writeBehindFlushThreadCount" value="1"/>
<property name="writeBehindBatchSize" value="512"/>
<property name="dataRegionName" value="Default_Region"/>
</bean>
</list>
</property>
<property name="failureDetectionTimeout" value="60000"/>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
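Ignite divides memory in fine detail: it can be split into multiple data regions, each with its own configuration, such as initial size, maximum size and eviction policy.
The CacheConfiguration for UserInfo configures how the cache is used, for example readThrough, writeThrough and writeBehindEnabled. Finer settings include the write-behind flush frequency: writeBehindFlushFrequency is specified in milliseconds, so the value 5 used here flushes pending updates every 5 ms; set it to 5000 to flush once every five seconds.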
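To make the write-behind settings more concrete, here is a minimal sketch of the idea. It is not Ignite's implementation: `WriteBehindBuffer` is a hypothetical class in which `flushSize` plays the role of a size threshold, and a timer (not shown) would call `flush()` at the configured frequency. Updates are buffered per key, so several writes to the same key reach the store as a single write.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical write-behind buffer: coalesces updates and writes them to the store in batches. */
final class WriteBehindBuffer<K, V> {
    private final int flushSize;
    private final Consumer<Map<K, V>> writer; // stands in for the batch write to the database
    private final Map<K, V> pending = new LinkedHashMap<>();

    WriteBehindBuffer(int flushSize, Consumer<Map<K, V>> writer) {
        this.flushSize = flushSize;
        this.writer = writer;
    }

    /** Buffers the update; a later write to the same key replaces the earlier one. */
    synchronized void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= flushSize) {
            flush();
        }
    }

    /** Writes all pending entries as one batch; a timer would also call this periodically. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        writer.accept(new LinkedHashMap<>(pending));
        pending.clear();
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

The trade-off is the usual one for write-behind: fewer and larger database writes, at the cost of losing whatever is still buffered if the process dies before the next flush.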
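public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
    // getOrCreateCache returns the cache already defined in ignite.xml when the names match,
    // so the store class passed in as clazz is not applied here.
    CacheConfiguration<String, T> cacheCfg = new CacheConfiguration<>(cacheName);
    return Globals.ignite().getOrCreateCache(cacheCfg);
}
The Globals utility class provides this helper method for obtaining an IgniteCache.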
package cn.empires.gs.player.service.impl;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

import cn.empires.common.Globals;
import cn.empires.common.cache.UserCacheStore;
import cn.empires.common.service.ServiceBase;
import cn.empires.gs.model.UserInfo;
import cn.empires.gs.player.service.UserService;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class UserServiceImpl extends ServiceBase implements UserService {

    private final IgniteCache<String, UserInfo> cache;

    public UserServiceImpl(Vertx vertx, JsonObject config) {
        super(vertx, config);
        cache = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
    }

    @Override
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture<UserInfo> future = cache.getAsync(id);
        // The listener runs once the future has completed, on an Ignite thread
        // rather than the Vert.x event loop. get() rethrows any failure.
        future.listen(f -> {
            AsyncResult<UserInfo> result;
            try {
                result = Future.succeededFuture(f.get());
            } catch (RuntimeException e) {
                result = Future.failedFuture(e);
            }
            handler.handle(result);
        });
        return this;
    }

    @Override
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture<Void> future = cache.putAsync(userInfo.get_id(), userInfo);
        future.listen(f -> {
            AsyncResult<UserInfo> result;
            try {
                f.get(); // surfaces a failed put instead of reporting success
                result = Future.succeededFuture(userInfo);
            } catch (RuntimeException e) {
                result = Future.failedFuture(e);
            }
            handler.handle(result);
        });
        return this;
    }
}
The last piece is the cache store, which writes to the database synchronously and lets read-through load from MongoDB.
package cn.empires.common.cache;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.bson.Document;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.model.UserInfo;
import io.vertx.core.json.JsonObject;

public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {

    /** Mongo collection, resolved lazily on first use. */
    private MongoCollection<Document> collection;

    private MongoCollection<Document> collection() {
        if (collection == null) {
            collection = Globals.mongoDb().getCollection(UserInfo.tableName);
        }
        return collection;
    }

    @Override
    public void start() throws IgniteException {
    }

    /** Read-through: called by Ignite when the key is not in the cache. */
    @Override
    public UserInfo load(String key) throws CacheLoaderException {
        Document doc = collection().find(Filters.eq("_id", key)).first();
        if (doc == null) {
            return null;
        }
        Loggers.userLogger.info("CacheStore load UserInfo.");
        return UserInfo.fromDB(new JsonObject(doc.toJson()));
    }

    /** Write-through / write-behind: upserts the entry by _id. */
    @Override
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
        Document filter = new Document("_id", entry.getKey());
        // Note: only toString() is stored under "value", while load() hands the whole
        // document to UserInfo.fromDB; the two formats must agree (UserInfo is not shown here).
        Document replacement = new Document("value", entry.getValue().toString());
        collection().replaceOne(filter, replacement, new UpdateOptions().upsert(true));
        Loggers.userLogger.info("CacheStore saved UserInfo.");
    }

    /** Not implemented: removing an entry from the cache leaves the MongoDB document in place. */
    @Override
    public void delete(Object key) throws CacheWriterException {
    }

    @Override
    public void stop() throws IgniteException {
    }
}
Because ignite.xml registers the cache store factory
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
</bean>
, fetching UserInfo through the cache falls back to MongoDB whenever no matching entry exists in the cache.
More details will have to wait for the next post. Feel free to leave a comment if you have questions.
posted on 2018-11-13 14:29
北國狼人的BloG