<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 42,comments - 83,trackbacks - 0
            今天有人提出了一個(gè)詭異的要求,要求在全局事務(wù)中執(zhí)行多線程操作。他們?nèi)质聞?wù)中涉及兩個(gè)數(shù)據(jù)庫中的多個(gè)表,如果單線程那么走完,相應(yīng)時(shí)間上不滿足要求,說白了就是比較慢,于是提出了這樣的要求。從JTA的規(guī)范來看,transaction(TX)和thread是密切相關(guān)的,TX一般是不能在應(yīng)用線程間傳遞的, 即我主線程起一個(gè)全局事務(wù),然后我把這個(gè)事務(wù)傳遞給其他我新起的線程,單純的變量傳遞沒問題,但這個(gè)事務(wù)是不能被transaction manager(TM)識別的,TM對TX的管理有他自己的方式。從weblogic的實(shí)現(xiàn)來看,TX被放在當(dāng)前線程的threadlocal中,普通應(yīng)用線程不存在這樣的結(jié)構(gòu),所以簡單的變量傳遞,對于TM而言是沒有意義的。那么到底有沒有方法實(shí)現(xiàn)上面的需求的,我做了些測試,使用weblogic內(nèi)部的一些API可以實(shí)現(xiàn)這個(gè)需求。下面我們就來看看實(shí)現(xiàn)中的幾個(gè)要點(diǎn): :)

            1:上面說了,簡單的變量傳遞對于weblogic的TM是沒有意義的。TM判斷事務(wù)上下文(transaction context)的時(shí)候,會從當(dāng)前線程的threadlocal檢查,如果沒有,則說明當(dāng)前線程沒有和任何TX關(guān)聯(lián)。那么我們?nèi)绾螌⑽覀兪掷锏腡X放入當(dāng)前線程的threadlocal呢? weblogic的ExecuteThread是我們需要的那種線程,但它是final的,我們不能繼承它,只能繼承它的父類了,也就是weblogic.kernel.AuditableThread。

            2:我們有繼承了AuditableThread,那么我們怎么把TX放入它的threadlocal中呢?這個(gè)可以通過weblogic的TM實(shí)現(xiàn)中的一些API來實(shí)現(xiàn),具體到這個(gè)類就是weblogic.transaction.internal.TransactionManagerImpl。比如interResume(tx),internalSuspend()。由于這個(gè)API不是package protect的,我們自己的類必須也位于weblogic.transaction.internal這個(gè)包中。interResume(tx),用于將當(dāng)前線程和指定的TX做關(guān)聯(lián),而internalSuspend()恰恰相反,它用于解除這種關(guān)聯(lián)。

            3:因?yàn)樯婕暗蕉嗑€程,主線程需要決定何時(shí)提交或回滾事務(wù),這個(gè)我們要自己要實(shí)現(xiàn)一個(gè)線程結(jié)果檢查的方法(checkCompletion())。

             下面就是我自己實(shí)現(xiàn)的測試代碼,在Weblogic81測試沒有問題。
      1 package weblogic.transaction.internal;
      2 
      3 import weblogic.transaction.TxHelper;
      4 import weblogic.transaction.internal.TransactionManagerImpl;
      5 import javax.transaction.Transaction;
      6 import java.util.ArrayList;
      7 
      8 public class DriverTest {
      9 
     10     private static String INITIAL_CONTEXT_FACTORY = "weblogic.jndi.WLInitialContextFactory"
     11     private static String PROVIDER_URL = "t3://localhost:8001";  
     12     private static String SQL_INSERT = "insert into test values(?)";
     13     private static String ANO_SQL_INSERT = "insert into test1 values(?)";
     14     
     15     public static void main(String args[])
     16     {
     17         DriverTest test = new DriverTest();
     18         test.multiThreadXATest();
     19     }
     20     
     21     private Connection getConnection(String url, String dsName) throws NamingException, SQLException
     22     {
     23         InitialContext ctx = initializeEnv(url);
     24         DataSource ds = (DataSource)ctx.lookup(dsName);
     25         ctx.close();
     26         return ds.getConnection();
     27     }
     28     
     29     private UserTransaction getUserTransaction() throws NamingException, SQLException
     30     {
     31         InitialContext ctx = initializeEnv(null);
     32         return (UserTransaction)ctx.lookup("javax/transaction/UserTransaction");
     33     }
     34     
     35     private InitialContext initializeEnv(String url) throws NamingException
     36     {
     37         Properties prop = new Properties();
     38         if(url == null)
     39             prop.put(Context.PROVIDER_URL, PROVIDER_URL);
     40         else
     41             prop.put(Context.PROVIDER_URL, url);
     42         prop.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
     43         return new InitialContext(prop);
     44     }
     45     
     46     private void executeInsertInPSMT(Connection conn, String sql)
     47     {
     48         PreparedStatement pstmt = null;
     49         try{
     50             pstmt = conn.prepareStatement(sql);
     51             pstmt.setString(1"data_to_insert");
     52             pstmt.executeUpdate();
     53             pstmt.close();
     54         }catch(SQLException e){
     55                 e.printStackTrace();
     56         }    
     57     }
     58     
     59     public void multiThreadXATest()
     60     {
     61         ArrayList result = new ArrayList();
     62         try{
     63             UserTransaction userTx = getUserTransaction();
     64             userTx.setTransactionTimeout(1000);
     65             userTx.begin();
     66             Transaction tx = TxHelper.getTransaction();
     67             Connection conn = getConnection("t3://localhost:8011""TestXADS");
     68             if(conn != null) conn.close();
                      SQLThread thread1 
    = new SQLThread(tx,result,"t3://localhost:8011","TestXADS", SQL_INSERT);
     69             SQLThread thread2 = new SQLThread(tx,result,"t3://localhost:8021","TestXADS_1", ANO_SQL_INSERT);
     70             thread1.start();
     71             thread2.start();
     72             while(result.size() != 2){
     73                 Thread.currentThread().sleep(1);
     74             }
     75             if(checkCompletion(result)){
     76                 userTx.commit();
     77             }
     78             else{
     79                 userTx.rollback();
     80             }
     81         }catch(Exception e){
     82             e.printStackTrace();
     83         }
     84     }
     85     
     86     private boolean checkCompletion(ArrayList result){
     87         boolean toReturn = true;
     88         for(int loop=0; loop<result.size(); loop++){
     89             if((!((String)result.get(loop)).equals("OK"))){
     90                 toReturn = false;
     91                 break;
     92             }
     93         }
     94         return toReturn;
     95     }
     96     
     97     class SQLThread extends weblogic.kernel.AuditableThread {
     98         
     99         private Transaction tx = null;
    100         private ArrayList result = null;
    101         private String dsName = null;
    102         private String url = null;
    103         private String sql = null;
    104         
    105         public SQLThread(Transaction tx,ArrayList result,String ds, String url, String sql){
    106             this.tx = tx;
    107             this.result = result;
    108             this.dsName = ds;
    109             this.url = url;
    110             this.sql = sql;
    111         }
    112         
    113         public void run(){
    114             Connection conn = null;
    115             try{
    116                 TransactionManagerImpl tm = (TransactionManagerImpl)TransactionManagerImpl.getTransactionManager();
    117                 tm.internalResume((TransactionImpl)tx);
    118                 DriverTest test = new DriverTest();
    119                 conn = test.getConnection(url, dsName);
    120                 test.executeInsertInPSMT(conn, sql);
    121                 conn.close();
    122                 tm.internalSuspend();
    123                 result.add("OK");
    124             }catch(Exception e){
    125                 result.add("NA");
    126                 e.printStackTrace();
    127             }finally{
    128                 try{
    129                     if(conn != null)
    130                         conn.close();
    131                 }catch(Exception e){
    132                     e.printStackTrace();
    133                 }
    134             }
    135         }
    136     }
    137 }
    138 
    139 

            下面是關(guān)于上面這段測試代碼的一些解釋和代碼中的限制:
             1:為什么會在66行出現(xiàn)Connection conn = getConnection("t3://localhost:8011""TestXADS");這個(gè)看似無用的語句?Weblogic的TM實(shí)現(xiàn)中只有有XAResource參與到這個(gè)global transaction的server實(shí)例才有資格充當(dāng)這個(gè)global transaction的coordinator,其他的server實(shí)例只能充當(dāng)sub-coordinator。而且總是第一個(gè)參與全局事務(wù)的XAResource的實(shí)例充當(dāng)coordinator,因?yàn)閏oordinator的委任決定于TX開始后,第一次RMI request發(fā)送給哪個(gè)server。Connection conn = getConnection("t3://localhost:8001""TestXADS")用于指定這個(gè)global transaction的coordinator為8011這個(gè)server。如果沒有這個(gè)語句,thread1,thread2啟動后,它們開始XA操作時(shí),每個(gè)XAResouce都會把自己當(dāng)作這個(gè)TX的coordinator(Thread1委任8011,Thread2委任8021),這樣就會出現(xiàn)如下的異常,
            
    javax.transaction.TransactionRolledbackException: Current server is the coordinator and transaction is not found.  It was probably rolled back and forgotten already.
     at weblogic.rjvm.BasicOutboundRequest.sendReceive(BasicOutboundRequest.java:108)
     at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:290)
     at weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke(ReplicaAwareRemoteRef.java:247)
     at weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection(Unknown Source)
     at weblogic.transaction.internal.DriverTest1.getConnection(DriverTest1.java:39)
     at weblogic.transaction.internal.DriverTest1.access$0(DriverTest1.java:34)
     at weblogic.transaction.internal.DriverTest1$SQLThread.run(DriverTest1.java:135)

            2:某個(gè)全局事務(wù)中啟動的線程,不能同時(shí)操作同一個(gè)XAResource,比如Thread1操作datasource1和datasource2,thread2操作datasource2和datasource3。Weblogic中,我們做XA操作的時(shí)候,需要同后端的XA Resource Manager交互,交互中我們會多次調(diào)用xaStart(xid, flag),xaEnd(xid, flag)這里的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我們在同一個(gè)全局事務(wù)的多個(gè)線程中同時(shí)操作某個(gè)RESOURCE,那么就可能我們不同線程先后給這個(gè)RESOUCE的RM發(fā)送相同的FLAG,比如xaStart(xid, TMSUSPEND),即兩個(gè)線程同時(shí)發(fā)送TMSUSPEND,這樣會引發(fā)XA_ERR,如下:

    java.sql.SQLException: Unexpected exception while enlisting XAConnection java.sql.SQLException: XA error: XAER_RMERR : A resource manager error has occured in the transaction branch start() failed on resource 'TestXAPool_1': XAER_RMERR : A resource manager error has occured in the transaction branch
    oracle.jdbc.xa.OracleXAException
     at oracle.jdbc.xa.OracleXAResource.checkError(OracleXAResource.java:1017)
     at oracle.jdbc.xa.client.OracleXAResource.start(OracleXAResource.java:227)
     at weblogic.jdbc.wrapper.VendorXAResource.start(VendorXAResource.java:50)
     at weblogic.jdbc.jta.DataSource.start(DataSource.java:629)
     at weblogic.transaction.internal.XAServerResourceInfo.start(XAServerResourceInfo.java:1142)
     at weblogic.transaction.internal.XAServerResourceInfo.xaStart(XAServerResourceInfo.java:1073)
     at weblogic.transaction.internal.XAServerResourceInfo.enlist(XAServerResourceInfo.java:241)
     at weblogic.transaction.internal.ServerTransactionImpl.enlistResource(ServerTransactionImpl.java:463)
     at weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
     at weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist(DataSource.java:1334)
     at weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
     at weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
     at weblogic.jdbc.common.internal.RmiDataSource.getConnection(RmiDataSource.java:305)
     at weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown Source)
     ......

            雖然測試中沒有什么問題,但我不建議誰這么去做,畢竟我們需要遵循規(guī)范。寫這么個(gè)例子,只是讓大家對weblogic的transaction加深些理解,而不是真的要在生產(chǎn)系統(tǒng)中這樣去做。
    posted on 2009-07-31 15:18 走走停停又三年 閱讀(2490) 評論(0)  編輯  收藏 所屬分類: Weblogic
    主站蜘蛛池模板: 国产亚洲av人片在线观看| a级片在线免费看| 亚洲经典在线中文字幕| 亚洲精品无码专区久久同性男| 最新仑乱免费视频| 亚洲免费视频网站| 三上悠亚在线观看免费| 免费无码又爽又黄又刺激网站| 亚洲一区二区三区乱码在线欧洲| 亚洲美女中文字幕| 亚洲精品夜夜夜妓女网| 亚洲AV无码专区日韩| 女人让男人免费桶爽30分钟| 希望影院高清免费观看视频| 久久久久久AV无码免费网站下载| av电影在线免费看| 美女黄色免费网站| 国产精品亚洲片在线花蝴蝶| 亚洲乱人伦中文字幕无码| 亚洲一区二区三区在线网站| 亚洲第一香蕉视频| 亚洲精品福利网泷泽萝拉| 亚洲国产日韩一区高清在线 | 亚洲国产精品综合久久20| 久久夜色精品国产噜噜噜亚洲AV| 亚洲日韩欧洲无码av夜夜摸| 亚洲熟伦熟女新五十路熟妇| 免费国产一级特黄久久| 亚洲Av无码乱码在线观看性色| 四虎亚洲国产成人久久精品| 精品无码一区二区三区亚洲桃色 | 无码天堂亚洲国产AV| 亚洲av无码成人精品国产 | 午夜亚洲国产成人不卡在线| 国产成人3p视频免费观看| 永久免费看mv网站入口| 成人永久免费福利视频网站| 国产v片免费播放| 亚洲国产婷婷综合在线精品| 国产成人A亚洲精V品无码| 亚洲国产精品国自产拍AV|