今天有人提出了一個(gè)詭異的要求,要求在全局事務(wù)中執(zhí)行多線程操作。他們?nèi)质聞?wù)中涉及兩個(gè)數(shù)據(jù)庫中的多個(gè)表,如果單線程那么走完,相應(yīng)時(shí)間上不滿足要求,說白了就是比較慢,于是提出了這樣的要求。從JTA的規(guī)范來看,transaction(TX)和thread是密切相關(guān)的,TX一般是不能在應(yīng)用線程間傳遞的, 即我主線程起一個(gè)全局事務(wù),然后我把這個(gè)事務(wù)傳遞給其他我新起的線程,單純的變量傳遞沒問題,但這個(gè)事務(wù)是不能被transaction manager(TM)識別的,TM對TX的管理有他自己的方式。從weblogic的實(shí)現(xiàn)來看,TX被放在當(dāng)前線程的threadlocal中,普通應(yīng)用線程不存在這樣的結(jié)構(gòu),所以簡單的變量傳遞,對于TM而言是沒有意義的。那么到底有沒有方法實(shí)現(xiàn)上面的需求的,我做了些測試,使用weblogic內(nèi)部的一些API可以實(shí)現(xiàn)這個(gè)需求。下面我們就來看看實(shí)現(xiàn)中的幾個(gè)要點(diǎn): :)
1:上面說了,簡單的變量傳遞對于weblogic的TM是沒有意義的。TM判斷事務(wù)上下文(transaction context)的時(shí)候,會從當(dāng)前線程的threadlocal檢查,如果沒有,則說明當(dāng)前線程沒有和任何TX關(guān)聯(lián)。那么我們?nèi)绾螌⑽覀兪掷锏腡X放入當(dāng)前線程的threadlocal呢? weblogic的ExecuteThread是我們需要的那種線程,但它是final的,我們不能繼承它,只能繼承它的父類了,也就是weblogic.kernel.AuditableThread。
2:我們有繼承了AuditableThread,那么我們怎么把TX放入它的threadlocal中呢?這個(gè)可以通過weblogic的TM實(shí)現(xiàn)中的一些API來實(shí)現(xiàn),具體到這個(gè)類就是weblogic.transaction.internal.TransactionManagerImpl。比如interResume(tx),internalSuspend()。由于這個(gè)API不是package protect的,我們自己的類必須也位于weblogic.transaction.internal這個(gè)包中。interResume(tx),用于將當(dāng)前線程和指定的TX做關(guān)聯(lián),而internalSuspend()恰恰相反,它用于解除這種關(guān)聯(lián)。
3:因?yàn)樯婕暗蕉嗑€程,主線程需要決定何時(shí)提交或回滾事務(wù),這個(gè)我們要自己要實(shí)現(xiàn)一個(gè)線程結(jié)果檢查的方法(checkCompletion())。
下面就是我自己實(shí)現(xiàn)的測試代碼,在Weblogic81測試沒有問題。
1 package weblogic.transaction.internal;
2 

3 import weblogic.transaction.TxHelper;
4 import weblogic.transaction.internal.TransactionManagerImpl;
5 import javax.transaction.Transaction;
6 import java.util.ArrayList;
7
8 public class DriverTest {
9
10 private static String INITIAL_CONTEXT_FACTORY = "weblogic.jndi.WLInitialContextFactory";
11 private static String PROVIDER_URL = "t3://localhost:8001";
12 private static String SQL_INSERT = "insert into test values(?)";
13 private static String ANO_SQL_INSERT = "insert into test1 values(?)";
14
15 public static void main(String args[])
16 {
17 DriverTest test = new DriverTest();
18 test.multiThreadXATest();
19 }
20
21 private Connection getConnection(String url, String dsName) throws NamingException, SQLException
22 {
23 InitialContext ctx = initializeEnv(url);
24 DataSource ds = (DataSource)ctx.lookup(dsName);
25 ctx.close();
26 return ds.getConnection();
27 }
28
29 private UserTransaction getUserTransaction() throws NamingException, SQLException
30 {
31 InitialContext ctx = initializeEnv(null);
32 return (UserTransaction)ctx.lookup("javax/transaction/UserTransaction");
33 }
34
35 private InitialContext initializeEnv(String url) throws NamingException
36 {
37 Properties prop = new Properties();
38 if(url == null)
39 prop.put(Context.PROVIDER_URL, PROVIDER_URL);
40 else
41 prop.put(Context.PROVIDER_URL, url);
42 prop.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
43 return new InitialContext(prop);
44 }
45
46 private void executeInsertInPSMT(Connection conn, String sql)
47 {
48 PreparedStatement pstmt = null;
49 try{
50 pstmt = conn.prepareStatement(sql);
51 pstmt.setString(1, "data_to_insert");
52 pstmt.executeUpdate();
53 pstmt.close();
54 }catch(SQLException e){
55 e.printStackTrace();
56 }
57 }
58
59 public void multiThreadXATest()
60 {
61 ArrayList result = new ArrayList();
62 try{
63 UserTransaction userTx = getUserTransaction();
64 userTx.setTransactionTimeout(1000);
65 userTx.begin();
66 Transaction tx = TxHelper.getTransaction();
67 Connection conn = getConnection("t3://localhost:8011", "TestXADS");
68 if(conn != null) conn.close();
SQLThread thread1 = new SQLThread(tx,result,"t3://localhost:8011","TestXADS", SQL_INSERT);
69 SQLThread thread2 = new SQLThread(tx,result,"t3://localhost:8021","TestXADS_1", ANO_SQL_INSERT);
70 thread1.start();
71 thread2.start();
72 while(result.size() != 2){
73 Thread.currentThread().sleep(1);
74 }
75 if(checkCompletion(result)){
76 userTx.commit();
77 }
78 else{
79 userTx.rollback();
80 }
81 }catch(Exception e){
82 e.printStackTrace();
83 }
84 }
85
86 private boolean checkCompletion(ArrayList result){
87 boolean toReturn = true;
88 for(int loop=0; loop<result.size(); loop++){
89 if((!((String)result.get(loop)).equals("OK"))){
90 toReturn = false;
91 break;
92 }
93 }
94 return toReturn;
95 }
96
97 class SQLThread extends weblogic.kernel.AuditableThread {
98
99 private Transaction tx = null;
100 private ArrayList result = null;
101 private String dsName = null;
102 private String url = null;
103 private String sql = null;
104
105 public SQLThread(Transaction tx,ArrayList result,String ds, String url, String sql){
106 this.tx = tx;
107 this.result = result;
108 this.dsName = ds;
109 this.url = url;
110 this.sql = sql;
111 }
112
113 public void run(){
114 Connection conn = null;
115 try{
116 TransactionManagerImpl tm = (TransactionManagerImpl)TransactionManagerImpl.getTransactionManager();
117 tm.internalResume((TransactionImpl)tx);
118 DriverTest test = new DriverTest();
119 conn = test.getConnection(url, dsName);
120 test.executeInsertInPSMT(conn, sql);
121 conn.close();
122 tm.internalSuspend();
123 result.add("OK");
124 }catch(Exception e){
125 result.add("NA");
126 e.printStackTrace();
127 }finally{
128 try{
129 if(conn != null)
130 conn.close();
131 }catch(Exception e){
132 e.printStackTrace();
133 }
134 }
135 }
136 }
137 }
138
139
下面是關(guān)于上面這段測試代碼的一些解釋和代碼中的限制:
1:為什么會在66行出現(xiàn)Connection conn
= getConnection("t3://localhost:8011", "TestXADS");這個(gè)看似無用的語句?Weblogic的TM實(shí)現(xiàn)中只有有XAResource參與到這個(gè)global transaction的server實(shí)例才有資格充當(dāng)這個(gè)global transaction的coordinator,其他的server實(shí)例只能充當(dāng)sub-coordinator。而且總是第一個(gè)參與全局事務(wù)的XAResource的實(shí)例充當(dāng)coordinator,因?yàn)閏oordinator的委任決定于TX開始后,第一次RMI request發(fā)送給哪個(gè)server。Connection conn
= getConnection("t3://localhost:8001", "TestXADS")用于指定這個(gè)global transaction的coordinator為8011這個(gè)server。如果沒有這個(gè)語句,thread1,thread2啟動后,它們開始XA操作時(shí),每個(gè)XAResouce都會把自己當(dāng)作這個(gè)TX的coordinator(Thread1委任8011,Thread2委任8021),這樣就會出現(xiàn)如下的異常,
javax.transaction.TransactionRolledbackException: Current server is the coordinator and transaction is not found. It was probably rolled back and forgotten already.
at weblogic.rjvm.BasicOutboundRequest.sendReceive(BasicOutboundRequest.java:108)
at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:290)
at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:247)
at weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection(Unknown Source)
at weblogic.transaction.internal.DriverTest1.getConnection(DriverTest1.java:39)
at weblogic.transaction.internal.DriverTest1.access$0(DriverTest1.java:34)
at weblogic.transaction.internal.DriverTest1$SQLThread.run(DriverTest1.java:135)
2:某個(gè)全局事務(wù)中啟動的線程,不能同時(shí)操作同一個(gè)XAResource,比如Thread1操作datasource1和datasource2,thread2操作datasource2和datasource3。Weblogic中,我們做XA操作的時(shí)候,需要同后端的XA Resource Manager交互,交互中我們會多次調(diào)用xaStart(xid, flag),xaEnd(xid, flag)這里的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我們在同一個(gè)全局事務(wù)的多個(gè)線程中同時(shí)操作某個(gè)RESOURCE,那么就可能我們不同線程先后給這個(gè)RESOUCE的RM發(fā)送相同的FLAG,比如xaStart(xid, TMSUSPEND),即兩個(gè)線程同時(shí)發(fā)送TMSUSPEND,這樣會引發(fā)XA_ERR,如下:
java.sql.SQLException: Unexpected exception while enlisting XAConnection java.sql.SQLException: XA error: XAER_RMERR : A resource manager error has occured in the transaction branch start() failed on resource 'TestXAPool_1': XAER_RMERR : A resource manager error has occured in the transaction branch
oracle.jdbc.xa.OracleXAException
at oracle.jdbc.xa.OracleXAResource.checkError(OracleXAResource.java:1017)
at oracle.jdbc.xa.client.OracleXAResource.start(OracleXAResource.java:227)
at weblogic.jdbc.wrapper.VendorXAResource.start(VendorXAResource.java:50)
at weblogic.jdbc.jta.DataSource.start(DataSource.java:629)
at weblogic.transaction.internal.XAServerResourceInfo.start(XAServerResourceInfo.java:1142)
at weblogic.transaction.internal.XAServerResourceInfo.xaStart(XAServerResourceInfo.java:1073)
at weblogic.transaction.internal.XAServerResourceInfo.enlist(XAServerResourceInfo.java:241)
at weblogic.transaction.internal.ServerTransactionImpl.enlistResource(ServerTransactionImpl.java:463)
at weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
at weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist(DataSource.java:1334)
at weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
at weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
at weblogic.jdbc.common.internal.RmiDataSource.getConnection(RmiDataSource.java:305)
at weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown Source)
......
雖然測試中沒有什么問題,但我不建議誰這么去做,畢竟我們需要遵循規(guī)范。寫這么個(gè)例子,只是讓大家對weblogic的transaction加深些理解,而不是真的要在生產(chǎn)系統(tǒng)中這樣去做。
posted on 2009-07-31 15:18
走走停停又三年 閱讀(2490)
評論(0) 編輯 收藏 所屬分類:
Weblogic