同樣,在介紹MapGuide 服務器如何處理枚舉資源操作之前,讓我們首先來看看MapGuide 服務器用于處理服務請求和操作的類,圖19?9顯示服務請求處理器類的類圖,圖19?10顯示了操作處理器類的類圖。
圖?19?10 服務請求處理器類的類圖
圖?19?11 操作處理器類的類圖
MapGuide提供了資源服務、要素服務等多種服務,每種服務包含了大量操作。MapGuide為每種服務提供了一個服務請求處理器類,用于處理這些服務所提供的操作,例如MgResourceServiceHandler、MgFeatureServiceHanlder等。這些類都繼承自IMgServiceHandler,并且實現了方法IMgServiceHandler::ProcessOperation(...)。MapGuide使用工廠類MgServiceHanlderFactory來創建一個服務請求處理器類的實例,給定一個服務ID,調用方法MgServiceHandlerFactory::GetHandler(...)可以創建對應的服務請求處理器類的實例。
對于服務中的每種操作,MapGuide也提供了相應的操作處理器類。從圖19?10可以看到,操作處理器類分為四個層次,最高一層是類IMgOperationHandler,它是所有操作處理類的基類,第二層是類MgServiceOperation,第三層是類MgXXXOperation,“XXX”代表服務名稱,在這一層每種類型的服務都有一個對應的類,例如MgFeatureOperation、MgResourceOperation等,第四層為真正負責工作的操作處理器類,每種服務中的每個操作都有一個對應的操作處理器類,某種服務中操作處理器類都繼承自同一個父類,例如MgEnumerateResource、MgSetResource都繼承自MgResourceOperation。
接下來讓我們看看MapGuide服務器如何處理枚舉資源操作,這個操作流程的時序圖如圖 19?11所示。
圖?19?12 MapGuide服務器處理枚舉資源操作的時序圖
1) 在線程池中找一個空閑的線程執行操作
MapGuide服務器使用了多線程和線程池的技術來提高操作處理和響應請求的性能。在MapGuide服務器啟動時,會創建一個操作請求隊列和一個用于處理操作請求的線程池。當一個操作請求進入操作隊列,MapGuide服務器會在線程池中找一個空閑的線程執行操作。如果沒有空閑的線程,那么這個操作請求會處于等待狀態,直到線程池中有了空閑的線程。
首先,讓我們查看一下服務器的入口函數的源代碼,看看服務器是如何啟動的。MapGuide服務器可以像普通的應用程序一樣運行,還可以在Windows操作系統可以運行為Windows服務,在Linux操作系統運行為守護進程,下面的代碼中只保留了運行為普通應用程序部分的代碼。
typedef ACE_Singleton SERVER;
int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
{
int nResult = 0;
......
// 初始化ACE服務
ACE::init();
......
// 根據命令行參數執行響應的操作
if((ACE_OS::strcasecmp(parameter, ACE_TEXT("?")) == 0) ||
(ACE_OS::strcasecmp(parameter,
MG_WCHAR_TO_TCHAR(MgResources::ServerCmdHelp)) == 0))
{
// 顯示服務器命令行參數
ShowCommandlineHelp();
......
}
else if((ACE_OS::strcasecmp(parameter,
MG_WCHAR_TO_TCHAR(MgResources::ServerCmdRun)) == 0) ||
(ACE_OS::strcasecmp(parameter,
MG_WCHAR_TO_TCHAR(MgResources::ServerCmdInteractive)) == 0))
{
ACE_OS::printf(MG_WCHAR_TO_CHAR(MgResources::ServerCmdRunInfo));
......
// 以普通應用程序的方式運行服務器
nResult = SERVER::instance()->init(argc, argv);
if(0 == nResult)
{
// 啟動服務
nResult = SERVER::instance()->open();
// 終止服務
SERVER::instance()->fini();
}
......
}
......
// 終止ACE服務
ACE::fini();
return nResult;
}
|
在上面的代碼中,我們可以看到大量以“ACE”為前綴的類,這些類是ACE自適配通信環境(Adaptive Communication Environment)工具包中的類。ACE是一個開源的工具包,它實現了許多用于并發通信軟件的核心模式。ACE的目標用戶是在UNIX和Win32平臺上開發高性能通信服務和應用的開發者。ACE簡化了使用進程間通信、事件多路分離、顯式動態鏈接和并發的OO網絡應用和服務的開發。ACE提供了一組豐富的可復用C++ Wrapper Facade(包裝外觀)和框架組件,可跨越多種平臺完成通用的通信軟件任務,其中包括:事件多路分離和事件處理器分派、信號處理、服務初始化、進程間通信、共享內存管理、消息路由、分布式服務動態(重)配置、并發執行和同步等。
類MgServer繼承自AEC工具包中的類ACE_NT_Service或ACE_Service_Object,對應于服務器的主線程,圖19?12顯示了服務器主線程類的類圖。MgServer::svc()是主線程的入口,調用方法MgServer::open()會啟動主線程并且執行方法MgServer::svc(),調用MgServer::fini()會終止主線程。MapGuide使用了Singleton模板ACE_Singleton將MgServer封裝為一個單實例類,為了方便使用MapGuide使用typedef為ACE_Singleton定義了一個別名SERVER,調用方法SERVER::instance()可以創建類MgServer的一個實例。
圖?19?13 服務器主線程類的類圖
MgServer::svc()是主線程的入口,該方法會創建一個操作請求隊列和一個用于處理操作請求的線程池,它的代碼如下所示。事實上,該方法還定義了其它類型的隊列和線程池,為了便于理解我們省略掉了這些代碼。
int MgServer::svc()
{
MgServerManager* pServerManager = MgServerManager::GetInstance();
// 創建線程管理器和操作線程
ACE_Thread_Manager threadManager;
MgOperationThread clientThreads(threadManager,
pServerManager->GetClientThreads());
pServerManager->SetClientMessageQueue(clientThreads.msg_queue_);
MgClientAcceptor clientAcceptor(clientAddr, ACE_Reactor::instance(),
clientThreads.msg_queue_);
nResult = clientAcceptor.Initialize();
if(nResult == 0)
{
// 啟動線程池
nResult = clientThreads.Activate();
if(nResult == 0)
{
// 通知操作線程停止執行
ACE_Message_Block* mb = new ACE_Message_Block(4);
if(mb)
{
mb->msg_type(ACE_Message_Block::MB_STOP);
clientThreads.putq(mb);
}
// 停止操作線程
clientThreads.close();
// 等待所有操作線程執行完成
threadManager.wait();
threadManager.close();
}
}
return nResult;
}
|
上面的代碼中創建一個類MgOperationThread的實例,該對象創建了一個線程池,維護了一個操作請求隊列。類MgOperationThread 繼承自ACE工具包中的模板類AEC_Task,對應于操作線程,圖19?13顯示了操作線程類的類圖。ACE_Task封裝了任務,每個任務都含有一或多個線程,以及一個底層消息隊列。方法ACE_Task::svc()是線程的啟動入口,調用方法ACE_Task::open()用于初始化任務,調用方法ACE_Task::close()用于終止任務,調用方法ACE_Task::activate()用于啟動線程,調用方法ACE_Task::putq()放置消息到任務的消息隊列中,調用方法ACE_Task::getq()從任務的消息隊列中取出消息。
圖?19?14 操作線程類的類圖
方法MgOperationThread::svc()是操作線程的入口,它的代碼如下所示。
int MgOperationThread::svc()
{
INT32 nResult = 0;
while (m_bActive)
{
ACE_Message_Block* messageBlock = NULL;
// 從消息隊列中取出消息
while (getq(messageBlock) == -1)
{
......
}
if(messageBlock->msg_type() == ACE_Message_Block::MB_STOP)
{
m_bActive = false;
ACE_Message_Block* mb = new ACE_Message_Block(4);
if(mb)
{
mb->msg_type(ACE_Message_Block::MB_STOP);
putq(mb);
}
}
else if(messageBlock->msg_type() ==?ACE_Message_Block::MB_DATA)
{
MgServerStreamData* pData =
dynamic_cast(messageBlock->data_block());
IMgServiceHandler::MgProcessStatus stat =?ProcessMessage(messageBlock);
......
}
}
return nResult;
}
IMgServiceHandler::MgProcessStatus MgOperationThread::ProcessMessage(ACE_Message_Block* pMB)
{
IMgServiceHandler::MgProcessStatus stat = IMgServiceHandler::mpsError;
MgServerStreamData* pData = NULL;
pData = (MgServerStreamData*) pMB->data_block();
......
MgStreamParser::ParseStreamHeader(pData);
MgStreamParser::ParseDataHeader(pData);
MgPacketParser::MgPacketHeader pt = MgPacketParser::GetPacketHeader(pData);
switch ( pt )
{
case (MgPacketParser::mphOperation):
stat = ProcessOperation( pData );
break;
......
}
return stat;
}
|
從上面的代碼可以看到,方法MgOperationThread::svc()會調用方法getq(…)循環訪問消息隊列,取出等待處理的消息。如果是數據類型的消息,那么調用方法MgOperationThread:: ProcessMessage(...)處理這個消息。方法MgOperationThread::ProcessMessage(...)會解析這個消息的頭和數據,如果消息數據中包含了一個操作請求,那么調用方法MgOperationThread:: ProcessOperation(…)處理這個操作請求。
2) 處理服務請求
方法MgOperationThread::ProcessOperation(…) 首先會解析消息數據中的服務ID、操作ID、操作的版本號等信息,然后根據服務ID使用工廠類MgServiceHanlderFactory來創建一個服務請求處理器類的實例,最后調用這個服務請求處理器類的方法IMgServiceHandler:: ProcessOperation(...)處理這個服務請求。
本節的示例中客戶端發送的是一個枚舉資源的操作請求,所以工廠類會創建一個MgResourceServiceHandler的實例,調用方法MgResourceServiceHandler::ProcessOperation(...)處理這個服務請求。
IMgServiceHandler::MgProcessStatus
MgOperationThread::ProcessOperation(MgServerStreamData* pData)
{
IMgServiceHandler::MgProcessStatus stat = IMgServiceHandler::mpsError;
MgOperationPacket op;
MgStreamHelper* pHelper = NULL;
pHelper = pData->GetStreamHelper();
pHelper->GetUINT32(op.m_PacketHeader);
pHelper->GetUINT32(op.m_PacketVersion);
pHelper->GetUINT32(op.m_ServiceID);
pHelper->GetUINT32(op.m_OperationID);
pHelper->GetUINT32(op.m_OperationVersion);
IMgServiceHandler* pServiceHandler =
MgServiceHandlerFactory::Instance()->GetHandler(op.m_ServiceID, pData, op);
delete pServiceHandler;
pServiceHandler = NULL;
stat = pServiceHandler->ProcessOperation();
return stat;
}
|
3) 處理操作請求
首先,方法MgResourceServiceHandler::ProcessOperation(...)會根據操作請求的ID,調用資源操作工廠類的方法MgResourceOperationFactory::GetOperation(…)創建相應的資源服務操作類MgOpEnumerateResources的實例。然后,調用MgOpEnumerateResources:: Initialize(...)初始化枚舉資源操作處理器,調用方法MgServiceManager::RequestService(…)創建服務器資源服務MgServerResourceService的實例。最后,調用方法MgOpEnumerateResources::Execute()處理操作請求,該方法會調用MgServerResourceService::EnumerateResources(…)實際執行枚舉資源的功能。
IMgServiceHandler::MgProcessStatus MgResourceServiceHandler::ProcessOperation()
{
IMgServiceHandler::MgProcessStatus status = IMgServiceHandler::mpsError;
auto_ptr handler;
MG_TRY()
handler.reset(MgResourceOperationFactory::GetOperation(
m_packet.m_OperationID, m_packet.m_OperationVersion));
assert(NULL != handler.get());
handler->Initialize(m_data, m_packet);
handler->Execute();
status = IMgServiceHandler::mpsDone;
MG_CATCH(L"MgResourceServiceHandler.ProcessOperation")
if (mgException != NULL && NULL != handler.get())
{
status = (handler.get()->HandleException(mgException) ?
IMgServiceHandler::mpsDone : IMgServiceHandler::mpsError);
}
if (IMgServiceHandler::mpsDone != status)
{
MG_THROW();
}
return status;
}
void MgOpEnumerateResources::Execute()
{
......
BeginExecution();
......
Validate();
Ptr byteReader = m_service->EnumerateResources(resource, depth,
type, properties, fromDate, toDate, computeChildren);
EndExecution(byteReader);
......
}
|