Microsoft 近期推出了一種用于生成集成應(yīng)用程序的新平臺——Microsoft .NET 框架。.NET 框架允許開發(fā)人員使用任何編程語言迅速生成和部署 Web 服務(wù)和應(yīng)用程序。Microsoft Intermediate Language (MSIL) 和實時 (JIT) 編譯器使這種不依賴語言的框架得以實現(xiàn)。
與 .NET 框架同時面世的還有一種新的編程語言 C#(讀作“C sharp”)。C# 是一種簡單、新穎、面向?qū)ο蠛皖愋桶踩木幊陶Z言。利用 .NET 框架和 C#(除 Microsoft? Visual Basic? 和 Managed C++ 之外),用戶可以編寫功能強大的 Microsoft Windows? 和 Web 應(yīng)用程序及服務(wù)。本文提供了這樣的一個解決方案,它的重點是 .NET 框架和 C# 而不是編程語言。C# 語言的介紹可以在“ C# 簡介和概述(英文)”找到。
近期的文章“MSMQ:可伸縮、高可用性的負載平衡解 決方案(英文)”介紹了一種解決方案,用于高可用性消息隊列 (MSMQ) 的可伸縮負載平衡解決方案體系結(jié)構(gòu)。此解決方案中涉及了一種將 Windows 服務(wù)用作智能消息路由器的開發(fā)方案。這樣的解決方案以前只有 Microsoft Visual C++? 程序員才能實現(xiàn),而 .NET 框架的出現(xiàn)改變了這種情況。從下面的解決方案中,您可以看到這一點。
.NET 框架應(yīng)用程序
這里介紹 的解決方案是一種用來處理若干消息隊列的 Windows 服務(wù);其中每個隊列都是由多個線程進行處理(接收和處理消息)。處理程序使用循環(huán)法技術(shù)或應(yīng)用程序特定值(消息 AppSpecific 屬性)從目的隊列列表中路由消息,并使用消息屬性來調(diào)用組件方法。(示例進程也屬于這種情況。)在后一種情況下,組件的要求是它能夠?qū)崿F(xiàn)給定的接口 IWebMessage。要處理錯誤,應(yīng)用程序需要將不能處理的消息發(fā)送到錯誤隊列中。
消息應(yīng)用程序的結(jié)構(gòu)與以前的活動模板庫 (ATL) 應(yīng)用程序相似,它們之間的主要不同在于用于管理服務(wù)的代碼的封裝和 .NET 框架組件的使用。要創(chuàng)建 Windows 服務(wù),.NET 框架用戶僅僅需要創(chuàng)建一個從 ServiceBase(來自 System.ServiceControl 程序集)繼承的類。這毫不奇怪,因為 .NET 框架是面向?qū)ο蟮摹?/font>
應(yīng)用程序結(jié)構(gòu)
應(yīng)用程序中主要的類是 ServiceControl,它是從 ServiceBase 繼承的。因而,它必須實現(xiàn) OnStart 和 OnStop 方法,以及可選的 OnPause 和 OnContinue 方法。事實上,類是在靜態(tài)方法 Main 內(nèi)構(gòu)造的:
using System; using System.ServiceProcess; public class ServiceControl: ServiceBase { // 創(chuàng)建服務(wù)對象的主入口點 public static void Main() { ServiceBase.Run(new ServiceControl()); } // 定義服務(wù)參數(shù)的構(gòu)造對象 public ServiceControl() { CanPauseAndContinue = true; ServiceName = "MSDNMessageService"; AutoLog = false; } protected override void OnStart(string[] args) {...} protected override void OnStop() {...} protected override void OnPause() {...} protected override void OnContinue() {...} } |
ServiceControl 類創(chuàng)建一系列 CWorker 對象,即,為需要處理的每個消息隊列創(chuàng)建 CWorker 類的一個實例。根據(jù)定義中處理隊列所需的線程數(shù)目,CWorker 類依次創(chuàng)建了一系列的 CWorkerThread 對象。CWorkerThread 類創(chuàng)建的一個處理線程將執(zhí)行實際的服務(wù)工作。
使用 CWorker 和 CWorkerThread 類的主要目的是確認服務(wù)控件 Start、Stop、Pause 和 Continue 命令。因為這些進程必須是無阻塞的,命令操作最終將在后臺處理線程上執(zhí)行。
CWorkerThread 是一個抽象類,被 CWorkerThreadAppSpecific 、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 繼承。這些類以不同的方式處理消息。前兩個類通過給另一隊列發(fā)送消息來處理消息(其不同之處在于確定接收隊列路徑的方式),最后一個類則使用消息屬性來調(diào) 用組件方法。
.NET 框架內(nèi)部的錯誤處理是以基類 Exception 為基礎(chǔ)的。當系統(tǒng)引發(fā)或捕獲錯誤時,這些錯誤必須是從 Exception 中導出的類。CWorkerThreadException 類就是這樣一種實現(xiàn),它通過附加額外屬性(用于定義服務(wù)是否應(yīng)繼續(xù)運行)來擴展基類。
最后,應(yīng)用程序包含兩種結(jié)構(gòu)。這些值類型定義了輔助進程或線程的運行時參數(shù),以簡化 CWorker 和 CWorkerThread 對象的結(jié)構(gòu)。使用值類型結(jié)構(gòu)(而不是引用類型類)能夠確保這些運行時參數(shù)維護的是數(shù)值(而不是引用)。
IWebMessage 接口
CWorkerThread 的實現(xiàn)之一是一個調(diào)用組件方法的類。這個名為 CWorkerThreadAssembly 的類使用 IWebMessage 接口來定義服務(wù)和組件之間的約定。
與當前版本的 Microsoft Visual Studio? 不同,C# 接口可以在任何語言中顯式定義,而不需要創(chuàng)建和編譯 IDL 文件。C# IWebMessage 接口的定義如下:
public interface IWebMessage { WebMessageReturn Process(string sMessageLabel, string sMessageBody, int iAppSpecific); void Release(); } |
ATL 代碼中的 Process 方法是為處理消息而指定的。Process 方法的返回代碼定義為枚舉類型 WebMessageReturn:
public enum WebMessageReturn { ReturnGood, ReturnBad, ReturnAbort } |
枚舉的定義如下:Good 表示繼續(xù)處理,Bad 表示將消息寫入錯誤隊列,Abort 表示終止處理。Release 方法為服務(wù)提供了輕松清除類實例的途徑。因為僅在垃圾回收的過程中才調(diào)用類實例的析構(gòu)函數(shù),所以確保所有占用昂貴資源(例如數(shù)據(jù)庫連接)的類都有一個能夠 在析構(gòu)之前被調(diào)用的方法,用來釋放這些資源,這是一種非常好的構(gòu)思。
名稱空間
在這里先簡單介紹一下名稱空間。名稱空間允許在內(nèi)部和外部表示中將應(yīng)用程序組織成為邏輯元素。服務(wù)內(nèi)的所有代碼都包含在 MSDNMessageService.Service 名稱空間內(nèi)。盡管服務(wù)代碼包含在若干文件中,但是由于它們包含在同一名稱空間中,因此用戶不需要引用其他文件。
由于 IWebMessage 接口包含在 MSDNMessageService.Interface 名稱空間中,因此使用此接口的線程類具有一個接口名稱空間。
服務(wù)類
應(yīng)用程序的目的是監(jiān)視和處理消息隊列,每一隊列在收到消息時都執(zhí)行不同的進程。應(yīng)用程序是作為 Windows 服務(wù)來實現(xiàn)的。
ServiceBase 類
如前所述,服務(wù)的基本結(jié)構(gòu)是從 ServiceBase 繼承的類。重要的方法包括 OnStart、OnStop、OnPause 和 OnContinue,每一個替代方法都與一個服務(wù)控制操作直接對應(yīng)。OnStart 方法的目的是創(chuàng)建 CWorker 對象,而 CWorker 類又創(chuàng)建 CWorkerThread 對象,然后在該對象中創(chuàng)建執(zhí)行服務(wù)工作的線程。
服務(wù)的運行時配置(以及 CWorker 和 CWorkerThread 對象的屬性)是在基于 XML 的配置文件中維護的。它的名稱與創(chuàng)建的 .exe 文件相同,但帶有一個 .cfg 后綴。配置示例如下:
<?xml version="1.0"?> <configuration> <ProcessList> ?。糚rocessDefinition ProcessName="Worker1" ProcessDesc="Message Worker with 2 Threads" ProcessType="AppSpecific" ProcessThreads="2" InputQueue=".\private$\test_load1" ErrorQueue=".\private$\test_error"> <OutputList> ?。糘utputDefinition OutputName=".\private$\test_out11" /> ?。糘utputDefinition OutputName=".\private$\test_out12" /> ?。?OutputList> ?。?ProcessDefinition> <ProcessDefinition ProcessName="Worker2" ProcessDesc="Assembly Worker with 1 Thread" ProcessType="Assembly" ProcessThreads="1" InputQueue=".\private$\test_load2" ErrorQueue=".\private$\test_error"> ?。糘utputList> ?。糘utputDefinition OutputName="C:\MSDNMessageService\MessageExample.dll" /> <OutputDefinition OutputName="MSDNMessageService.MessageSample.ExampleClass"/> ?。?OutputList> ?。?ProcessDefinition> </ProcessList> </configuration> |
對此信息的訪問通過來自 System.Configuration 程序集的 ConfigManager 類來管理。靜態(tài) Get 方法返回信息的集合,這些集合將被枚舉以獲得單個屬性。這些屬性集的設(shè)置決定了輔助對象的運行時特征。除了這一配置文件,您還應(yīng)該創(chuàng)建定義 XML 文件結(jié)構(gòu)的圖元文件,并在其中引用位于服務(wù)器 machine.cfg 配置文件中的圖元文件:
<?xml version ="1.0"?> <MetaData xmlns="x-schema:CatMeta.xms"> ?。糄atabaseMeta InternalName="MessageService"> ?。糞erverWiring Interceptor="Core_XMLInterceptor"/> ?。糃ollection InternalName="Process" PublicName="ProcessList" PublicRowName="ProcessDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> ?。糚roperty InternalName="ProcessDesc" Type="String" /> ?。糚roperty InternalName="ProcessType" Type="Int32" DefaultValue="RoundRobin" > ?。糆num InternalName="RoundRobin" Value="0"/> <Enum InternalName="AppSpecific" Value="1"/> ?。糆num InternalName="Assembly" Value="2"/> ?。?Property> ?。糚roperty InternalName="ProcessThreads" Type="Int32" DefaultValue="1" /> <Property InternalName="InputQueue" Type="String" /> <Property InternalName="ErrorQueue" Type="String" /> ?。糚roperty InternalName="OutputName" Type="String" /> ?。糛ueryMeta InternalName="All" MetaFlags="ALL" /> ?。糛ueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> </Collection> ?。糃ollection InternalName="Output" PublicName="OutputList" PublicRowName="OutputDefinition" SchemaGeneratorFlags="EMITXMLSCHEMA"> <Property InternalName="ProcessName" Type="String" MetaFlags="PRIMARYKEY" /> ?。糚roperty InternalName="OutputName" Type="String" MetaFlags="PRIMARYKEY" /> ?。糛ueryMeta InternalName="All" MetaFlags="ALL" /> ?。糛ueryMeta InternalName="QueryByFile" CellName="__FILE" Operator="EQUAL" /> ?。?Collection> ?。?DatabaseMeta> <RelationMeta PrimaryTable="Process" PrimaryColumns="ProcessName" ForeignTable="Output" ForeignColumns="ProcessName" MetaFlags="USECONTAINMENT"/> </MetaData> |
private Hashtable htWorkers = new Hashtable(); IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new AppDomainSelector()); foreach (IConfigItem ciWorker in cWorkers) { WorkerFormatter sfWorker = new WorkerFormatter(); sfWorker.ProcessName = (string)ciWorker["ProcessName"]; sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"]; sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"]; sfWorker.InputQueue = (string)ciWorker["InputQueue"]; sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"]; // 計算并定義進程類型 switch ((int)ciWorker["ProcessType"]) { case 0: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessRoundRobin; break; case 1: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAppSpecific; break; case 2: sfWorker.ProcessType = WorkerFormatter.SFProcessType.ProcessAssembly; break; default: throw new Exception("Unknown Processing Type"); } // 執(zhí)行更多的工作以讀取輸出信息 string sProcessName = (string)ciWorker["ProcessName"]; if (htWorkers.ContainsKey(sProcessName)) throw new ArgumentException("Process Name Must be Unique: " + sProcessName); htWorkers.Add(sProcessName, new CWorker(sfWorker)); } |
在這段代碼中沒有包含的主要信息是輸出數(shù)據(jù)的獲取。每一個進程定義中都有一組相應(yīng)的輸出定義項。該信息是通過如下的簡單查詢讀取的:
string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" + sfWorker.ProcessName + " AND Selector=appdomain://"; ConfigQuery qQuery = new ConfigQuery(sQuery); IConfigCollection cOutputs = ConfigManager.Get("OutputList", qQuery); int iSize = cOutputs.Count, iLoop = 0; sfWorker.OutputName = new string[iSize]; foreach (IConfigItem ciOutput in cOutputs) sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"]; |
CWorkerThread 和 Cworker 類都有相應(yīng)的服務(wù)控制方法,根據(jù)服務(wù)控制操作進行調(diào)用。由于 Hashtable 中引用了每一個 CWorker 對象,因此需要枚舉 Hashtable 的內(nèi)容,以調(diào)用適當?shù)姆?wù)控制方法:
foreach (CWorker cWorker in htWorkers.Values) cWorker.Start(); |
類似地,實現(xiàn)的 OnPause、OnContinue 和 OnStop 方法是通過調(diào)用 CWorker 對象上的相應(yīng)方法來執(zhí)行操作的。
CWorker 類
CWorker 類的主要功能是創(chuàng)建和管理 CWorkerThread 對象。Start、Stop、Pause 和 Continue 方法調(diào)用相應(yīng)的 CWorkerThread 方法。實際的 CWorkerThread 對象是在Start 方法中創(chuàng)建的。與使用 Hashtable 管理輔助對象引用的 Service 類相似,CWorker 使用 ArrayList(簡單的動態(tài)數(shù)組)來維護線程對象的列表。
在這個數(shù)組內(nèi)部,CWorker 類創(chuàng)建了 CWorkerThread 類的一個實現(xiàn)版本。CWorkerThread 類(將在下面討論)是一個必須繼承的抽象類。導出類定義了消息的處理方式:
aThreads = new ArrayList(); for (int idx=0; idx<sfWorker.NumberThreads; idx++) { WorkerThreadFormatter wfThread = new WorkerThreadFormatter(); wfThread.ProcessName = sfWorker.ProcessName; wfThread.ProcessDesc = sfWorker.ProcessDesc; wfThread.ThreadNumber = idx; wfThread.InputQueue = sfWorker.InputQueue; wfThread.ErrorQueue = sfWorker.ErrorQueue; wfThread.OutputName = sfWorker.OutputName; // 定義輔助類型,并將其插入輔助線程結(jié)構(gòu) CWorkerThread wtBase; switch (sfWorker.ProcessType) { case WorkerFormatter.SFProcessType.ProcessRoundRobin: wtBase = new CWorkerThreadRoundRobin(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAppSpecific: wtBase = new CWorkerThreadAppSpecific(this, wfThread); break; case WorkerFormatter.SFProcessType.ProcessAssembly: wtBase = new CWorkerThreadAssembly(this, wfThread); break; default: throw new Exception("Unknown Processing Type"); } // 添加對數(shù)組的調(diào)用 aThreads.Insert(idx, wtBase); } |
一旦所有的對象都已創(chuàng)建,就可以通過調(diào)用每個線程對象的 Start 方法來啟動它們:
foreach(CWorkerThread cThread in aThreads) cThread.Start(); |
Stop、Pause 和 Continue 方法在 foreach 循環(huán)里執(zhí)行的操作類似。Stop 方法具有如下的垃圾收集操作:
GC.SuppressFinalize(this); |
在類析構(gòu)函數(shù)中將調(diào)用 Stop 方法,這樣,在沒有顯式調(diào)用 Stop 方法的情況下也可以正確地終止對象。如果調(diào)用了 Stop
方法,將不需要析構(gòu)函數(shù)。SuppressFinalize 方法能夠防止調(diào)用對象的 Finalize 方法(析構(gòu)函數(shù)的實際實現(xiàn))。
CWorkerThread 抽象類
CWorkerThread 是一個由 CWorkerThreadAppSpecifc、CWorkerThreadRoundRobin 和
CWorkerThreadAssembly 繼承的抽象類。無論如何處理消息,隊列的大部分處理是相同的,所以 CWorkerThread
類提供了這一功能。這個類提供了抽象方法(必須被實際方法替代)以管理資源和處理消息。
類的工作再一次通過 Start、Stop、Pause 和 Continue 方法來實現(xiàn)。在 Start 方法中引用了輸入和錯誤隊列。在 .NET 框架中,消息由 System.Messaging 名稱空間處理:
// 嘗試打開隊列,并設(shè)置默認的讀寫屬性 MessageQueue mqInput = new MessageQueue(sInputQueue); mqInput.MessageReadPropertyFilter.Body = true; mqInput.MessageReadPropertyFilter.AppSpecific = true; MessageQueue mqError = new MessageQueue(sErrorQueue); // 如果使用 MSMQ COM,則將格式化程序設(shè)置為 ActiveX mqInput.Formatter = new ActiveXMessageFormatter(); mqError.Formatter = new ActiveXMessageFormatter(); |
一旦定義了消息隊列引用,即會創(chuàng)建一個線程用于實際的處理函數(shù)(稱為 ProcessMessages)。在 .NET 框架中,使用 System.Threading 名稱空間很容易實現(xiàn)線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages)); procMessage.Start(); |
ProcessMessages 函數(shù)是基于 Boolean 值的處理循環(huán)。當數(shù)值設(shè)為 False,處理循環(huán)將終止。因此,線程對象的 Stop 方法只設(shè)置這一 Boolean 值,然后關(guān)閉打開的消息隊列,并加入帶有主線程的線程:
// 加入服務(wù)線程和處理線程 bRun = false; procMessage.Join(); // 關(guān)閉打開的消息隊列 mqInput.Close(); mqError.Close(); |
Pause 方法只設(shè)置一個 Boolean 值,使處理線程休眠半秒鐘:
if (bPause) Thread.Sleep(500); |
最后,每一個 Start、Stop、Pause 和 Continue 方法將調(diào)用抽象的 OnStart、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實現(xiàn)的類提供了掛鉤,以捕獲和釋放所需的資源。
ProcessMessages 循環(huán)具有如下基本結(jié)構(gòu):
1、接收 Message。
2、如果 Message 具有成功的 Receive,則調(diào)用抽象 ProcessMessage 方法。
3、如果 Receive 或 ProcessMessage 失敗,將 Message 發(fā)送至錯誤隊列中。
Message mInput; try { // 從隊列中讀取,并等候 1 秒 mInput = mqInput.Receive(new TimeSpan(0,0,0,1)); } catch (MessageQueueException mqe) { // 將消息設(shè)置為 null mInput = null; // 查看錯誤代碼,了解是否超時 if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B { // 如果未超時,發(fā)出一個錯誤并記錄錯誤號 LogError("Error: " + mqe.Message); throw mqe; } } if (mInput != null) { // 得到一個要處理的消息,調(diào)用處理消息抽象方法 try { ProcessMessage(mInput); } // 捕獲已知異常狀態(tài)的錯誤 catch (CWorkerThreadException ex) { ProcessError(mInput, ex.Terminate); } // 捕獲未知異常,并調(diào)用 Terminate catch { ProcessError(mInput, true); } } |
ProcessError 方法將錯誤的消息發(fā)送至錯誤隊列。另外,它也可能引發(fā)異常來終止線程。如果ProcessMessage 方法引發(fā)了終止錯誤或 CWorkerThreadException 類型,它將執(zhí)行此操作。
CworkerThread 導出類
任何從 CWorkerThread 中繼承的類都必須提供 OnStart、OnStop、OnPause、OnContinue 和 ProcessMessage 方法。OnStart 和 OnStop 方法獲取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時釋放和重新獲取這些資源。ProcessMessage 方法應(yīng)該處理消息,并在出現(xiàn)失敗事件時引發(fā) CWorkerThreadException 異常。
由于 CWorkerThread 構(gòu)造函數(shù)定義運行時參數(shù),導出類必須調(diào)用基類構(gòu)造函數(shù):
public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread) : base (v_cParent, v_wfThread) {} |
導出類提供了兩種類型的處理:將消息發(fā)送至另一隊列,或者調(diào)用組件方法。接收和發(fā)送消息的兩種實現(xiàn)使用了循環(huán)技術(shù)或應(yīng)用程序偏移(保留在消息 AppSpecific 屬性中),作為使用哪一隊列的決定因素。此方案中的配置文件應(yīng)該包括隊列路徑的列表。實現(xiàn)的 OnStart 和 OnStop 方法應(yīng)該打開和關(guān)閉對這些隊列的引用:
iQueues = wfThread.OutputName.Length; mqOutput = new MessageQueue[iQueues]; for (int idx=0; idx<iQueues; idx++) { mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]); mqOutput[idx].Formatter = new ActiveXMessageFormatter(); } |
在這些方案中,消息的處理很簡單:將消息發(fā)送必要的輸出隊列。在循環(huán)情況下,這個進程為:
try { mqOutput[iNextQueue].Send(v_mInput); } catch (Exception ex) { // 如果錯誤強制終止異常 throw new CWorkerThreadException(ex.Message, true); } // 計算下一個隊列號 iNextQueue++; iNextQueue %= iQueues; |
后一種調(diào)用帶消息參數(shù)的組件的實現(xiàn)方法比較有趣。ProcessMessage 方法使用 IWebMessage 接口調(diào)入一個 .NET 組件。OnStart 和 OnStop 方法獲取和釋放此組件的引用。
此方案中的配置文件應(yīng)該包含兩個項目:完整的類名和類所在文件的位置。按照 IWebMessage 接口中的定義,在組件上調(diào)用 Process 方法。
要獲取對象引用,需要使用 Activator.CreateInstance 方法。此函數(shù)需要一個程序集類型。在這里,它是從程序集文件路徑和類名中導出的。一旦獲取對象引用,它將被放入合適的接口:
private IWebMessage iwmSample; private string sFilePath, sTypeName; // 保存程序集路徑和類型名稱 sFilePath = wfThread.OutputName[0]; sTypeName = wfThread.OutputName[1]; // 獲取對必要對象的引用 Assembly asmSample = Assembly.LoadFrom(sFilePath); Type typSample = asmSample.GetType(sTypeName); object objSample = Activator.CreateInstance(typSample); // 定義給對象的必要接口 iwmSample = (IWebMessage)objSample; 獲取對象引用后,ProcessMessage 方法將在 IWebMessage 接口上調(diào)用 Process 方法: WebMessageReturn wbrSample; try { // 定義方法調(diào)用的參數(shù) string sLabel = v_mInput.Label; string sBody = (string)v_mInput.Body; int iAppSpecific = v_mInput.AppSpecific; // 調(diào)用方法并捕捉返回代碼 wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific); } catch (InvalidCastException ex) { // 如果在消息內(nèi)容中發(fā)生錯誤,則強制發(fā)出一個非終止異常 throw new CWorkerThreadException(ex.Message, false); } catch (Exception ex) { // 如果錯誤調(diào)用程序集,則強制發(fā)出終止異常 throw new CWorkerThreadException(ex.Message, true); } // 如果沒有錯誤,則檢查對象調(diào)用的返回狀態(tài) switch (wbrSample) { case WebMessageReturn.ReturnBad: throw new CWorkerThreadException ("Unable to process message: Message marked bad", false); case WebMessageReturn.ReturnAbort: throw new CWorkerThreadException ("Unable to process message: Process terminating", true); default: break; } |
提供的示例組件將消息正文寫入數(shù)據(jù)庫表。如果捕獲到嚴重數(shù)據(jù)庫錯誤,您可能希望終止處理過程,但是在這里,僅僅將消息標記為錯誤的消息。
由于此示例中創(chuàng)建的類實例可能會獲取并保留昂貴的數(shù)據(jù)庫資源,所以用 OnPause 和 OnContinue 方法釋放和重新獲取對象引用。
CworkerThread 導出類
任何從 CWorkerThread 中繼承的類都必須提供 OnStart、OnStop、OnPause、OnContinue 和 ProcessMessage 方法。OnStart 和 OnStop 方法獲取并釋放處理資源。OnPause 和 OnContinue 方法允許臨時釋放和重新獲取這些資源。ProcessMessage 方法應(yīng)該處理消息,并在出現(xiàn)失敗事件時引發(fā) CWorkerThreadException 異常。
由于 CWorkerThread 構(gòu)造函數(shù)定義運行時參數(shù),導出類必須調(diào)用基類構(gòu)造函數(shù):
public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread) : base (v_cParent, v_wfThread) {} |
導出類提供了兩種類型的處理:將消息發(fā)送至另一隊列,或者調(diào)用組件方法。接收和發(fā)送消息的兩種實現(xiàn)使用了循環(huán)技術(shù)或應(yīng)用程序偏移(保留在消息 AppSpecific 屬性中),作為使用哪一隊列的決定因素。此方案中的配置文件應(yīng)該包括隊列路徑的列表。實現(xiàn)的 OnStart 和 OnStop 方法應(yīng)該打開和關(guān)閉對這些隊列的引用:
iQueues = wfThread.OutputName.Length; mqOutput = new MessageQueue[iQueues]; for (int idx=0; idx<iQueues; idx++) { mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]); mqOutput[idx].Formatter = new ActiveXMessageFormatter(); } |
在這些方案中,消息的處理很簡單:將消息發(fā)送必要的輸出隊列。在循環(huán)情況下,這個進程為:
try { mqOutput[iNextQueue].Send(v_mInput); } catch (Exception ex) { // 如果錯誤強制終止異常 throw new CWorkerThreadException(ex.Message, true); } // 計算下一個隊列號 iNextQueue++; iNextQueue %= iQueues; |
后一種調(diào)用帶消息參數(shù)的組件的實現(xiàn)方法比較有趣。ProcessMessage 方法使用 IWebMessage 接口調(diào)入一個 .NET 組件。OnStart 和 OnStop 方法獲取和釋放此組件的引用。
此方案中的配置文件應(yīng)該包含兩個項目:完整的類名和類所在文件的位置。按照 IWebMessage 接口中的定義,在組件上調(diào)用 Process 方法。
要獲取對象引用,需要使用 Activator.CreateInstance 方法。此函數(shù)需要一個程序集類型。在這里,它是從程序集文件路徑和類名中導出的。一旦獲取對象引用,它將被放入合適的接口:
private IWebMessage iwmSample; private string sFilePath, sTypeName; // 保存程序集路徑和類型名稱 sFilePath = wfThread.OutputName[0]; sTypeName = wfThread.OutputName[1]; // 獲取對必要對象的引用 Assembly asmSample = Assembly.LoadFrom(sFilePath); Type typSample = asmSample.GetType(sTypeName); object objSample = Activator.CreateInstance(typSample); // 定義給對象的必要接口 iwmSample = (IWebMessage)objSample; 獲取對象引用后,ProcessMessage 方法將在 IWebMessage 接口上調(diào)用 Process 方法: WebMessageReturn wbrSample; try { // 定義方法調(diào)用的參數(shù) string sLabel = v_mInput.Label; string sBody = (string)v_mInput.Body; int iAppSpecific = v_mInput.AppSpecific; // 調(diào)用方法并捕捉返回代碼 wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific); } catch (InvalidCastException ex) { // 如果在消息內(nèi)容中發(fā)生錯誤,則強制發(fā)出一個非終止異常 throw new CWorkerThreadException(ex.Message, false); } catch (Exception ex) { // 如果錯誤調(diào)用程序集,則強制發(fā)出終止異常 throw new CWorkerThreadException(ex.Message, true); } // 如果沒有錯誤,則檢查對象調(diào)用的返回狀態(tài) switch (wbrSample) { case WebMessageReturn.ReturnBad: throw new CWorkerThreadException ("Unable to process message: Message marked bad", false); case WebMessageReturn.ReturnAbort: throw new CWorkerThreadException ("Unable to process message: Process terminating", true); default: break; } |
提供的示例組件將消息正文寫入數(shù)據(jù)庫表。如果捕獲到嚴重數(shù)據(jù)庫錯誤,您可能希望終止處理過程,但是在這里,僅僅將消息標記為錯誤的消息。
由于此示例中創(chuàng)建的類實例可能會獲取并保留昂貴的數(shù)據(jù)庫資源,所以用 OnPause 和 OnContinue 方法釋放和重新獲取對象引用。
文章來源:http://www.tkk7.com/kuxiaoku/articles/94804.html