引自:
http://act.it.sohu.com/book/chapter.php?id=387&volume=4&chapter=3JDBC驅動程序和XAResource 為了簡化XAResource的說明,這些例子說明了一個應用程序在不包含應用程序服務器和事項管理程序的情況下應該如何使用JTA。 基本上,這些例子中的應用程序也擔任應用程序服務器和事項管理程序的任務。大部分的企業使用事務管理程序和應用程序服務器,因為它們能夠比一個應用程序更能夠高效地管理分布式事務。 然而遵循這些例子,一個應用程序開發者可以測試在JDBC驅動程序中的JTA支持的健壯性。而且有一些例子可能不是工作在某個特定的數據庫上,這是因為關聯在數據庫上的一些內在的問題。
在使用JTA之前,你必須首先實現一個Xid類用來標識事務(在普通情況下這將由事務管理程序來處理)。Xid包含三個元素:formatID、gtrid(全局事務標識符)和bqual(分支修飾詞標識符)。
formatID通常是零,這意味著你將使用OSI CCR(Open Systems Interconnection Commitment, Concurrency和Recovery 標準)來命名。如果你要是用另外一種格式,那么formatID應該大于零。-1值意味著Xid為無效。
gtrid和bqual可以包含64個字節二進制碼來分別標識全局事務和分支事務。唯一的要求是gtrid和bqual必須是全局唯一的。此外,這可以通過使用指定在OSI CCR中的命名規則規范來完成。
下面的例子說明Xid的實現:
import javax.transaction.xa.*; public class MyXid implements Xid { protected int formatId; protected byte gtrid[]; protected byte bqual[]; public MyXid() { } public MyXid(int formatId, byte gtrid[], byte bqual[]) { this.formatId = formatId; this.gtrid = gtrid; this.bqual = bqual; }
public int getFormatId() { return formatId; }
public byte[] getBranchQualifier() { return bqual; }
public byte[] getGlobalTransactionId() { return gtrid; }
} |
其次,你需要創建一個你要使用的數據庫的數據源:
public DataSource getDataSource() throws SQLException { SQLServerDataSource xaDS = new com.merant.datadirect.jdbcx.sqlserver.SQLServerDataSource(); xaDS.setDataSourceName("SQLServer"); xaDS.setServerName("server"); xaDS.setPortNumber(1433); xaDS.setSelectMethod("cursor"); return xaDS; } |
例1—這個例子是用“兩步提交協議”來提交一個事務分支:
XADataSource xaDS; XAConnection xaCon; XAResource xaRes; Xid xid; Connection con; Statement stmt; int ret; xaDS = getDataSource(); xaCon = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes = xaCon.getXAResource(); con = xaCon.getConnection(); stmt = con.createStatement(); xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); try { xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid); if (ret == XAResource.XA_OK) { xaRes.commit(xid, false); } } catch (XAException e) { e.printStackTrace(); } finally { stmt.close(); con.close(); xaCon.close(); } |
因為所有這些例子中的初始化代碼相同或者非常相似,僅僅是一些重要的地方的代碼由不同。
例2—這個例子,與例1相似,說明了一個返回過程:
xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid); if (ret == XAResource.XA_OK) { xaRes.rollback(xid); } |
例3—這個例子說明一個分布式事務分支如何中止,讓相同的連接做本地事務處理,以及它們稍后該如何繼續這個分支。 分布式事務的兩步提交作用不影響本地事務。
xid = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xaRes.start(xid, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table values (100)"); xaRes.end(xid, XAResource.TMSUSPEND); ∥這個更新在事務范圍之外完成,所以它不受XA返回影響。
stmt.executeUpdate("insert into test_table2 values (111)"); xaRes.start(xid, XAResource.TMRESUME); stmt.executeUpdate("insert into test_table values (200)"); xaRes.end(xid, XAResource.TMSUCCESS); ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.rollback(xid);
} |
例4—這個例子說明一個XA資源如何分擔不同的事務。 創建了兩個事務分支,但是它們不屬于相同的分布式事務。 JTA允許XA資源在第一個分支上做一個兩步提交,雖然這個資源仍然與第二個分支相關聯。
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xid2 = new MyXid(100, new byte[]{0x11}, new byte[]{0x22}); xaRes.start(xid1, XAResource.TMNOFLAGS); stmt.executeUpdate("insert into test_table1 values (100)"); xaRes.end(xid1, XAResource.TMSUCCESS); xaRes.start(xid2, XAResource.TMNOFLAGS); ret = xaRes.prepare(xid1); if (ret == XAResource.XA_OK) { xaRes.commit(xid2, false); } stmt.executeUpdate("insert into test_table2 values (200)"); xaRes.end(xid2, XAResource.TMSUCCESS); ret = xaRes.prepare(xid2); if (ret == XAResource.XA_OK) { xaRes.rollback(xid2); } |
例5—這個例子說明不同的連接上的事務分支如何連接成為一個單獨的分支,如果它們連接到相同的資源管理程序。這個特點改善了分布式事務的效率,因為它減少了兩步提交處理的數目。兩個連接到數據庫服務器上的XA將被創建。每個連接創建它自己的XA資源,正規的JDBC連接和語句。在第二個XA資源開始一個事務分支之前,它將察看是否使用和第一個XA資源使用的是同一個資源管理程序。如果這是實例,它將加入在第一個XA連接上創建的第一個分支,而不是創建一個新的分支。 稍后,這個事務分支使用XA資源來準備和提交。
xaDS = getDataSource(); xaCon1 = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes1 = xaCon1.getXAResource(); con1 = xaCon1.getConnection(); stmt1 = con1.createStatement();
xid1 = new MyXid(100, new byte[]{0x01}, new byte[]{0x02}); xaRes1.start(xid1, XAResource.TMNOFLAGS); stmt1.executeUpdate("insert into test_table1 values (100)"); xaRes1.end(xid, XAResource.TMSUCCESS); xaCon2 = xaDS.getXAConnection("jdbc_user", "jdbc_password"); xaRes2 = xaCon1.getXAResource(); con2 = xaCon1.getConnection(); stmt2 = con1.createStatement();
if (xaRes2.isSameRM(xaRes1)) { xaRes2.start(xid1, XAResource.TMJOIN); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid1, XAResource.TMSUCCESS); } else { xid2 = new MyXid(100, new byte[]{0x01}, new byte[]{0x03}); xaRes2.start(xid2, XAResource.TMNOFLAGS); stmt2.executeUpdate("insert into test_table2 values (100)"); xaRes2.end(xid2, XAResource.TMSUCCESS); ret = xaRes2.prepare(xid2); if (ret == XAResource.XA_OK) { xaRes2.commit(xid2, false); } } ret = xaRes1.prepare(xid1); if (ret == XAResource.XA_OK) { xaRes1.commit(xid1, false); }
|
例6—這個例子說明在錯誤恢復的階段,如何恢復準備好的或者快要完成的事務分支。 它首先試圖返回每個分支;如果它失敗了,它嘗試著讓資源管理程序丟掉關于事務的消息。
MyXid[] xids; xids = xaRes.recover(XAResource.TMSTARTRSCAN | XAResource.TMENDRSCAN); for (int i=0; xids!=null && i try { xaRes.rollback(xids[i]); } catch (XAException ex) { try { xaRes.forget(xids[i]); } catch (XAException ex1) { System.out.println("rollback/forget failed: " + ex1.errorCode); } }
}
|