<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    走自己的路

    路漫漫其修遠兮,吾將上下而求索

      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理 ::
      50 隨筆 :: 4 文章 :: 118 評論 :: 0 Trackbacks

    背景

    應用項目組每個小時會定時的run一個存儲過程進行結算,每次執行的時間也許會超過一個小時,而且需要絕對保證存儲過程的串行執行。因為使用內存鎖不能絕對保證兩個存儲過程的串行執行,因為應用服務器down掉重啟后可能會出現并發執行的情況,因為先前的存儲過程還在db中運行。我們是使用LTS,對quartz進行了封裝來做任務調度的。我們決定鎖的管理操作由framework來實現。原因是:

    l         鎖管理器可以做成通用的模塊

    l         申請鎖,釋放鎖是比較危險的操作,擔心業務開發人員由于遺忘導致死鎖或者并發問題

    l         可以很好的集成到我們現有的framework中,方便地開放給業務開發人員使用

    注意:我們極其不推薦使用悲觀離線鎖,如果沖突出現的概率比較少,可以用其他方法比如樂觀離線鎖,DB Constraint再通過補償操作能解決的問題,請不要使用悲觀離線鎖。

     

    原理

    PLSQL UL LOCKoracle提供出來給開發人員使用的鎖資源,功能和DML鎖是類似的,當然我們可以通過DML鎖來完成并發控制,select…for update或者自己維護一張鎖表,考慮到實現代價,我們打算使用PLSQL UL LOCK。而且Oracle保證了session釋放時,UL lock都會被釋放。

    但是用它時,需要注意到它的DBMS_LOCK.Unique函數它每次都會commit數據。如果是在分布式事務當中,會拋出事務已提交的異常。因為我們使用的是XA resource并且transaction levelglobal的,也就是JTA。為了使得鎖的申請和釋放不影響分布式業務事務,或者我們是使用非xaresourcelocaltransaction來完成鎖操作,或者也可以暫停已有事務,等鎖操作完成后resume暫停的分布式事務??紤]到重用已有的xa resource我們打算使用后一種方法,其實這種方法我們也會經常使用,暫停分布式事務做DDL操作,再釋放事務。

     

    實現方法:

    l         封裝DBMS_LOCK包中的幾個存儲過程為我們所用

    l         Java端提供一個基于PLSQL UL鎖的管理器

    l         Java端定義好申請鎖,業務操作,釋放鎖的使用流程,作為一個模板

     

    DB存儲過程:

    DBMS_LOCK做了簡單的封裝,避免直接調用DBMS_LOCK。這樣做的好處是:

    l         Oracle解耦,如果其他數據庫可以提供類似的功能,我們也可以用同名的存儲過程實現

    l         方便以后對存儲過程重構,升級

    l         我們需要對DBMS_LOCK進行簡單的封裝,因為DBMS_LOCK.Unique獲取lockhandle oracle中鎖的唯一標識,輸入是lockname,邏輯名,輸出是鎖的唯一標識,對java端應該是透明的,java端應該只關心鎖的邏輯名。

    create or replace package body frm_lts_processor_lock_pkg is
      
    /* table to store lockhandles by name */
       TYPE handle_tbltype IS TABLE OF varchar2(
    128)
          INDEX BY varchar2(
    128);

       v_lockhandle_tbl handle_tbltype;
      
     procedure frm_lts_lock_acquire(i_lock_name in varchar2, i_expiration_time in Integer default
    864000, i_wait_time in Integer default DBMS_LOCK.maxwait, o_result out number) as
          v_result     number;
          v_lockhandle varchar2(
    128);
       begin
       if v_lockhandle_tbl.count =
    0 then
          sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
          v_lockhandle_tbl(i_lock_name) := v_lockhandle;
       elsif v_lockhandle_tbl.exists(i_lock_name) then
          dbms_output.put_line(
    'atttacked');
          v_lockhandle := v_lockhandle_tbl(i_lock_name);
       else
         dbms_output.put_line(
    'new acquire');
         
    --acquire a unique lock id
         sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
         v_lockhandle_tbl(i_lock_name) := v_lockhandle;
           
       end if;
       
    --acquire a lock
        v_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
     
       
    --set return values
         o_result := v_result;
     end frm_lts_lock_acquire;
     
     function frm_lts_lock_release(i_lock_name in varchar2) return number as
           v_result number :=
    6;
           v_lockhandle varchar2(
    128);
     begin
         
    --release lock according to lockhandle
          if v_lockhandle_tbl.exists(i_lock_name) then
              v_lockhandle :=  v_lockhandle_tbl(i_lock_name);
              v_result := sys.dbms_lock.release(v_lockhandle);
              v_lockhandle_tbl.delete(i_lock_name);       
          end if;
          return v_result;
     end frm_lts_lock_release;

    end frm_lts_processor_lock_pkg;
    /

     

     

    鎖管理器:

    其實應用項目組有多個這樣的存儲過程,而這些存儲過程之間的串行執行可以有多個business key來決定的,比如job order number,delivery order等。所以我們需要給他們提供多鎖管理機制。我們會對這多個鎖進行排序,以避免死鎖,并強烈推薦應用項目設置超時時間。這些business key是由String對象構成的,為了防止大量的業務操作被鎖在null或者空string這樣沒有意義的business key上面,我們對application提供的鎖集合還需要進行過濾。

    原理還是很簡單的,就是在本地事務中調用db端的申請鎖,釋放鎖的存儲過程,然后對返回的結果進行一系列處理。

    在使用多鎖機制的時候要保證,如果只申請到了部分鎖,在申請其中另外一個鎖時發生了錯誤或者超時,要能夠安全地將已申請的鎖釋放掉,所以多鎖申請需要記錄已申請到的鎖,并且記錄發生的錯誤,區分timeout和異常。Timeout返回false,如果出現異常記錄下來,最后拋出。釋放多鎖時,不能被中斷,記錄釋放每個鎖后的結果,最后判定如果其中一些鎖釋放時發生了錯誤,拋出。

     

    handleLock定義暫停jta事務,執行鎖操作,釋放jta事務流程

    private Object handleLock(Connection connection,
                LocalTransactionCallback localTransactionCallback)
                
    throws LockException {
            TransactionManager tm 
    = null;
            Transaction currentTx 
    = null;
            Object result 
    = null;
            
    try {
                Context initialContext 
    = new InitialContext();
                UserTransaction userTrx 
    = (javax.transaction.UserTransaction) initialContext
                        .lookup(
    "java:comp/UserTransaction");
                
    if (!(userTrx.getStatus() == Status.STATUS_NO_TRANSACTION)) {
                    tm 
    = TransactionUtils.getTransactionManager(userTrx);
                    
    if (tm != null{
                        currentTx 
    = tm.suspend();
                    }

                }

                result 
    = localTransactionCallback
                        .executeInLocalTransaction(connection);

                
    if (null != currentTx) {
                    tm.resume(currentTx);
                }

            }
     catch (NamingException e) {
            }
     catch (SystemException e) {
            }
     catch (InvalidTransactionException e) {
            }
     catch (IllegalStateException e) {
            }

            
    return result;
        }


    多鎖申請操作是上面流程的一個回調

    private class ObtainMutipleLocksLocalTransactionCallback implements
                LocalTransactionCallback 
    {
            
    private Set<String> lockNames;
            
    private int waitTime;

            ObtainMutipleLocksLocalTransactionCallback(Set
    <String> lockNames,
                    
    int waitTime) {
                
    this.lockNames = lockNames;
                
    this.waitTime = waitTime;
            }

            
    public Object executeInLocalTransaction(Connection conn) {
                CallableStatement lockAcquireStmt 
    = null;
                Set
    <String> obtainedLockNames = new HashSet<String>();
                
    boolean timeOut = false;
                String timeOutLockName 
    = null;
                Exception mifLockException 
    = null;
                
    try {
                    lockAcquireStmt 
    = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);
                    
    for (String lockName : lockNames) {
                        lockAcquireStmt.setString(
    1, lockName);
                        lockAcquireStmt.setInt(
    2, LCOK_EXPIRE_TIME);
                        lockAcquireStmt.setInt(
    3, waitTime);
                        lockAcquireStmt.registerOutParameter(
    4,
                                java.sql.Types.INTEGER);
                        lockAcquireStmt.registerOutParameter(
    5,
                                java.sql.Types.VARCHAR);
                        lockAcquireStmt.executeUpdate();
                        
    int lockacquireResult = lockAcquireStmt.getInt(4);
                        
    if (lockacquireResult == ULLockResultType.SUCCESSFUL) 
                            obtainedLockNames.add(lockName);
                        }
     else if (lockacquireResult == ULLockResultType.TIMEOUT) {
                            timeOut 
    = true;
                            timeOutLockName 
    = lockName;
                            
    break;
                        }
     else if (lockacquireResult != ULLockResultType.ALREADY_OWNED) {
                            String lockResultDesc 
    = ULLockResultType
                                    .getAcquireTypeDesc(lockacquireResult);
                            LockException lockException 
    = new LockException(
                                    
    "Obtain lock " + lockName
                                            
    + " fails, the reason is "
                                            
    + lockResultDesc + " .");
                            lockException.setLockName(lockName);
                            lockException.setLockHandlingResult(lockResultDesc);
                            
    throw lockException;
                        }
     else {
                        }

                    }

                }
     catch (Exception ex) {
                    mifLockException 
    = ex;
                }
     finally {
                    
    if (null != lockAcquireStmt) {
                        
    try {
                            lockAcquireStmt.close();
                        }
     catch (SQLException e) {
                            
    // swallow
                        }

                    }

                }

                
    boolean success = true;
                
    if (timeOut || mifLockException != null{
                    success 
    = false;
                }

                
    return new ObtainMultipleLocksResult(success, obtainedLockNames,
                        timeOut, timeOutLockName, mifLockException);
            }

        }


    多鎖釋放操作也是事務暫停流程的一個回調

    private class ReleaseMultipleLocksLocalTransactionCallback implements
                LocalTransactionCallback 
    {
            
    private Set<String> lockNames;

            ReleaseMultipleLocksLocalTransactionCallback(Set
    <String> lockNames) {
                
    this.lockNames = lockNames;
            }


            
    public Object executeInLocalTransaction(Connection conn) {
                CallableStatement lockReleaseStmt 
    = null;
                Map
    <String, Exception> mifLockErrors = new HashMap<String, Exception>();
                Set
    <String> releasedLocks = new HashSet<String>();
                
    try {
                    
    try {
                        lockReleaseStmt 
    = conn.prepareCall(RELEASE_LOCK_PROC_CALL);
                    }
     catch (Exception ex) {
                        
    for (String lockName : lockNames) {
                            mifLockErrors.put(lockName, ex);
                        }

    return new ReleaseMutipleLocksResult(false, releasedLocks, mifLockErrors);
                    }


                    
    for (String lockName : lockNames) {
                        
    try {
                            lockReleaseStmt.registerOutParameter(
    1,
                                    java.sql.Types.INTEGER);
                            lockReleaseStmt.setString(
    2, lockName);
                            lockReleaseStmt.executeUpdate();
                            
    int lockReleaseResult = lockReleaseStmt.getInt(1);
                            
    if (lockReleaseResult == ULLockResultType.SUCCESSFUL) {
                                releasedLocks.add(lockName);
                            }
     else {
                                String lockResultDesc 
    = ULLockResultType
                                        .getReleaseTypeDesc(lockReleaseResult);
                                LockException lockException 
    = new LockException(
                                        
    "Release lock " + lockName
                                                
    + " fails, the reason is "
                                                
    + lockResultDesc + " .");
                                lockException.setLockName(lockName);
                                lockException.setLockHandlingResult(lockResultDesc);
                                mifLockErrors.put(lockName, lockException);
                            }

                        }
     catch (Exception ex) {
                            mifLockErrors.put(lockName, ex);
                        }

                    }

                }
     finally {
                    
    if (null != lockReleaseStmt) {
                        
    try {
                            lockReleaseStmt.close();
                        }
     catch (SQLException e) {
                        }

                    }

                }

                
    boolean success = releasedLocks.size() == this.lockNames.size();
                
    return new ReleaseMutipleLocksResult(success, releasedLocks,
                        mifLockErrors);
            }

        }


    使用模板:注意鎖的釋放要寫在finally語句塊里面,保證鎖的釋放。

    定義好模板,防止Application用戶直接調用鎖管理器或者濫用鎖,忘記釋放鎖。我們決定定義一個模板,做到鎖的申請和釋放對application用戶來說是透明的,把它做成了隱含鎖。

    public void execute(JobExecutionContext context)
                
    throws JobExecutionException {
            Map jobDataMap 
    = context
            .getJobDetail().getJobDataMap();
            Collection
    <String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
            Integer waitTimeInteger 
    = (Integer) jobDataMap
            .get(LOCK_WAIT_TIME_SECONDS_KEY);
            
    int waitTime = MAX_WAITTIME;
            
    if (waitTimeInteger != null{
                waitTime 
    = waitTimeInteger.intValue();
            }

            Set
    <String> uniqueLockKeys = new HashSet<String>(lockKeys);

            
    // filter empty keys
            Iterator<String> keyIterator = uniqueLockKeys.iterator();
            
    while (keyIterator.hasNext()) {
                String key 
    = keyIterator.next();
                
    if (StringUtils.isEmptyNoOffset(key)) {
                    keyIterator.remove();
                }

            }

            
    if (CollectionUtils.isNotEmptyCollection(uniqueLockKeys)) {
                Set
    <String> obtainedLockNames = null;
                Connection connection 
    = null;
                
    try {
                    connection 
    = DataSource.getConnection();
                    ObtainMultipleLocksResult result 
    = LOCK_MANAGER.obtainLock(
                            connection, uniqueLockKeys, waitTime);
                    obtainedLockNames 
    = result.getObtainedLockNames();
                    
    if (!result.isSuccess()) {
                        
    if (result.isTimeout()) {
                          
    //do log
                            return;
                        }
     else {
                            JobExecutionException jobException 
    = new JobExecutionException(
                                    
    "Obtain locks failed! "
                                            
    + result.getMifLockException()
                                                    .getMessage(), result
                                            .getMifLockException());
                            
    throw jobException;
                        }

                    }

                    
    this. executeInLock (context);
                }
     catch (Throwable e) {
                    
    throw new JobExecutionException(
                            
    "Get db connection failed!" + e.getMessage(), e);
                }
     finally {
                    
    if (null != connection) {
                        
    this.releaseLocks(connection, obtainedLockNames);
                        
    try {
                            connection.close();
                        }
     catch (SQLException e) {
                            
    throw new JobExecutionException(
                                    
    "close  db connection failed!" + e.getMessage(), e);
                        }

                    }

                }

            }
     else {
                
    this.executeInLock(context);
            }

        }


     

    executeInLockapplication的子類繼承實現

     

    緩存

    l         緩存悲觀離線鎖

    l         緩存lockhandle

    因為使用的是悲觀離線鎖,每次申請鎖都要跑一趟db,但如果當前線程已經是lock的所有者就不需要白跑一趟了??梢杂?/span>ThreadLocal把當前線程已經擁有的鎖緩存起來,釋放鎖時對應的需要清除緩存。

    在申請鎖時,需要獲得UL Lock時的lockhandle,釋放鎖時也需要提供鎖的lockhandle,我們需要將它緩存起來,主要是因為DBMS_LOCK.Unique每次都會commit,會影響性能,這樣每次釋放鎖時就可以直接使用lockhandle了。有兩種方法對lockhandle進行緩存,緩存在java端作為實例變量,緩存在plsql包的全局變量中。緩存在java端需要注意的是,lock manager不能作為單例或者享元來使用,否則lock handle的緩存在多jvm之間也存在著并發控制和同步的問題。

    源代碼:

    Java:
    ULLock-sources.rar

    PLSQL:
    lockplsql.rar

    參考:

    http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm




    posted on 2009-07-03 19:24 叱咤紅人 閱讀(1502) 評論(0)  編輯  收藏

    只有注冊用戶登錄后才能發表評論。


    網站導航:
     
    主站蜘蛛池模板: 国产精品国产免费无码专区不卡 | 91福利视频免费观看| 4480yy私人影院亚洲| 日韩视频在线精品视频免费观看| 亚洲一级大黄大色毛片| 青青青国产免费一夜七次郎| 一道本在线免费视频| 亚洲av无码精品网站| 成年人视频免费在线观看| 亚洲色中文字幕在线播放| 国产精品亚洲精品日韩已方| 99爱在线精品视频免费观看9| 亚洲中文字幕无码爆乳app| MM131亚洲国产美女久久| 7x7x7x免费在线观看| 免费一级全黄少妇性色生活片| 亚洲AV无码国产丝袜在线观看 | 亚洲国产综合无码一区| 四虎最新永久免费视频| 国产亚洲精品第一综合| 亚洲成a人片77777老司机| 午夜a级成人免费毛片| 免费女人高潮流视频在线观看 | 国产亚洲精品第一综合| 亚洲综合无码一区二区| 国产a级特黄的片子视频免费| 久久久高清日本道免费观看| 99亚洲男女激情在线观看| 亚洲网址在线观看| 亚洲精品乱码久久久久久蜜桃| 免费精品国产自产拍在线观看图片| 一区二区三区免费看| 亚洲最大无码中文字幕| 亚洲Aⅴ无码专区在线观看q| 日本免费人成视频播放| 久久精品一本到99热免费| 无码毛片一区二区三区视频免费播放| 亚洲人xxx日本人18| 久久夜色精品国产噜噜噜亚洲AV| 亚洲精品国产精品乱码不卡| 好吊妞在线成人免费|