1. BlockIo state transitions
RecordFile is one of the most important classes. Its key fields are:
final TransactionManager txnMgr;
private final LinkedList free = new LinkedList();
private final HashMap inUse = new HashMap();
private final HashMap dirty = new HashMap();
private final HashMap inTxn = new HashMap();
private RandomAccessFile file;
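free is a LinkedList used as a FIFO queue. The other fields track the state of BlockIo objects: once a BlockIo has been taken out of free, it is in use, so RecordFile's get() method ends by putting the BlockIo into the inUse map: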
BlockIo node;
......
inUse.put(key, node);
node.setClean(); // note: node is marked clean (dirty=false) here; BlockIo carries its own dirty flag
return node;
If a BlockIo in inUse is modified, its state becomes dirty. Because RecordFile cannot tell whether the caller changed a block it handed out, the caller must report this by calling release():
/**
* Releases a block.
*
* @param blockid The record number to release.
* @param isDirty If true, the block was modified since the get().
*/
void release(long blockid, boolean isDirty)
throws IOException {
BlockIo node = (BlockIo) inUse.get(new Long(blockid));
if (node == null)
throw new IOException("bad blockid " + blockid + " on release");
if (!node.isDirty() && isDirty)
node.setDirty();
release(node);
}
/**
* Releases a block.
*
* @param block The block to release.
*/
void release(BlockIo block) {
Long key = new Long(block.getBlockId());
inUse.remove(key);
if (block.isDirty()) {
// System.out.println( "Dirty: " + key + block );
dirty.put(key, block);
} else {
if (!transactionsDisabled && block.isInTransaction()) {
inTxn.put(key, block);
} else {
free.add(block);
}
}
}
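release() takes two arguments. If the caller modified the BlockIo, it must pass isDirty=true; otherwise the change will not be saved. In other words, after a modification the BlockIo ends up in the dirty map.
After release, the BlockIo is first removed from the inUse map and then goes to one of three places: if it was modified, into the dirty map; if it was not modified but is still part of a transaction (and transactions are enabled), into the inTxn map; otherwise, onto the free list.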
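The routing that release() performs can be sketched as follows. This is a minimal model, not RecordFile itself: the hypothetical Block class keeps only a dirty flag and a transaction counter (standing in for BlockIo.isDirty() and isInTransaction()), and an ArrayDeque plays the role of the free LinkedList.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of RecordFile.release() routing. Block is a hypothetical stand-in
// for BlockIo that keeps only the state release() inspects.
public class ReleaseSketch {
    static final class Block {
        final long id;
        boolean dirty;
        int txnCount; // > 0 plays the role of BlockIo.isInTransaction()

        Block(long id) { this.id = id; }
    }

    final Map<Long, Block> inUse = new HashMap<>();
    final Map<Long, Block> dirty = new HashMap<>();
    final Map<Long, Block> inTxn = new HashMap<>();
    final Deque<Block> free = new ArrayDeque<>();
    final boolean transactionsDisabled;

    ReleaseSketch(boolean transactionsDisabled) {
        this.transactionsDisabled = transactionsDisabled;
    }

    // Same decision as release(long, boolean) followed by release(BlockIo).
    void release(long blockId, boolean isDirty) {
        Block b = inUse.remove(blockId);
        if (b == null) {
            throw new IllegalStateException("bad blockid " + blockId + " on release");
        }
        if (isDirty) {
            b.dirty = true; // only the caller knows whether the block changed
        }
        if (b.dirty) {
            dirty.put(blockId, b);
        } else if (!transactionsDisabled && b.txnCount > 0) {
            inTxn.put(blockId, b);
        } else {
            free.addLast(b);
        }
    }

    public static void main(String[] args) {
        ReleaseSketch rf = new ReleaseSketch(false);
        rf.inUse.put(1L, new Block(1));
        rf.inUse.put(2L, new Block(2));
        Block three = new Block(3);
        three.txnCount = 1;
        rf.inUse.put(3L, three);

        rf.release(1, true);  // modified -> dirty
        rf.release(2, false); // unmodified, not in a transaction -> free
        rf.release(3, false); // unmodified but still in a transaction -> inTxn
        System.out.println(rf.dirty.keySet() + " " + rf.inTxn.keySet() + " free=" + rf.free.size());
    }
}
```

Forgetting isDirty=true in the first call would send block 1 to free, and the modification would never reach commit().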
With the changes made, the RecordFile can be closed:
RecordFile file = new RecordFile( testFileName );
byte[] data = file.get( 0 ).getData();
data[ 14 ] = (byte) 'b';
file.release( 0, true );
file.close();
close() works as follows:
void close() throws IOException {
if (!dirty.isEmpty()) {
commit();
}
txnMgr.shutdown();
......
file.close(); // the underlying RandomAccessFile is closed here
file = null;
}
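So the main job of close() is to commit the pending changes when the dirty map is not empty. The core of commit() is this loop over the dirty map: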
for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
BlockIo node = (BlockIo) i.next();
i.remove();
// System.out.println("node " + node + " map size now " + dirty.size());
if (transactionsDisabled) {
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
}
else {
txnMgr.add(node);
inTxn.put(new Long(node.getBlockId()), node);
}
}
If transactions are disabled, each node is written directly to the .db file at its own offset:
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
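The last line puts node back on the free list. With transactions disabled, a block therefore cycles through free -> inUse -> dirty -> free. With transactions enabled, commit() parks the block in inTxn instead, and it returns to free only after TransactionManager has written it to the .db file (releaseFromTransaction() in synchronizeBlocks(), shown below), so the full cycle is:
free -> inUse -> dirty -> inTxn -> free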
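The two cycles can be checked with a small state machine. The state names mirror RecordFile's collections; the event names (GET, RELEASE_MODIFIED, COMMIT, WRITTEN_TO_DB) are hypothetical labels for the calls discussed above, and the sketch only follows a single block that gets modified.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the BlockIo life cycle as a state machine. The event names are
// hypothetical labels for RecordFile / TransactionManager calls.
public class BlockLifecycle {
    enum State { FREE, IN_USE, DIRTY, IN_TXN }

    enum Event { GET, RELEASE_MODIFIED, COMMIT, WRITTEN_TO_DB }

    static State next(State s, Event e, boolean transactionsDisabled) {
        switch (e) {
            case GET:
                // get() moves a cached block from free, dirty or inTxn into inUse;
                // this sketch rejects a second get() of a block already in use
                require(s != State.IN_USE, s, e);
                return State.IN_USE;
            case RELEASE_MODIFIED:
                require(s == State.IN_USE, s, e);
                return State.DIRTY; // release(blockid, true)
            case COMMIT:
                require(s == State.DIRTY, s, e);
                // disabled: written straight to the .db file, then free.add(node)
                // enabled: txnMgr.add(node) and inTxn.put(...)
                return transactionsDisabled ? State.FREE : State.IN_TXN;
            case WRITTEN_TO_DB:
                require(s == State.IN_TXN, s, e);
                return State.FREE; // releaseFromTransaction(...) in synchronizeBlocks()
            default:
                throw new AssertionError(e);
        }
    }

    private static void require(boolean ok, State s, Event e) {
        if (!ok) {
            throw new IllegalStateException(e + " not allowed in state " + s);
        }
    }

    // Applies the events starting from FREE and records every state visited.
    static List<State> run(boolean transactionsDisabled, Event... events) {
        List<State> path = new ArrayList<>();
        State s = State.FREE;
        path.add(s);
        for (Event e : events) {
            s = next(s, e, transactionsDisabled);
            path.add(s);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(run(true, Event.GET, Event.RELEASE_MODIFIED, Event.COMMIT));
        System.out.println(run(false, Event.GET, Event.RELEASE_MODIFIED, Event.COMMIT, Event.WRITTEN_TO_DB));
    }
}
```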
2. Obtaining a BlockIo
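A RecordFile is backed by two files: the .db data file and the .lg log file. The cached BlockIo object is the smallest unit of work. In get(), if the requested blockid is not in inTxn, dirty, or free, a new BlockIo is obtained through getNewNode(blockid). Whether the block already exists on disk is decided from its offset, offset = blockid * BLOCK_SIZE: if the file is non-empty and offset <= file.length(), the block's data is read from the .db file; otherwise the block is filled with a copy of cleanData ("a block of clean data").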
// get a new node and read it from the file
node = getNewNode(blockid);
long offset = blockid * BLOCK_SIZE;
if (file.length() > 0 && offset <= file.length()) {
read(file, offset, node.getData(), BLOCK_SIZE);
} else {
System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
}
inUse.put(key, node);
node.setClean();
return node;
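The read-or-clean decision in get() comes down to one comparison. A minimal sketch, assuming BLOCK_SIZE is 8192 bytes (the text only names the constant); the helper names offsetOf and readFromFile are hypothetical. Because the code tests offset <= file.length(), a block that starts exactly at end of file also takes the read path; how such a short read is padded depends on RecordFile's read() helper, which is not shown here.

```java
// Sketch of the offset test in RecordFile.get(); names other than BLOCK_SIZE are hypothetical.
public class BlockOffsetSketch {
    static final int BLOCK_SIZE = 8192; // assumed block size

    // blockId is a long, so the product is computed in 64-bit arithmetic.
    static long offsetOf(long blockId) {
        return blockId * BLOCK_SIZE;
    }

    // true: read the block from the .db file; false: copy cleanData instead.
    static boolean readFromFile(long blockId, long fileLength) {
        long offset = offsetOf(blockId);
        return fileLength > 0 && offset <= fileLength;
    }

    public static void main(String[] args) {
        long fileLength = 3L * BLOCK_SIZE; // a file that holds blocks 0, 1 and 2
        for (long id = 0; id < 5; id++) {
            System.out.println("block " + id + ": offset=" + offsetOf(id)
                    + " read=" + readFromFile(id, fileLength));
        }
    }
}
```

Keeping blockid a long matters here: the same product in int arithmetic would overflow once a file grows past 2 GB.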
getNewNode() looks like this:
private BlockIo getNewNode(long blockid)
throws IOException
{
BlockIo retval = null;
if (!free.isEmpty()) {
retval = (BlockIo) free.removeFirst();
}
if (retval == null)
retval = new BlockIo(0, new byte[BLOCK_SIZE]);
retval.setBlockId(blockid);
retval.setView(null);
return retval;
}
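The recycling in getNewNode() can be modeled with a deque. In this sketch, Node and BLOCK_SIZE = 16 are hypothetical and setView(null) is left out; released blocks go to the tail and getNewNode() takes from the head, so the block that has been free the longest is reused first. The buffer is not cleared on reuse, just as in getNewNode(); in get() it is overwritten by the file read or the cleanData copy anyway.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of getNewNode()'s recycling. Node is a hypothetical stand-in for BlockIo.
public class FreeListSketch {
    static final int BLOCK_SIZE = 16; // hypothetical; small to keep the demo light

    static final class Node {
        long blockId;
        final byte[] data = new byte[BLOCK_SIZE];
    }

    final Deque<Node> free = new ArrayDeque<>();
    int allocations = 0;

    // Reuse the head of the free list if possible, otherwise allocate; then retarget.
    Node getNewNode(long blockId) {
        Node n = free.pollFirst(); // null when the list is empty
        if (n == null) {
            n = new Node();
            allocations++;
        }
        n.blockId = blockId;
        return n;
    }

    // Clean blocks are returned at the tail, as free.add(block) does in RecordFile.
    void release(Node n) {
        free.addLast(n);
    }

    public static void main(String[] args) {
        FreeListSketch cache = new FreeListSketch();
        Node a = cache.getNewNode(1);
        Node b = cache.getNewNode(2);
        cache.release(a); // a has now been free the longest
        cache.release(b);
        Node c = cache.getNewNode(7);
        System.out.println("reused a: " + (c == a) + ", allocations: " + cache.allocations);
    }
}
```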
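getNewNode() does not always create a new BlockIo: if the free list is not empty, it reuses the BlockIo at its head. Because released blocks are appended at the tail, the head is the block that has been free the longest, so the reuse order is effectively least recently used. BlockIo also implements custom Externalizable serialization: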
// implement externalizable interface
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException {
blockId = in.readLong();
int length = in.readInt();
data = new byte[length];
in.readFully(data);
}
// implement externalizable interface
public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(blockId);
out.writeInt(data.length);
out.write(data);
}
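A round trip through ObjectOutputStream shows what this custom format carries: exactly the block id, the length, and the raw bytes. The Block class below is a hypothetical stand-in with the same writeExternal/readExternal bodies; Externalizable additionally requires a public no-argument constructor, because deserialization instantiates the object before calling readExternal().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class ExternalizableSketch {
    // Hypothetical stand-in for BlockIo with the same serialized layout.
    public static class Block implements Externalizable {
        long blockId;
        byte[] data;

        public Block() { } // required: deserialization calls this, then readExternal()

        public Block(long blockId, byte[] data) {
            this.blockId = blockId;
            this.data = data;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeLong(blockId);    // 8 bytes
            out.writeInt(data.length); // 4 bytes
            out.write(data);           // raw payload
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            blockId = in.readLong();
            data = new byte[in.readInt()];
            in.readFully(data);
        }
    }

    // Serializes b into memory and reads it back.
    static Block roundTrip(Block b) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(b);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Block) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Block copy = roundTrip(new Block(42, new byte[] {1, 2, 3}));
        System.out.println(copy.blockId + " " + Arrays.toString(copy.data));
    }
}
```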
3. Transaction handling in TransactionManager
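Serialization is used only for transactions: if transactions are disabled, RecordFile writes directly to the .db file and never serializes anything. RecordFile drives a transaction as follows: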
final TransactionManager txnMgr;
......
txnMgr = new TransactionManager(this);
......
if (!transactionsDisabled) {
txnMgr.start();
}
......
if (!transactionsDisabled)
txnMgr.add(node);
......
if (!transactionsDisabled) {
txnMgr.commit();
}
......
txnMgr.shutdown();
That is the complete transaction life cycle. Each step is examined in detail below.
TransactionManager is constructed with the RecordFile as its argument:
TransactionManager(RecordFile owner) throws IOException {
this.owner = owner;
recover();
open();
}
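TransactionManager keeps a reference to the RecordFile and then calls recover() and open(). recover() works on the log file: if the log holds transactions that were never applied, it replays them into the .db file, calls sync() on the RecordFile, and finally deletes the log file.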
private void recover() throws IOException {
String logName = makeLogName();
File logFile = new File(logName);
if (!logFile.exists())
return;
if (logFile.length() == 0) {
logFile.delete();
return;
}
FileInputStream fis = new FileInputStream(logFile);
ObjectInputStream ois = new ObjectInputStream(fis);
try {
if (ois.readShort() != Magic.LOGFILE_HEADER)
throw new Error("Bad magic on log file");
} catch (IOException e) {
// corrupted/empty logfile
logFile.delete();
return;
}
while (true) {
ArrayList blocks = null;
try {
blocks = (ArrayList) ois.readObject();
} catch (ClassNotFoundException e) {
throw new Error("Unexpected exception: " + e);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
synchronizeBlocks(blocks.iterator(), false);
// ObjectInputStream must match exactly each
// ObjectOutputStream created during writes
try {
ois = new ObjectInputStream(fis);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
}
owner.sync();
logFile.delete();
}
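The subtle part of recover() is the comment about matching streams: commit() opens a new ObjectOutputStream after every transaction, and each new stream begins with its own stream header and its own handle table, so the reader must open a new ObjectInputStream for each transaction too. The sketch below reproduces that layout in memory. LOG_HEADER's value is hypothetical (the real constant is Magic.LOGFILE_HEADER), transactions are plain lists of block ids rather than lists of BlockIo, and replay() collects transactions where recover() would call synchronizeBlocks().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class LogReplaySketch {
    // Hypothetical header value; the real code uses Magic.LOGFILE_HEADER.
    static final short LOG_HEADER = 0x1360;

    // Writer shaped like open() + commit(): one header, then each transaction is
    // written by the current stream, after which a fresh ObjectOutputStream is
    // opened on the same underlying stream.
    static byte[] writeLog(List<ArrayList<Long>> txns) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(bytes);
            oos.writeShort(LOG_HEADER);
            oos.flush();
            for (ArrayList<Long> txn : txns) {
                oos.writeObject(txn);
                oos.flush();
                oos = new ObjectOutputStream(bytes); // new stream header for the next txn
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reader shaped like recover(): check the header, then read one transaction per
    // ObjectInputStream and stop at the first read that fails.
    static List<ArrayList<Long>> replay(byte[] log) {
        List<ArrayList<Long>> applied = new ArrayList<>();
        InputStream in = new ByteArrayInputStream(log);
        ObjectInputStream ois;
        try {
            ois = new ObjectInputStream(in);
            if (ois.readShort() != LOG_HEADER) {
                throw new IllegalStateException("Bad magic on log file");
            }
        } catch (IOException e) {
            return applied; // empty or corrupted header: nothing to replay
        }
        while (true) {
            try {
                @SuppressWarnings("unchecked")
                ArrayList<Long> txn = (ArrayList<Long>) ois.readObject();
                applied.add(txn); // recover() calls synchronizeBlocks(...) here
                ois = new ObjectInputStream(in);
            } catch (IOException | ClassNotFoundException e) {
                break; // end of log or a torn last transaction
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        List<ArrayList<Long>> txns = new ArrayList<>();
        txns.add(new ArrayList<>(List.of(1L, 2L)));
        txns.add(new ArrayList<>(List.of(2L, 5L)));
        System.out.println(replay(writeLog(txns)));
    }
}
```

Stopping at the first failed read is what makes a torn tail harmless: a transaction whose write never completed is simply not replayed, while every earlier, fully written transaction is.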
open() is much simpler: it opens a fresh log file, writes the header, and resets curTxn:
/** Opens the log file */
private void open() throws IOException {
fos = new FileOutputStream(makeLogName());
oos = new ObjectOutputStream(fos);
oos.writeShort(Magic.LOGFILE_HEADER);
oos.flush();
curTxn = -1;
}
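To see what open() leaves on disk, the same calls can be run against a ByteArrayOutputStream. LOG_HEADER's value is a hypothetical stand-in for Magic.LOGFILE_HEADER. The result is 8 bytes: the 4-byte Java serialization stream header, then the short wrapped in a block-data record (a 2-byte record header plus 2 data bytes). This is why recover() can treat a zero-length log as empty and otherwise begin with readShort().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class LogHeaderSketch {
    static final short LOG_HEADER = 0x1360; // hypothetical stand-in for Magic.LOGFILE_HEADER

    // Same calls as open(), but into memory instead of a FileOutputStream.
    static byte[] freshLog() {
        ByteArrayOutputStream fos = new ByteArrayOutputStream();
        try {
            ObjectOutputStream oos = new ObjectOutputStream(fos); // stream header AC ED 00 05
            oos.writeShort(LOG_HEADER);                           // buffered as block data
            oos.flush();                                          // block header 77 02, then 2 data bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return fos.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : freshLog()) {
            hex.append(String.format("%02X ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```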
The next step is to start txnMgr:
void start() throws IOException {
curTxn++;
if (curTxn == _maxTxns) {
synchronizeLogFromMemory();
curTxn = 0;
}
txns[curTxn] = new ArrayList();
}
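start() advances curTxn by one. If curTxn reaches the configured maximum, _maxTxns, the in-memory transactions are first synchronized to the data file and curTxn wraps back to 0. The synchronization code is: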
/** Synchs in-core transactions to data file and opens a fresh log */
private void synchronizeLogFromMemory() throws IOException {
close();
TreeSet blockList = new TreeSet( new BlockIoComparator() );
int numBlocks = 0;
int writtenBlocks = 0;
for (int i = 0; i < _maxTxns; i++) {
if (txns[i] == null)
continue;
// Add each block to the blockList, replacing the old copy of this
// block if necessary, thus avoiding writing the same block twice
for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
BlockIo block = (BlockIo)k.next();
if ( blockList.contains( block ) ) {
block.decrementTransactionCount();
}
else {
writtenBlocks++;
boolean result = blockList.add( block );
}
numBlocks++;
}
txns[i] = null;
}
// Write the blocks from the blockList to disk
synchronizeBlocks(blockList.iterator(), true);
owner.sync();
open();
}
The core of this synchronization is synchronizeBlocks():
private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
throws IOException
{
// write block vector elements to the data file.
while ( blockIterator.hasNext() ) {
BlockIo cur = (BlockIo)blockIterator.next();
owner.synch(cur);
if (fromCore) {
cur.decrementTransactionCount();
if (!cur.isInTransaction()) {
owner.releaseFromTransaction(cur, true);
}
}
}
}
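start(), synchronizeLogFromMemory() and synchronizeBlocks() together treat txns as a fixed-size ring of transactions that is flushed when it wraps. The sketch below models that bookkeeping without any I/O: Block is a hypothetical stand-in for BlockIo, the written list stands in for owner.synch(), and ordering by block id is assumed to match BlockIoComparator. It also assumes, as the RecordFile cache suggests, that a block modified in two transactions is the same object in both; under that assumption TreeSet.add(), which keeps the element already present rather than replacing it, loses nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class TxnRingSketch {
    static final class Block {
        final long id;
        int txnCount; // how many logged transactions still reference this block

        Block(long id) { this.id = id; }
    }

    final int maxTxns;
    final List<List<Block>> txns;
    int curTxn = -1;                              // open() resets curTxn to -1
    int flushes = 0;
    final List<Long> written = new ArrayList<>(); // stands in for owner.synch(block)

    TxnRingSketch(int maxTxns) {
        this.maxTxns = maxTxns;
        this.txns = new ArrayList<>(Collections.nCopies(maxTxns, (List<Block>) null));
    }

    // Like start(): advance to the next slot, flushing the ring when it is full.
    void start() {
        curTxn++;
        if (curTxn == maxTxns) {
            synchronizeFromMemory();
            curTxn = 0;
        }
        txns.set(curTxn, new ArrayList<>());
    }

    // Like add(): count the reference and record the block in the current transaction.
    void add(Block b) {
        b.txnCount++;
        txns.get(curTxn).add(b);
    }

    // Like synchronizeLogFromMemory() followed by synchronizeBlocks(..., true).
    void synchronizeFromMemory() {
        TreeSet<Block> blockList = new TreeSet<>(Comparator.comparingLong((Block b) -> b.id));
        for (int i = 0; i < maxTxns; i++) {
            List<Block> txn = txns.get(i);
            if (txn == null) continue;
            for (Block b : txn) {
                if (!blockList.add(b)) {
                    b.txnCount--; // already queued: this reference will not be written again
                }
            }
            txns.set(i, null);
        }
        for (Block b : blockList) { // each distinct block written once, in id order
            written.add(b.id);
            b.txnCount--;           // at 0, synchronizeBlocks() calls releaseFromTransaction()
        }
        flushes++;
    }

    public static void main(String[] args) {
        TxnRingSketch mgr = new TxnRingSketch(2);
        Block one = new Block(1);
        Block two = new Block(2);
        mgr.start(); mgr.add(one);
        mgr.start(); mgr.add(one); mgr.add(two);
        mgr.start(); // slot 2 == maxTxns: flush, then wrap to slot 0
        System.out.println(mgr.written + " counts=" + one.txnCount + "," + two.txnCount);
    }
}
```

With maxTxns = 2, the third start() triggers the flush: block 1 appears in both transactions but is written once, and both transaction counts return to 0.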
Next comes txnMgr.add(node):
/**
* Indicates the block is part of the transaction.
*/
void add(BlockIo block) throws IOException {
block.incrementTransactionCount();
txns[curTxn].add(block);
}
This is simple: it increments the block's transaction count and appends the BlockIo to the current transaction's ArrayList. Then comes commit():
/**
* Commits the transaction to the log file.
*/
void commit() throws IOException {
oos.writeObject(txns[curTxn]);
sync();
// set clean flag to indicate blocks have been written to log
setClean(txns[curTxn]);
// open a new ObjectOutputStream in order to store
// newer states of BlockIo
oos = new ObjectOutputStream(fos);
}
The final step is txnMgr.shutdown():
/**
* Shutdowns the transaction manager. Resynchronizes outstanding
* logs.
*/
void shutdown() throws IOException {
synchronizeLogFromMemory();
close();
}
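Taken together, TransactionManager behaves as a write-ahead log: commit() only appends to the .lg file, the .db file is updated later in bulk (when the transaction ring wraps, or in shutdown()), and recover() replays whatever committed transactions the log still holds. A minimal in-memory model of that contract, with hypothetical names throughout (WalSketch, write, checkpoint), maps standing in for the two files, and strings in place of block contents:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WalSketch {
    final Map<Long, String> db = new HashMap<>();        // stands in for the .db file
    final List<Map<Long, String>> log = new ArrayList<>(); // stands in for the .lg file
    private Map<Long, String> current = new LinkedHashMap<>();

    // Like txnMgr.add(node): the change joins the open transaction only.
    void write(long blockId, String data) {
        current.put(blockId, data);
    }

    // Like commit(): the whole transaction is appended to the log.
    void commit() {
        log.add(current);
        current = new LinkedHashMap<>();
    }

    // Like synchronizeLogFromMemory(): apply logged transactions in order, then start a fresh log.
    void checkpoint() {
        for (Map<Long, String> txn : log) {
            db.putAll(txn);
        }
        log.clear();
    }

    void shutdown() {
        checkpoint();
    }

    // Like recover(): rebuild from what survived a crash; uncommitted work is gone.
    static WalSketch recover(Map<Long, String> db, List<Map<Long, String>> log) {
        WalSketch w = new WalSketch();
        w.db.putAll(db);
        for (Map<Long, String> txn : log) {
            w.db.putAll(txn); // later transactions overwrite earlier ones
        }
        return w;
    }

    public static void main(String[] args) {
        WalSketch w = new WalSketch();
        w.write(1, "a");
        w.commit();
        w.write(2, "b"); // never committed
        // simulated crash: only db and log survive
        WalSketch r = WalSketch.recover(w.db, w.log);
        System.out.println(r.db);
    }
}
```

In the demo, the committed write to block 1 survives the simulated crash through replay, while the uncommitted write to block 2 is lost.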