引自:
http://act.it.sohu.com/book/chapter.php?id=387&volume=4&chapter=3JDBC驅(qū)動(dòng)程序和XAResource 為了簡(jiǎn)化XAResource的說(shuō)明,這些例子說(shuō)明了一個(gè)應(yīng)用程序在不包含應(yīng)用程序服務(wù)器和事項(xiàng)管理程序的情況下應(yīng)該如何使用JTA。 基本上,這些例子中的應(yīng)用程序也擔(dān)任應(yīng)用程序服務(wù)器和事項(xiàng)管理程序的任務(wù)。大部分的企業(yè)使用事務(wù)管理程序和應(yīng)用程序服務(wù)器,因?yàn)樗鼈兡軌虮纫粋€(gè)應(yīng)用程序更能夠高效地管理分布式事務(wù)。 然而遵循這些例子,一個(gè)應(yīng)用程序開(kāi)發(fā)者可以測(cè)試在JDBC驅(qū)動(dòng)程序中的JTA支持的健壯性。而且有一些例子可能不是工作在某個(gè)特定的數(shù)據(jù)庫(kù)上,這是因?yàn)殛P(guān)聯(lián)在數(shù)據(jù)庫(kù)上的一些內(nèi)在的問(wèn)題。
在使用JTA之前,你必須首先實(shí)現(xiàn)一個(gè)Xid類用來(lái)標(biāo)識(shí)事務(wù)(在普通情況下這將由事務(wù)管理程序來(lái)處理)。Xid包含三個(gè)元素:formatID、gtrid(全局事務(wù)標(biāo)識(shí)符)和bqual(分支修飾詞標(biāo)識(shí)符)。
formatID通常是零,這意味著你將使用OSI CCR(Open Systems Interconnection Commitment, Concurrency和Recovery 標(biāo)準(zhǔn))來(lái)命名。如果你要是用另外一種格式,那么formatID應(yīng)該大于零。-1值意味著Xid為無(wú)效。
gtrid和bqual可以包含64個(gè)字節(jié)二進(jìn)制碼來(lái)分別標(biāo)識(shí)全局事務(wù)和分支事務(wù)。唯一的要求是gtrid和bqual必須是全局唯一的。此外,這可以通過(guò)使用指定在OSI CCR中的命名規(guī)則規(guī)范來(lái)完成。
下面的例子說(shuō)明Xid的實(shí)現(xiàn):
import javax.transaction.xa.*; public class MyXid implements Xid { protected int formatId; protected byte gtrid[]; protected byte bqual[]; public MyXid() { } public MyXid(int formatId, byte gtrid[], byte bqual[]) { this.formatId = formatId; this.gtrid = gtrid; this.bqual = bqual; }
public int getFormatId() { return formatId; }
public byte[] getBranchQualifier() { return bqual; }
public byte[] getGlobalTransactionId() { return gtrid; }
} |
其次,你需要?jiǎng)?chuàng)建一個(gè)你要使用的數(shù)據(jù)庫(kù)的數(shù)據(jù)源:
public DataSource getDataSource() throws SQLException { SQLServerDataSource xaDS = new com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource(); xaDS.setDataSourceName("SQLServer"); xaDS.setServerName("server"); xaDS.setPortNumber(1433); xaDS.setSelectMethod("cursor"); return xaDS; } |
例1—這個(gè)例子是用“兩步提交協(xié)議”來(lái)提交一個(gè)事務(wù)分支:
XADataSource xaDS; XAConnection xaCon; XAResource xaRes; Xid xid; Connection con; Statement stmt; int ret; xaDS = getDataSource(); xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes = xaCon.getXAResource(); con = xaCon.getConnection(); stmt = con.createStatement(); xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); try { xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid); if (ret == XAResource.XA_OK) { xaRes.commit(xid, false); } } catch (XAException e) { e.printStackTrace(); } finally { stmt.close(); con.close(); xaCon.close(); } |
因?yàn)樗羞@些例子中的初始化代碼相同或者非常相似,僅僅是一些重要的地方的代碼由不同。
例2—這個(gè)例子,與例1相似,說(shuō)明了一個(gè)返回過(guò)程:
xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid); if (ret == XAResource.XA_OK) { xaRes.rollback(xid); } |
例3—這個(gè)例子說(shuō)明一個(gè)分布式事務(wù)分支如何中止,讓相同的連接做本地事務(wù)處理,以及它們稍后該如何繼續(xù)這個(gè)分支。 分布式事務(wù)的兩步提交作用不影響本地事務(wù)。
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUSPEND); ∥這個(gè)更新在事務(wù)范圍之外完成,所以它不受XA返回影響。
stmt.executeUpdate("insert into test_table2 values (111)"); xaRes.start(xid, XAResource.TMRESUME); stmt.executeUpdate("insert into test_table values (200)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
} |
例4—這個(gè)例子說(shuō)明一個(gè)XA資源如何分擔(dān)不同的事務(wù)。 創(chuàng)建了兩個(gè)事務(wù)分支,但是它們不屬于相同的分布式事務(wù)。 JTA允許XA資源在第一個(gè)分支上做一個(gè)兩步提交,雖然這個(gè)資源仍然與第二個(gè)分支相關(guān)聯(lián)。
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22}); xaRes.start(xid1, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table1 values (100)"); xaRes.end(xid1, XAResource.TMSUCCESS); xaRes.start(xid2, XAResource.TMNOFLAGS); ret = xaRes.prepare(xid1); if (ret == XAResource.XA_OK) { xaRes.commit(xid2, false); } stmt.executeUpdate("insert into test_table2 values (200)"); xaRes.end(xid2, XAResource.TMSUCCESS); ret = xaRes.prepare(xid2); if (ret == XAResource.XA_OK) { xaRes.rollback(xid2); } |
例5—這個(gè)例子說(shuō)明不同的連接上的事務(wù)分支如何連接成為一個(gè)單獨(dú)的分支,如果它們連接到相同的資源管理程序。這個(gè)特點(diǎn)改善了分布式事務(wù)的效率,因?yàn)樗鼫p少了兩步提交處理的數(shù)目。兩個(gè)連接到數(shù)據(jù)庫(kù)服務(wù)器上的XA將被創(chuàng)建。每個(gè)連接創(chuàng)建它自己的XA資源,正規(guī)的JDBC連接和語(yǔ)句。在第二個(gè)XA資源開(kāi)始一個(gè)事務(wù)分支之前,它將察看是否使用和第一個(gè)XA資源使用的是同一個(gè)資源管理程序。如果這是實(shí)例,它將加入在第一個(gè)XA連接上創(chuàng)建的第一個(gè)分支,而不是創(chuàng)建一個(gè)新的分支。 稍后,這個(gè)事務(wù)分支使用XA資源來(lái)準(zhǔn)備和提交。
xaDS = getDataSource(); xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes1 = xaCon1.getXAResource(); con1 = xaCon1.getConnection(); stmt1 = con1.createStatement();
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xaRes1.start(xid1, XAResource.TMNOFLAGS); stmt1.executeUpdate("insert into test_table1 values (100)"); xaRes1.end(xid, XAResource.TMSUCCESS); xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes2 = xaCon1.getXAResource(); con2 = xaCon1.getConnection(); stmt2 = con1.createStatement();
if (xaRes2.isSameRM(xaRes1)) { xaRes2.start(xid1, XAResource.TMJOIN); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid1, XAResource.TMSUCCESS); } else { xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03}); xaRes2.start(xid2, XAResource.TMNOFLAGS); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid2, XAResource.TMSUCCESS); ret = xaRes2.prepare(xid2); if (ret == XAResource.XA_OK) { xaRes2.commit(xid2, false); } } ret = xaRes1.prepare(xid1); if (ret == XAResource.XA_OK) { xaRes1.commit(xid1, false); }
|
例6—這個(gè)例子說(shuō)明在錯(cuò)誤恢復(fù)的階段,如何恢復(fù)準(zhǔn)備好的或者快要完成的事務(wù)分支。 它首先試圖返回每個(gè)分支;如果它失敗了,它嘗試著讓資源管理程序丟掉關(guān)于事務(wù)的消息。
MyXid[] xids; xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN); for (int i=0; xids!=null && i try { xaRes.rollback(xids[i]); } catch (XAException ex) { try { xaRes.forget(xids[i]); } catch (XAException ex1) { System.out.println("rollback/forget failed: " + ex1.errorCode); } }
}
|