在springside3.*中的showcase案例中,有一個把log4j的日志存入數據庫的演示,下面是我對這個案例的學習筆記。
1、我們首先來看下log4j相關日志的配置:
#Async Database Appender (Store business message)
log4j.appender.DB=org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.DB.QueueName=dblog

#Demo level with Async Database appender
log4j.logger.DBLogExample=INFO,Console,DB
log4j.additivity.DBLogExample=false
其中org.springside.examples.showcase.log.appender.QueueAppender就是對ssLog4j日志的一個擴展,而日志的event(里面是日志的內容)是存放在一個BlockingQueue中,當有多個日志需要分別存入不同的地方時,就根據QueryName來區分。
2、接下來看一下org.springside.examples.showcase.log.appender.QueueAppender里面的內容:

/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
*/
package org.springside.examples.showcase.log.appender;

import java.util.concurrent.BlockingQueue;

import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.springside.examples.showcase.queue.QueuesHolder;


/** *//**
* 輕量級的Log4j異步Appender.
*
* 將所有消息放入QueueManager所管理的Blocking Queue中.
*
* @see QueuesHolder
*
* @author calvin
*/

public class QueueAppender extends org.apache.log4j.AppenderSkeleton
{

protected String queueName;

protected BlockingQueue<LoggingEvent> queue;


/** *//**
* AppenderSkeleton回調函數, 事件到達時將時間放入Queue.
*/
@Override

public void append(LoggingEvent event)
{

if (queue == null)
{
queue = QueuesHolder.getQueue(queueName);
}

boolean sucess = queue.offer(event);


if (sucess)
{
LogLog.debug("put event to queue success:" + new LoggingEventWrapper(event).convertToString());


} else
{
LogLog.error("Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
}
}


/** *//**
* AppenderSkeleton回調函數,關閉Logger時的清理動作.
*/

public void close()
{
}


/** *//**
* AppenderSkeleton回調函數, 設置是否需要定義Layout.
*/

public boolean requiresLayout()
{
return false;
}


/** *//**
* Log4j根據getter/setter從log4j.properties中注入同名參數.
*/

public String getQueueName()
{
return queueName;
}


/** *//**
* @see #getQueueName()
*/

public void setQueueName(String queueName)
{
this.queueName = queueName;
}
}

這是對Log4j擴展的標準做法,繼承abstract class AppenderSkeleton,實現它的abstract protected void append(LoggingEvent event) 方法。
而這個方法的實現很簡單,就是根據queueName從queuesHolder中取出一個BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到這個BlockingQueue中去,關于queuesHolder,下面會講到。到這為止,log4j的活就完成了,下面的都是concurrent的事了。
3、讓我們轉到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:
<!-- 消息Queue管理器-->
<bean class="org.springside.examples.showcase.queue.QueuesHolder">
<property name="queueSize" value="1000" />
</bean>

<!-- 讀出Queue中日志消息寫入數據庫的任務 -->
<bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
<property name="queueName" value="dblog" />
<property name="batchSize" value="10" />
<property name="sql">
<value>
insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
values(:thread_name,:logger_name,:log_time,:level,:message)
</value>
</property>
</bean>
我們先從簡單的下手,先看QueuesHolder:
private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap();//消息隊列


/** *//**
* 根據queueName獲得消息隊列的靜態函數.
* 如消息隊列還不存在, 會自動進行創建.
*/

public static <T> BlockingQueue<T> getQueue(String queueName)
{
BlockingQueue queue = queueMap.get(queueName);


if (queue == null)
{
BlockingQueue newQueue = new LinkedBlockingQueue(queueSize);

//如果之前消息隊列還不存在,放入新隊列并返回Null.否則返回之前的值.
queue = queueMap.putIfAbsent(queueName, newQueue);

if (queue == null)
{
queue = newQueue;
}
}
return queue;
}
其實這個類很簡單,就是一個map,key就是上面log4j配置文件中的queueName,value就是一個BlockingQueue,這樣就可以存放多個日志queue,做不同的處理。
4、下面這個是重頭戲,先把JdbcLogWriter的代碼全貼出來:

/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
*/
package org.springside.examples.showcase.log.appender;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.log4j.spi.LoggingEvent;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springside.examples.showcase.queue.BlockingConsumer;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;


/** *//**
* 將Queue中的log4j event寫入數據庫的消費者任務.
*
* 即時阻塞的讀取Queue中的事件,達到緩存上限后使用Jdbc批量寫入模式.
* 如需換為定時讀取模式,繼承于PeriodConsumer稍加改造即可.
*
* @see BlockingConsumer
*
* @author calvin
*/

public class JdbcLogWriter extends BlockingConsumer
{

protected String sql;
protected int batchSize = 10;

protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
protected SimpleJdbcTemplate jdbcTemplate;
protected TransactionTemplate transactionTemplate;


/** *//**
* 帶Named Parameter的insert sql.
*
* Named Parameter的名稱見AppenderUtils中的常量定義.
*/

public void setSql(String sql)
{
this.sql = sql;
}


/** *//**
* 批量讀取事件數量, 默認為10.
*/

public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}


/** *//**
* 根據注入的DataSource創建jdbcTemplate.
*/
@Resource

public void setDataSource(DataSource dataSource)
{
jdbcTemplate = new SimpleJdbcTemplate(dataSource);
}


/** *//**
* 根據注入的PlatformTransactionManager創建transactionTemplate.
*/
@Resource

public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager)
{
transactionTemplate = new TransactionTemplate(defaultTransactionManager);
}


/** *//**
* 消息處理函數,將消息放入buffer,當buffer達到batchSize時執行批量更新函數.
*/
@Override

protected void processMessage(Object message)
{
LoggingEvent event = (LoggingEvent) message;
eventsBuffer.add(event);


if (logger.isDebugEnabled())
{
logger.debug("get event: {}", new LoggingEventWrapper(event).convertToString());
}

//已到達BufferSize則執行批量插入操作

if (eventsBuffer.size() >= batchSize)
{
updateBatch();
}
}


/** *//**
* 將Buffer中的事件列表批量插入數據庫.
*/
@SuppressWarnings("unchecked")

public void updateBatch()
{

try
{
//分析事件列表, 轉換為jdbc批處理參數.
int i = 0;
Map[] paramMapArray = new HashMap[eventsBuffer.size()];

for (LoggingEvent event : eventsBuffer)
{
paramMapArray[i++] = parseEvent(event);
}
final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);

//執行批量插入,如果失敗調用失敗處理函數.

transactionTemplate.execute(new TransactionCallbackWithoutResult()
{
@Override

protected void doInTransactionWithoutResult(TransactionStatus status)
{

try
{
jdbcTemplate.batchUpdate(getActualSql(), batchParams);

if (logger.isDebugEnabled())
{

for (LoggingEvent event : eventsBuffer)
{
logger.debug("saved event: {}", new LoggingEventWrapper(event).convertToString());
}
}

} catch (DataAccessException e)
{
status.setRollbackOnly();
handleDataAccessException(e, eventsBuffer);
}
}
});

//清除已完成的Buffer
eventsBuffer.clear();

} catch (Exception e)
{
logger.error("批量提交任務時發生錯誤.", e);
}
}


/** *//**
* 退出清理函數,完成buffer中未完成的消息.
*/
@Override

protected void clean()
{

if (!eventsBuffer.isEmpty())
{
updateBatch();
}
logger.debug("cleaned task {}", this);
}


/** *//**
* 分析Event, 建立Parameter Map, 用于綁定sql中的Named Parameter.
*/

protected Map<String, Object> parseEvent(LoggingEvent event)
{
Map<String, Object> parameterMap = Maps.newHashMap();
LoggingEventWrapper eventWrapper = new LoggingEventWrapper(event);

parameterMap.put("thread_name", eventWrapper.getThreadName());
parameterMap.put("logger_name", eventWrapper.getLoggerName());
parameterMap.put("log_time", eventWrapper.getDate());
parameterMap.put("level", eventWrapper.getLevel());
parameterMap.put("message", eventWrapper.getMessage());
return parameterMap;
}


/** *//**
* 可被子類重載的數據訪問錯誤處理函數,如將出錯的事件持久化到文件.
*/

protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch)
{

if (e instanceof DataAccessResourceFailureException)
{
logger.error("database connection error", e);

} else
{
logger.error("other database error", e);
}


for (LoggingEvent event : errorEventBatch)
{
logger.error("event insert to database error, ignore it: "
+ new LoggingEventWrapper(event).convertToString(), e);
}
}


/** *//**
* 可被子類重載的sql提供函數,可對sql語句進行特殊處理,如日志表的表名可帶日期后綴 LOG_2009_02_31.
*/

protected String getActualSql()
{
return sql;
}
}

這個類的作用有
1)當沒有處理的日志在1000以內時,不停執行日志的處理(設置在QueuesHolder中),超過1000,就報錯(見QueueAppender的append方法).
2)每次都把一條日志放到buffer中,達到10條時開始批量的把日志入數據庫,條數和入庫的sql都寫在上面的spring配置文件中。
可以看到,主要的方法就是processMessage。那么,這個processMessage方法是在哪里被調用的呢?
在上面的JdbcLogWriter類的代碼中可以看到,它繼承自BlockingConsumer,我們看看BlockingConsumer里面有些什么:

/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id$
*/
package org.springside.examples.showcase.queue;


/** *//**
* 采用即時阻塞讀取Queue中消息策略的Consumer.
*/

public abstract class BlockingConsumer extends QueueConsumer
{


/** *//**
* 線程執行函數,阻塞獲取消息并調用processMessage()進行處理.
*/

public void run()
{
//循環阻塞獲取消息直到線程被中斷.

try
{

while (!Thread.currentThread().isInterrupted())
{
Object message = queue.take();
processMessage(message);
}

} catch (InterruptedException e)
{
// Ignore.

} finally
{
//退出線程前調用清理函數.
clean();
}
}


/** *//**
* 消息處理函數.
*/
protected abstract void processMessage(Object message);


/** *//**
* 退出清理函數.
*/
protected abstract void clean();
}
很明顯,BlockingConsumer肯定是繼承自Thread類或者實現于Runnable接口的線程類,在線程啟動的時候processMessage方法被調用。當我們需要別的需要處理日志內容時,就可以繼承BlockingConsumer寫自己的processMessage來處理日志了。
5、下面,讓我們看看這個線程類是怎么啟動的吧。看一下BlockingConsumer就知道,它其實還繼承于另外一個類QueueConsumer:

/** *//**
* Copyright (c) 2005-2009 springside.org.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
*
* $Id$
*/
package org.springside.examples.showcase.queue;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springside.modules.utils.ThreadUtils;
import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;


/** *//**
* 單線程消費Queue中消息的任務基類.
*
* 定義了QueueConsumer的啟動關閉流程.
*
* TODO:支持多線程執行.
*
* @see QueuesHolder
*
* @author calvin
*/
@SuppressWarnings("unchecked")

public abstract class QueueConsumer implements Runnable
{

protected Logger logger = LoggerFactory.getLogger(getClass());

protected String queueName;
protected int shutdownTimeout = Integer.MAX_VALUE;

protected boolean persistence = true;
protected String persistencePath = System.getProperty("java.io.tmpdir") + File.separator + "queue";
protected Object persistenceLock = new Object(); //用于在backup與restore間等待的鎖.

protected BlockingQueue queue;
protected ExecutorService executor;


/** *//**
* 任務所消費的隊列名稱.
*/

public void setQueueName(String queueName)
{
this.queueName = queueName;
}


/** *//**
* 停止任務時最多等待的時間, 單位為毫秒.
*/

public void setShutdownTimeout(int shutdownTimeout)
{
this.shutdownTimeout = shutdownTimeout;
}


/** *//**
* 在JVM關閉時是否需要持久化未完成的消息到文件.
*/

public void setPersistence(boolean persistence)
{
this.persistence = persistence;
}


/** *//**
* 系統關閉時將隊列中未處理的消息持久化到文件的目錄,默認為系統臨時文件夾下的queue目錄.
*/

public void setPersistencePath(String persistencePath)
{
this.persistencePath = persistencePath;
}


/** *//**
* 任務初始化函數.
*/
@PostConstruct

public void start() throws IOException, ClassNotFoundException, InterruptedException
{

queue = QueuesHolder.getQueue(queueName);


if (persistence)
{

synchronized (persistenceLock)
{
restoreQueue();
}
}

executor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
executor.execute(this);
}


/** *//**
* 任務結束函數.
*/
@PreDestroy

public void stop() throws IOException
{

try
{
ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);

} finally
{

if (persistence)
{

synchronized (persistenceLock)
{
backupQueue();
}
}
}

}


/** *//**
* 保存隊列中的消息到文件.
*/

protected void backupQueue() throws IOException
{
List list = new ArrayList();
queue.drainTo(list);


if (!list.isEmpty())
{
ObjectOutputStream oos = null;

try
{
File file = new File(getPersistenceDir(), getPersistenceFileName());
oos = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));

for (Object message : list)
{
oos.writeObject(message);
}

logger.info("隊列{}已持久化{}個消息到{}", new Object[]
{ queueName, list.size(), file.getAbsolutePath() });

} finally
{

if (oos != null)
{
oos.close();
}
}

} else
{
logger.debug("隊列{}為空,不需要持久化 .", queueName);
}
}


/** *//**
* 載入持久化文件中的消息到隊列.
*/

protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException
{
ObjectInputStream ois = null;
File file = new File(getPersistenceDir(), getPersistenceFileName());


if (file.exists())
{

try
{
ois = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
int count = 0;

while (true)
{

try
{
Object message = ois.readObject();
queue.put(message);
count++;

} catch (EOFException e)
{
break;
}
}

logger.info("隊列{}已從{}中恢復{}個消息.", new Object[]
{ queueName, file.getAbsolutePath(), count });

} finally
{

if (ois != null)
{
ois.close();
}
}
file.delete();

} else
{
logger.debug("隊列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
}
}


/** *//**
* 獲取持久化文件路徑.
* 持久化文件默認路徑為java.io.tmpdir/queue/隊列名.
* 如果java.io.tmpdir/queue/目錄不存在,會進行創建.
*/

protected File getPersistenceDir()
{
File parentDir = new File(persistencePath + File.separator);

if (!parentDir.exists())
{
parentDir.mkdirs();
}
return parentDir;
}


/** *//**
* 獲取持久化文件的名稱,默認為queueName,可重載.
*/

protected String getPersistenceFileName()
{
return queueName;
}
}

這里終于可以確信JdbcLogWriter是一個實現了Runnable的線程類了。我們先略過那些保存日志到文件的方法,關注它的啟動方法start()。在start方法中,用到了concurrent包的Executors來執行線程任務。所以整個的過程是這樣的:
1、spring隨應用啟動,創建QueuesHolder靜態類用于存放多種queueName的日志queue;創建JdbcLogWriter開始啟動線程,不停循環處理日志。
2、log4j隨應用啟動,并產生日志,把日志存到queue中(使用offer方法)。
3、JdbcLogWriter不停的把日志從queue中移出來(使用take方法)。
3、每當有10條日志生成,JdbcLogWriter的updateBatch方法就把日志批量入庫,這個工作在processMesage方法里面。
這就是springside日志入庫的整個過程了,茲以為記。
4、
我的微博
http://t.sina.com.cn/1401900445