1.本來trigger receiver流程的模塊和接收者類是放在一個(gè)APP Server上的,但由于性能的考慮,這種schedule模塊的調(diào)度和管理可能會(huì)影響業(yè)務(wù)邏輯的執(zhí)行,占用業(yè)務(wù)邏輯執(zhí)行的系統(tǒng)資源,所以將它放到單獨(dú)的JVM上運(yùn)行,作為一個(gè)Standalone的java application。這樣schedule模塊就不能直接通過內(nèi)存調(diào)用接收者流程,接收者必須開放遠(yuǎn)程rpc服務(wù),讓trigger通過遠(yuǎn)程調(diào)用的方法主動(dòng)調(diào)用消息接收者去接受消息。
原來的系統(tǒng)通過MessageReceiverFactory獲得一個(gè)MessageReceiver 然后主動(dòng)接收消息。
MessageReceiver:
public interface MessageReceiver {
void invokeProcessFlow(String processId) throws ServerReceiverException;
}
|
ServerMessageReceiver:
public class ServerMessageReceiver implements MessageReceiver {
ServerMessageReceiver() {
}
public void invokeProcessFlow(String processId) throws ServerReceiverException {
//receive message
}
|
MessageReceiverFactory:
public class MessageReceiverFactory {
private static MessageReceiver messageReceiver;
private static void init() throws ServerReceiverException {
messageReceiver = new ServerMessageReceiver();
}
public synchronized static MessageReceiver getMessageReceiver() throws ServerReceiverException {
if(messageReceiver == null) {
init();
}
return messageReceiver;
}
}
|
2.增加RMI服務(wù)后:
客戶端:
· 客戶端通過MessageReceiverFactory獲得MessageReceiver的遠(yuǎn)程版本:RemoteMessageReceiver
· 客戶端使用遠(yuǎn)程版本的MessageReceiver look up出RemotableMessageReceiver的RMI Stub進(jìn)行遠(yuǎn)程的RPC調(diào)用,通知服務(wù)器端接收消息。
public class RemoteMessageReceiver implements MessageReceiver {
private RemotableMessageReceiver messageReceiver;
private String rmiHost;
private int rmiPort;
private String serviceName;
public RemoteMessageReceiver(String host, int port, String serviceName) throws ServerReceiverException {
try{
this.rmiHost = host;
this.rmiPort = port;
this.serviceName = serviceName;
Registry registry = LocateRegistry.getRegistry(rmiHost, rmiPort);
messageReceiver = (RemotableMessageReceiver) registry.lookup(this.serviceName);
}catch(Exception ex) {
throw new ServerReceiverException("look up remote message receiver failed!", this, ex);
}
}
public void invokeProcessFlow(String processId) throws ServerReceiverException {
try {
this.messageReceiver.invokeProcessFlow(processId);
} catch (RemoteException e) {
throw new ServerReceiverException(e.getMessage(), this, e);
}
}
}
|
這里用了適配器模式,對(duì)RemotableMessageReceiver進(jìn)行適配,從而滿足MessageReceiver接口的標(biāo)準(zhǔn),方便MessageReceiverFactory的統(tǒng)一生產(chǎn),并且使客戶端使用的RemoteMessageReceiver和具體使用的通信方法解耦,將來如果不使用RMI, 只需要替換RemotableMessageReceiver就可以了。不會(huì)影響客戶端的代碼。
服務(wù)器端:
- 服務(wù)器端設(shè)置開關(guān),可以開啟和關(guān)閉遠(yuǎn)程服務(wù)。
- 如果開啟遠(yuǎn)程服務(wù),就需要注冊(cè)服務(wù),注冊(cè)的服務(wù)對(duì)象是提供給客戶端用的遠(yuǎn)程對(duì)象,服務(wù)端本身使用的MessageReceiver不需要實(shí)現(xiàn)RemotableMessageReceiver接口和原來一樣實(shí)現(xiàn)MessageReceiver接口,從而原有的應(yīng)用并沒有太大影響。
- 服務(wù)器端通過MessageReceiverFactory獲得MessageReceiver也可以主動(dòng)接受消息,如果需要提供遠(yuǎn)程服務(wù),就必須注冊(cè)遠(yuǎn)程服務(wù)。
這樣,服務(wù)器端首先增加了RemotableMessageReceiver接口,所有遠(yuǎn)程對(duì)象實(shí)現(xiàn)該接口,RMIMessageReceiver提供RMI服務(wù)的遠(yuǎn)程對(duì)象,供客戶端遠(yuǎn)程調(diào)用。RMIMessageReceiver必須滿足RemotableMessageReceiver接口的契約,所以也使用了適配器模式。增加了RMIMessageReceiver后,服務(wù)器端使用的MessageReceiver就和RMIMessageReceiver耦合在一起了,為了解耦,我又新增了一個(gè)StdMessageReceiver類,作為服務(wù)器端本地的MessageReceiver,供服務(wù)器端本地調(diào)用,StdMessageReceiver用了裝飾器模式,對(duì)已有的MessageReceiver進(jìn)行修飾,支持服務(wù)器端本地調(diào)用,如果以后本地調(diào)用加了新的功能就不會(huì)影響RMIMessageReceiver的功能了,比如凡是本地調(diào)用都需要在服務(wù)器端記log,這樣就只需要在StdMessageReceiver添加打log的功能,而不會(huì)影響RMIMessageReceiver。
MessageReceiverFactory簡(jiǎn)單工廠用于生產(chǎn)MessageReceiver對(duì)象,可能是客戶端的RemoteMessageReceiver也可能是服務(wù)器端的StdMessageReceiver。
為了在deploy時(shí)候就啟動(dòng)RMI服務(wù),我們可以在servlet的init方法中初始化并注冊(cè)RMI服務(wù),在deetroy方法中相應(yīng)的取消RMI服務(wù)。
RemotableMessageReceiver: 遠(yuǎn)程服務(wù)對(duì)象接口
public interface RemotableMessageReceiver extends Remote {
void invokeProcessFlow(String processId) throws ServerReceiverException,
RemoteException;
}
|
RMIMessageReceiver:提供RMI服務(wù)的遠(yuǎn)程對(duì)象
public class RMIMessageReceiver implements RemotableMessageReceiver {
private MessageReceiver messageReceiver;
RMIMessageReceiver(MessageReceiver messageReceiver) throws RemoteException {
this.messageReceiver = messageReceiver;
SystemInfo sysInfo = ContextFactory.getSystemConfigContext().getSystemInfo();
System.out.println("beign to bind at rmiport: " + sysInfo.getRmiport());
RMIUtils.bind(sysInfo.getRmiport(), sysInfo
.getReceiverRmiBindName(), this);
}
public void invokeProcessFlow(String processId)
throws ServerReceiverException, RemoteException {
this.messageReceiver.invokeProcessFlow(processId);
}
}
|
SystemInfo存儲(chǔ)的是一些配置信息,比如是不是服務(wù)器端,是不是需要開啟遠(yuǎn)程服務(wù),如果開啟的話相應(yīng)的host,port和rmi遠(yuǎn)程對(duì)象綁定的服務(wù)名
StdMessageReceiver:服務(wù)器端使用的MessageReceiver。
public class StdMessageReceiver implements MessageReceiver {
private MessageReceiver messageReceiver;
public StdMessageReceiver(MessageReceiver messageReceiver) {
this.messageReceiver = messageReceiver;
}
public void invokeProcessFlow(String processId) throws ServerReceiverException {
//do some log
this.messageReceiver.invokeProcessFlow(processId);
}
}
|
MessageReceiverFactory: 生產(chǎn)MessageReceiver的簡(jiǎn)單工廠:
public class MessageReceiverFactory {
private static MessageReceiver messageReceiver;
private static RemotableMessageReceiver remotableMessageReceiver;
private static void init() throws ServerReceiverException {
SystemInfo sysInfo = ContextFactory.getSystemConfigContext()
.getSystemInfo();
if (sysInfo.isServer()) {
MessageReceiver mifMessageReceiver = new ServerMessageReceiver();
if (sysInfo.isRemoteable()) {
try {
remotableMessageReceiver = new RMIMessageReceiver(mifMessageReceiver);
} catch (RemoteException e) {
throw new ServerReceiverException(
"bind Message Receiver to port: "
+ sysInfo.getRmiport()
+ " with service name: "
+ sysInfo.getReceiverRmiBindName(), e);
}
}
messageReceiver = new StdMessageReceiver(mifMessageReceiver);
} else {
String rmiHost = sysInfo.getRmiHost();
int rmiPort = sysInfo.getRmiport();
String serviceName = sysInfo.getReceiverRmiBindName();
messageReceiver = createRemoteMesageReceiver(rmiHost, rmiPort,
serviceName);
}
}
public static MessageReceiver createRemoteMesageReceiver(String rmiHost,
int rmiPort, String serviceName) throws ServerReceiverException {
return new RemoteMessageReceiver(rmiHost, rmiPort, serviceName);
}
public synchronized static MessageReceiver getMessageReceiver() throws ServerReceiverException {
if(messageReceiver == null) {
init();
}
return messageReceiver;
}
public static RemotableMessageReceiver getRemoteableMesageReceiver(){
return remotableMessageReceiver;
}
public synchronized static void initMessageReceiver() throws ServerReceiverException {
init();
}
}
|
StartUpServlet:啟動(dòng)時(shí)(deploy)時(shí)parse config并且初始化MessageReceiver
publicclass StartupServlet extends HttpServlet {
public void init(ServletConfig config) throws ServletException {
try {
//parse configuration
//init rmi service
MessageReceiverFactory.initMessageReceiver();
} catch (Exception ex) {
ex.printStackTrace();
thrownew ServletException(ex.getMessage(), ex);
}
}
publicvoid destroy() {
try {
RemotableMessageReceiver remotableMessageReceiver = MessageReceiverFactory
.getRemoteableMesageReceiver();
if (remotableMessageReceiver != null) {
SystemInfo sysInfo = ContextFactory.getSystemConfigContext()
.getSystemInfo();
RMIUtils.unBind(sysInfo.getRmiport(), sysInfo
.getReceiverRmiBindName(),
remotableMessageReceiver);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
RMIUtils: 工具類提供rmi注冊(cè)和撤銷服務(wù)的功能:
public class RMIUtils {
public static void bind(int rmiPort, String serviceName, Remote remoteObject)
throws RemoteException {
SystemInfo sysInfo = ContextFactory.getSystemConfigContext()
.getSystemInfo();
Remote exportable = (Remote) UnicastRemoteObject
.exportObject(remoteObject);
Registry registry = null;
try {
registry = LocateRegistry.getRegistry(sysInfo.getRmiport());
registry.list();
} catch (Exception e) {
registry = LocateRegistry.createRegistry(sysInfo.getRmiport());
}
registry.list();
System.out.println("bind the service: " + serviceName);
String bindName = serviceName;
registry.rebind(bindName, exportable);
}
public static void unBind(int rmiPort, String serviceName, Remote remoteObject) throws RemoteException {
SystemInfo sysInfo = ContextFactory.getSystemConfigContext()
.getSystemInfo();
Registry registry = LocateRegistry.getRegistry(sysInfo.getRmiport());
String bindName = serviceName;
try {
registry.unbind(bindName);
UnicastRemoteObject.unexportObject(remoteObject, true);
} catch (java.rmi.NotBoundException nbe) {
}
}
}
|