Background
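The application team runs a settlement stored procedure every hour on a schedule. A single run may take longer than an hour, and the runs must be strictly serialized. An in-memory lock cannot guarantee this. If the application server goes down and restarts, the previous invocation may still be running in the database, so two runs could end up executing concurrently. We schedule jobs with LTS, our wrapper around Quartz, and we decided that lock management should be handled by the framework, for these reasons: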
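- The lock manager can be built as a generic, reusable module.
- Acquiring and releasing locks is risky; we were worried that business developers would forget a step and cause deadlocks or concurrency problems.
- It integrates cleanly into our existing framework and is easy to expose to business developers.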
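Note: we strongly discourage pessimistic offline locks. If conflicts are rare, or the problem can be solved another way (for example with an optimistic offline lock, or a DB constraint plus compensating operations), do not use a pessimistic offline lock.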
How it works
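The PL/SQL UL lock (user lock) is a lock resource that Oracle exposes to developers through the DBMS_LOCK package, and it behaves much like a DML lock. We could also implement the concurrency control with DML locks, using SELECT ... FOR UPDATE or a self-maintained lock table, but given the implementation cost we chose UL locks. Oracle also guarantees that every UL lock held by a session is released when that session ends.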
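There is a catch: DBMS_LOCK.ALLOCATE_UNIQUE commits every time it is called. Inside a distributed transaction this raises an exception saying the transaction has already been committed, and we use XA resources with global transactions, i.e. JTA. To keep lock acquisition and release from interfering with the distributed business transaction, we could either perform the lock operations on a non-XA resource in a local transaction, or suspend the current distributed transaction, perform the lock operation, and then resume it. Because we wanted to reuse the existing XA resource, we chose the second approach. We use the same technique regularly anyway, for example to suspend a distributed transaction, run DDL, and then resume it.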
Implementation:
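- Wrap a few of the procedures in the DBMS_LOCK package for our own use.
- Provide a lock manager on the Java side based on PL/SQL UL locks.
- Define, on the Java side, the acquire lock / business operation / release lock flow as a template.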
DB stored procedures:
We put a thin wrapper around DBMS_LOCK so that nothing calls DBMS_LOCK directly. The benefits:
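- Decoupling from Oracle: if another database offers similar functionality, we can implement stored procedures with the same names there.
- The stored procedures are easier to refactor and upgrade later.
- DBMS_LOCK.ALLOCATE_UNIQUE takes the lock's logical name and returns a lockhandle, Oracle's unique identifier for the lock. The handle should be transparent to the Java side, which should only deal with the lock's logical name.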
create or replace package frm_lts_processor_lock_pkg is
  procedure frm_lts_lock_acquire(i_lock_name       in varchar2,
                                 i_expiration_time in integer default 864000,
                                 i_wait_time       in integer default dbms_lock.maxwait,
                                 o_result          out number);
  function frm_lts_lock_release(i_lock_name in varchar2) return number;
end frm_lts_processor_lock_pkg;
/

create or replace package body frm_lts_processor_lock_pkg is
  /* session-level cache: lock name -> lockhandle */
  type handle_tbltype is table of varchar2(128) index by varchar2(128);
  v_lockhandle_tbl handle_tbltype;

  procedure frm_lts_lock_acquire(i_lock_name       in varchar2,
                                 i_expiration_time in integer default 864000,
                                 i_wait_time       in integer default dbms_lock.maxwait,
                                 o_result          out number) as
    v_lockhandle varchar2(128);
  begin
    if v_lockhandle_tbl.exists(i_lock_name) then
      -- reuse the cached handle and skip the implicit commit of allocate_unique
      v_lockhandle := v_lockhandle_tbl(i_lock_name);
    else
      -- map the logical name to a unique lockhandle (allocate_unique commits)
      sys.dbms_lock.allocate_unique(i_lock_name, v_lockhandle, i_expiration_time);
      v_lockhandle_tbl(i_lock_name) := v_lockhandle;
    end if;
    -- exclusive mode; release_on_commit = false keeps the lock until an
    -- explicit release or the end of the session
    o_result := sys.dbms_lock.request(v_lockhandle, dbms_lock.x_mode, i_wait_time, false);
  end frm_lts_lock_acquire;

  function frm_lts_lock_release(i_lock_name in varchar2) return number as
    -- 6 is not a DBMS_LOCK.RELEASE code (those are 0, 3, 4, 5); it is returned
    -- when this session has no cached handle for the name
    v_result     number := 6;
    v_lockhandle varchar2(128);
  begin
    --release lock according to lockhandle
    if v_lockhandle_tbl.exists(i_lock_name) then
      v_lockhandle := v_lockhandle_tbl(i_lock_name);
      v_result := sys.dbms_lock.release(v_lockhandle);
      v_lockhandle_tbl.delete(i_lock_name);
    end if;
    return v_result;
  end frm_lts_lock_release;
end frm_lts_processor_lock_pkg;
/
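The Java callbacks below use two constants, OBTAIN_LOCK_PROC_CALL and RELEASE_LOCK_PROC_CALL, whose definitions are not shown. Here is a minimal sketch of what they might look like, assuming the package above is called by name with standard JDBC escape syntax. The holder class `LockProcCalls` and its `placeholderCount` helper are hypothetical.

```java
// Hypothetical holder for the JDBC call strings used by the lock manager.
// Parameter order must match frm_lts_processor_lock_pkg.
final class LockProcCalls {
    private LockProcCalls() {}

    // frm_lts_lock_acquire(i_lock_name, i_expiration_time, i_wait_time, o_result):
    // IN parameters 1 to 3, OUT parameter 4.
    static final String OBTAIN_LOCK_PROC_CALL =
            "{call frm_lts_processor_lock_pkg.frm_lts_lock_acquire(?, ?, ?, ?)}";

    // frm_lts_lock_release is a function: "? =" binds its return value as
    // parameter 1, and the lock name is parameter 2.
    static final String RELEASE_LOCK_PROC_CALL =
            "{? = call frm_lts_processor_lock_pkg.frm_lts_lock_release(?)}";

    // Counts '?' placeholders, useful for sanity-checking parameter indices.
    static int placeholderCount(String call) {
        int count = 0;
        for (char c : call.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }
}
```

With these strings, the acquire callback binds parameters 1 to 3 and reads the result from parameter 4. The release callback reads the result from parameter 1 and binds the lock name to parameter 2.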
Lock manager:
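In practice the application team has several such stored procedures, and whether they must run serially can depend on several business keys, such as a job order number or a delivery order. We therefore provide a multi-lock mechanism. The locks are sorted before they are acquired, to avoid deadlock, and we strongly recommend that applications set a timeout. Business keys are Strings. To keep large numbers of business operations from blocking on meaningless keys such as null or the empty string, we also filter the set of lock names the application supplies.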
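Here is a minimal sketch of the filtering and ordering step. It assumes that null, empty, and whitespace-only keys all count as meaningless; the framework's own `StringUtils.isEmptyNoOffset` is not shown, so its exact rule is an assumption. A `TreeSet` removes duplicates and gives a stable natural ordering, so every job requests its locks in the same order. `LockKeys.normalize` is a hypothetical name.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper: turns the application's raw business keys into the
// ordered set of lock names that is actually requested.
final class LockKeys {
    private LockKeys() {}

    static SortedSet<String> normalize(Collection<String> rawKeys) {
        SortedSet<String> keys = new TreeSet<String>();
        if (rawKeys == null) {
            return keys;
        }
        for (String key : rawKeys) {
            // Assumption: null, "" and whitespace-only keys are meaningless.
            if (key != null && key.trim().length() > 0) {
                keys.add(key);
            }
        }
        // Natural String order: every caller acquires in the same sequence,
        // so two jobs cannot each hold one lock while waiting for the other's.
        return keys;
    }
}
```

Filtering has to happen before the keys go into the `TreeSet`, because a naturally ordered `TreeSet` rejects null elements.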
The mechanism itself is simple: call the DB-side acquire and release procedures in a local transaction, then interpret the results they return.
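The callbacks compare the procedures' results against `ULLockResultType`, which is not shown. Below is a hypothetical sketch of that mapping, using the return codes documented for DBMS_LOCK.REQUEST and DBMS_LOCK.RELEASE. Code 6 is not an Oracle code. It is the default that frm_lts_lock_release returns when the session has no cached handle for the name.

```java
// Hypothetical sketch of ULLockResultType. Constant names follow the usage
// in the callbacks; the numbers are DBMS_LOCK's documented return codes.
final class ULLockResultType {
    private ULLockResultType() {}

    static final int SUCCESSFUL = 0;
    static final int TIMEOUT = 1;          // REQUEST only
    static final int DEADLOCK = 2;         // REQUEST only
    static final int PARAMETER_ERROR = 3;
    static final int ALREADY_OWNED = 4;    // REQUEST: session already owns the lock
    static final int NOT_OWNED = 4;        // RELEASE: session does not own the lock
    static final int ILLEGAL_HANDLE = 5;
    static final int NO_CACHED_HANDLE = 6; // package default, not an Oracle code

    static String getAcquireTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case TIMEOUT: return "timeout";
            case DEADLOCK: return "deadlock";
            case PARAMETER_ERROR: return "parameter error";
            case ALREADY_OWNED: return "already own lock";
            case ILLEGAL_HANDLE: return "illegal lock handle";
            default: return "unknown result " + code;
        }
    }

    static String getReleaseTypeDesc(int code) {
        switch (code) {
            case SUCCESSFUL: return "success";
            case PARAMETER_ERROR: return "parameter error";
            case NOT_OWNED: return "do not own lock";
            case ILLEGAL_HANDLE: return "illegal lock handle";
            case NO_CACHED_HANDLE: return "no lockhandle cached for this name";
            default: return "unknown result " + code;
        }
    }
}
```

Code 4 means different things for REQUEST and RELEASE, so the descriptions live in two separate methods, matching the split between `getAcquireTypeDesc` and `getReleaseTypeDesc` in the callbacks.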
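With multiple locks, if only some of them have been acquired and an error or timeout occurs while acquiring the next one, the locks already held must be released safely. Multi-lock acquisition therefore records which locks it obtained and any error that occurred, and it distinguishes timeouts from exceptions: a timeout returns false, while an exception is recorded and thrown at the end. Releasing multiple locks must not stop at the first failure. The result of releasing each lock is recorded, and if any release failed, an exception is thrown at the end.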
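The acquire-then-roll-back rule can be demonstrated without a database. In this minimal sketch, the hypothetical `LockBackend` interface stands in for the stored-procedure calls: `tryLock` returning false models a timeout, and an exception models any other failure. Unlike the callback below, which returns the partial set so the caller can release it in finally, this variant releases on the spot. The point is the same: locks are requested in sorted order, and everything obtained so far is freed when a later request fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for the acquire/release stored procedures.
interface LockBackend {
    boolean tryLock(String name) throws Exception; // false = timed out
    void unlock(String name) throws Exception;
}

final class MultiLock {
    private MultiLock() {}

    // Returns true if every lock was obtained. On timeout it returns false,
    // on error it rethrows; in both cases it first releases what it holds.
    static boolean acquireAll(LockBackend backend, Set<String> names) throws Exception {
        List<String> obtained = new ArrayList<String>();
        SortedSet<String> ordered = new TreeSet<String>(names); // fixed order
        try {
            for (String name : ordered) {
                if (!backend.tryLock(name)) {
                    releaseQuietly(backend, obtained);
                    return false;            // timeout: report, do not throw
                }
                obtained.add(name);
            }
            return true;
        } catch (Exception e) {
            releaseQuietly(backend, obtained);
            throw e;                         // error: rethrow after cleanup
        }
    }

    // Releases in reverse acquisition order; one failed unlock does not
    // stop the others.
    private static void releaseQuietly(LockBackend backend, List<String> obtained) {
        List<String> reversed = new ArrayList<String>(obtained);
        Collections.reverse(reversed);
        for (String name : reversed) {
            try {
                backend.unlock(name);
            } catch (Exception ignored) {
                // a real implementation records this, as the release callback does
            }
        }
    }
}
```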
handleLock defines the flow: suspend the JTA transaction, perform the lock operation, then resume the JTA transaction.
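private Object handleLock(Connection connection,
        LocalTransactionCallback localTransactionCallback)
        throws LockException
{
    TransactionManager tm = null;
    Transaction currentTx = null;

    try
    {
        Context initialContext = new InitialContext();
        UserTransaction userTrx = (UserTransaction) initialContext
                .lookup("java:comp/UserTransaction");

        // Suspend the global (JTA) transaction, if there is one, so that the
        // implicit commit in DBMS_LOCK.ALLOCATE_UNIQUE cannot affect it.
        if (userTrx.getStatus() != Status.STATUS_NO_TRANSACTION)
        {
            tm = TransactionUtils.getTransactionManager(userTrx);

            if (tm != null)
            {
                currentTx = tm.suspend();
            }
        }

        try
        {
            return localTransactionCallback
                    .executeInLocalTransaction(connection);
        } finally
        {
            // Resume in finally: the business transaction must come back
            // even if the lock callback throws.
            if (null != currentTx)
            {
                tm.resume(currentTx);
            }
        }
    } catch (NamingException e)
    {
        throw toLockException("Looking up java:comp/UserTransaction failed", e);
    } catch (SystemException e)
    {
        throw toLockException("Suspending or resuming the JTA transaction failed", e);
    } catch (InvalidTransactionException e)
    {
        throw toLockException("Resuming the JTA transaction failed", e);
    } catch (IllegalStateException e)
    {
        throw toLockException("JTA transaction in an illegal state", e);
    }
}

// Helper: instead of swallowing these exceptions (which hides lock
// failures), wrap them so that "throws LockException" is honored.
private LockException toLockException(String message, Exception cause)
{
    LockException lockException = new LockException(message + ": "
            + cause.getMessage());
    lockException.initCause(cause);
    return lockException;
}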

Multi-lock acquisition is a callback executed within the flow above:
private class ObtainMutipleLocksLocalTransactionCallback implements
        LocalTransactionCallback
{
    // expected to be a sorted set so that every caller acquires in the same order
    private Set<String> lockNames;
    private int waitTime;

    ObtainMutipleLocksLocalTransactionCallback(Set<String> lockNames,
            int waitTime)
    {
        this.lockNames = lockNames;
        this.waitTime = waitTime;
    }

    public Object executeInLocalTransaction(Connection conn)
    {
        CallableStatement lockAcquireStmt = null;
        Set<String> obtainedLockNames = new HashSet<String>();
        boolean timeOut = false;
        String timeOutLockName = null;
        Exception mifLockException = null;

        try
        {
            lockAcquireStmt = conn.prepareCall(OBTAIN_LOCK_PROC_CALL);

            for (String lockName : lockNames)
            {
                // matches frm_lts_lock_acquire(i_lock_name, i_expiration_time,
                // i_wait_time, o_result): 3 IN parameters, 1 OUT parameter
                lockAcquireStmt.setString(1, lockName);
                lockAcquireStmt.setInt(2, LCOK_EXPIRE_TIME);
                lockAcquireStmt.setInt(3, waitTime);
                lockAcquireStmt.registerOutParameter(4, java.sql.Types.INTEGER);
                lockAcquireStmt.execute();
                int lockAcquireResult = lockAcquireStmt.getInt(4);

                if (lockAcquireResult == ULLockResultType.SUCCESSFUL)
                {
                    obtainedLockNames.add(lockName);
                } else if (lockAcquireResult == ULLockResultType.TIMEOUT)
                {
                    // stop; the caller releases whatever was obtained so far
                    timeOut = true;
                    timeOutLockName = lockName;
                    break;
                } else if (lockAcquireResult != ULLockResultType.ALREADY_OWNED)
                {
                    String lockResultDesc = ULLockResultType
                            .getAcquireTypeDesc(lockAcquireResult);
                    LockException lockException = new LockException(
                            "Obtain lock " + lockName
                            + " failed, the reason is "
                            + lockResultDesc + ".");
                    lockException.setLockName(lockName);
                    lockException.setLockHandlingResult(lockResultDesc);
                    throw lockException;
                }
                // ALREADY_OWNED: this session already holds the lock (e.g. an
                // outer caller); it is not recorded, so it is not released here
            }
        } catch (Exception ex)
        {
            // recorded and reported through the result instead of thrown
            mifLockException = ex;
        } finally
        {
            if (null != lockAcquireStmt)
            {
                try
                {
                    lockAcquireStmt.close();
                } catch (SQLException e)
                {
                    // swallow: a close failure must not hide the lock result
                }
            }
        }
        boolean success = !timeOut && mifLockException == null;
        return new ObtainMultipleLocksResult(success, obtainedLockNames,
                timeOut, timeOutLockName, mifLockException);
    }
}

Multi-lock release is likewise a callback within the transaction-suspension flow:
private class ReleaseMultipleLocksLocalTransactionCallback implements
        LocalTransactionCallback
{
    private Set<String> lockNames;

    ReleaseMultipleLocksLocalTransactionCallback(Set<String> lockNames)
    {
        this.lockNames = lockNames;
    }

    public Object executeInLocalTransaction(Connection conn)
    {
        CallableStatement lockReleaseStmt = null;
        Map<String, Exception> mifLockErrors = new HashMap<String, Exception>();
        Set<String> releasedLocks = new HashSet<String>();

        try
        {
            try
            {
                lockReleaseStmt = conn.prepareCall(RELEASE_LOCK_PROC_CALL);
            } catch (Exception ex)
            {
                // nothing can be released: report the error for every lock
                for (String lockName : lockNames)
                {
                    mifLockErrors.put(lockName, ex);
                }
                return new ReleaseMutipleLocksResult(false, releasedLocks,
                        mifLockErrors);
            }

            for (String lockName : lockNames)
            {
                // one failed release must not stop the remaining ones
                try
                {
                    lockReleaseStmt.registerOutParameter(1, java.sql.Types.INTEGER);
                    lockReleaseStmt.setString(2, lockName);
                    lockReleaseStmt.execute();
                    int lockReleaseResult = lockReleaseStmt.getInt(1);

                    if (lockReleaseResult == ULLockResultType.SUCCESSFUL)
                    {
                        releasedLocks.add(lockName);
                    } else
                    {
                        String lockResultDesc = ULLockResultType
                                .getReleaseTypeDesc(lockReleaseResult);
                        LockException lockException = new LockException(
                                "Release lock " + lockName
                                + " failed, the reason is "
                                + lockResultDesc + ".");
                        lockException.setLockName(lockName);
                        lockException.setLockHandlingResult(lockResultDesc);
                        mifLockErrors.put(lockName, lockException);
                    }
                } catch (Exception ex)
                {
                    mifLockErrors.put(lockName, ex);
                }
            }
        } finally
        {
            if (null != lockReleaseStmt)
            {
                try
                {
                    lockReleaseStmt.close();
                } catch (SQLException e)
                {
                    // swallow: a close failure must not hide the release results
                }
            }
        }
        boolean success = releasedLocks.size() == this.lockNames.size();
        return new ReleaseMutipleLocksResult(success, releasedLocks,
                mifLockErrors);
    }
}
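The same never-stop-on-failure rule, reduced to plain Java: a minimal sketch in which the hypothetical `Unlocker` interface stands in for one call to the release procedure. Every lock is attempted; failures are collected and thrown once at the end, with later failures attached as suppressed exceptions (Java 7+). Collecting into a map, as the callback above does, is equally valid when the caller needs per-lock detail.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for one call to frm_lts_lock_release.
interface Unlocker {
    void unlock(String name) throws Exception;
}

final class ReleaseAll {
    private ReleaseAll() {}

    // Attempts every release. Returns the released names if all succeeded;
    // otherwise throws the first failure with the rest attached as suppressed.
    static List<String> releaseAll(Unlocker unlocker, Collection<String> names) throws Exception {
        List<String> released = new ArrayList<String>();
        Exception first = null;
        for (String name : names) {
            try {
                unlocker.unlock(name);
                released.add(name);
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e); // keep every failure visible
                }
            }
        }
        if (first != null) {
            throw first;
        }
        return released;
    }
}
```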

Usage template: lock release must go in a finally block so that it is guaranteed to happen.
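To stop application code from calling the lock manager directly, abusing locks, or forgetting to release them, we defined a template that makes acquiring and releasing locks transparent to the application, turning the lock into an implicit lock.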
@SuppressWarnings("unchecked")
public void execute(JobExecutionContext context)
        throws JobExecutionException
{
    Map jobDataMap = context.getJobDetail().getJobDataMap();
    Collection<String> lockKeys = (Collection<String>) jobDataMap.get(LOCK_NAME_KEY);
    Integer waitTimeInteger = (Integer) jobDataMap
            .get(LOCK_WAIT_TIME_SECONDS_KEY);
    int waitTime = MAX_WAITTIME;

    if (waitTimeInteger != null)
    {
        waitTime = waitTimeInteger.intValue();
    }

    // Filter out null/empty keys, then keep them in a TreeSet: duplicates are
    // removed and all jobs acquire locks in the same order, avoiding deadlock.
    Set<String> uniqueLockKeys = new TreeSet<String>();

    if (lockKeys != null)
    {
        for (String key : lockKeys)
        {
            if (!StringUtils.isEmptyNoOffset(key))
            {
                uniqueLockKeys.add(key);
            }
        }
    }

    if (!uniqueLockKeys.isEmpty())
    {
        Set<String> obtainedLockNames = null;
        Connection connection = null;

        try
        {
            connection = DataSource.getConnection();
            ObtainMultipleLocksResult result = LOCK_MANAGER.obtainLock(
                    connection, uniqueLockKeys, waitTime);
            obtainedLockNames = result.getObtainedLockNames();

            if (!result.isSuccess())
            {
                if (result.isTimeout())
                {
                    // do log; locks obtained so far are released in finally
                    return;
                }
                throw new JobExecutionException(
                        "Obtain locks failed! "
                        + result.getMifLockException().getMessage(),
                        result.getMifLockException());
            }
            this.executeInLock(context);

        } catch (JobExecutionException e)
        {
            // do not re-wrap our own exception under a misleading message
            throw e;
        } catch (Throwable e)
        {
            throw new JobExecutionException(
                    "Locked job execution failed! " + e.getMessage(), e);
        } finally
        {
            if (null != connection)
            {
                // release on the same connection (same DB session) that acquired
                if (obtainedLockNames != null && !obtainedLockNames.isEmpty())
                {
                    this.releaseLocks(connection, obtainedLockNames);
                }

                try
                {
                    connection.close();
                } catch (SQLException e)
                {
                    // do log; throwing here would mask the job's own exception
                }
            }
        }
    } else
    {
        this.executeInLock(context);
    }
}

executeInLock is implemented by the application's subclasses.
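A condensed, database-free sketch of the template-method shape of execute(). The hypothetical `SimpleLockManager` stands in for LOCK_MANAGER and is assumed to acquire all or nothing. The base class owns acquisition and the finally-release, and a subclass only implements executeInLock. All names other than executeInLock are illustrative.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for the lock manager (assumed all-or-nothing).
interface SimpleLockManager {
    boolean obtain(SortedSet<String> names);
    void release(SortedSet<String> names);
}

abstract class LockedJobTemplate {
    private final SimpleLockManager lockManager;

    protected LockedJobTemplate(SimpleLockManager lockManager) {
        this.lockManager = lockManager;
    }

    // Template method: subclasses cannot skip acquisition or forget release.
    // Returns false if the run was skipped because the locks timed out.
    public final boolean execute(Collection<String> lockKeys) throws Exception {
        SortedSet<String> names = new TreeSet<String>();
        for (String key : lockKeys) {
            if (key != null && !key.isEmpty()) {
                names.add(key);
            }
        }
        if (names.isEmpty()) {
            executeInLock();              // nothing meaningful to lock on
            return true;
        }
        if (!lockManager.obtain(names)) {
            return false;                 // timed out: skip this run
        }
        try {
            executeInLock();
            return true;
        } finally {
            lockManager.release(names);   // released even if the job fails
        }
    }

    // The only method an application subclass implements.
    protected abstract void executeInLock() throws Exception;
}
```

Declaring execute() `final` is what makes the lock implicit: a subclass cannot override the acquire/release sequence, only the work done inside it.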
Caching
- Cache the pessimistic offline locks
- Cache the lockhandle
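Because these are pessimistic offline locks, every acquisition is a round trip to the DB, which is wasted if the current thread already owns the lock. A ThreadLocal can cache the locks the current thread already owns; when a lock is released, its cache entry must be cleared as well.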
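A minimal sketch of the per-thread ownership cache, assuming the real DB calls sit behind the hypothetical `LockCalls` interface. Beyond what the text describes, it keeps a hold count (an assumption) so that a nested release does not free a lock an outer caller still relies on. It is only safe if a thread always uses the same DB session for its locks, because UL locks belong to the Oracle session, not to the Java thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the acquire/release procedure calls.
interface LockCalls {
    boolean acquire(String name);
    void release(String name);
}

final class CachingLockManager {
    // Per-thread hold counts for the locks the current thread owns.
    private final ThreadLocal<Map<String, Integer>> holds =
            new ThreadLocal<Map<String, Integer>>() {
                @Override
                protected Map<String, Integer> initialValue() {
                    return new HashMap<String, Integer>();
                }
            };
    private final LockCalls db;

    CachingLockManager(LockCalls db) {
        this.db = db;
    }

    boolean acquire(String name) {
        Map<String, Integer> owned = holds.get();
        Integer count = owned.get(name);
        if (count != null) {
            owned.put(name, count + 1);   // already ours: no DB round trip
            return true;
        }
        if (!db.acquire(name)) {
            return false;                 // timed out or refused
        }
        owned.put(name, 1);
        return true;
    }

    void release(String name) {
        Map<String, Integer> owned = holds.get();
        Integer count = owned.get(name);
        if (count == null) {
            return;                       // not held by this thread
        }
        if (count > 1) {
            owned.put(name, count - 1);   // an outer caller still holds it
            return;
        }
        owned.remove(name);               // clear the cache entry with the lock
        db.release(name);
        if (owned.isEmpty()) {
            holds.remove();               // leave no state on pooled threads
        }
    }
}
```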
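Acquiring a UL lock requires its lockhandle, and releasing it requires the same lockhandle. We cache the handle mainly because DBMS_LOCK.ALLOCATE_UNIQUE commits on every call, which hurts performance; with a cached handle, release can use it directly. There are two places to cache it: on the Java side as an instance variable, or in a global variable of the PL/SQL package. If you cache on the Java side, the lock manager must not be used as a singleton or a flyweight; otherwise the handle cache itself runs into concurrency-control and synchronization problems, including across multiple JVMs.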
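A sketch of the Java-side option, assuming a hypothetical `HandleAllocator` that wraps the DBMS_LOCK.ALLOCATE_UNIQUE call. The handle map is an instance field of a manager that is deliberately not shared, so it needs no synchronization. The allocator, and its implicit commit, is hit only on the first use of each name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper around DBMS_LOCK.ALLOCATE_UNIQUE (which commits).
interface HandleAllocator {
    String allocateUnique(String lockName);
}

// Hypothetical Java-side handle cache. Not a singleton or flyweight: each
// instance is confined to one job run, so a plain HashMap is enough.
final class LockHandleCache {
    private final Map<String, String> handles = new HashMap<String, String>();
    private final HandleAllocator allocator;

    LockHandleCache(HandleAllocator allocator) {
        this.allocator = allocator;
    }

    // On acquire: allocates (and pays the commit) only for a new name.
    String handleFor(String lockName) {
        String handle = handles.get(lockName);
        if (handle == null) {
            handle = allocator.allocateUnique(lockName);
            handles.put(lockName, handle);
        }
        return handle;
    }

    // On release: returns the cached handle (null if never acquired) and forgets it.
    String takeForRelease(String lockName) {
        return handles.remove(lockName);
    }
}
```

The PL/SQL package above takes the other route: its `v_lockhandle_tbl` is package state, which Oracle keeps per session, so each connection gets its own cache without any Java-side bookkeeping.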
Source code:
Java:
ULLock-sources.rar
PL/SQL:
lockplsql.rar
References:
http://docstore.mik.ua/orelly/oracle/bipack/ch04_01.htm