<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    月掛夜中央

    懶惰程序員

    常用鏈接

    統計

    最新評論

    springside3.*中log4j和java.util.concurrent的結合使用

            在springside3.*中的showcase案例中,有一個把log4j的日志存入數據庫的演示,下面是我對這個案例的學習筆記。
    1、我們首先來看下log4j相關日志的配置:
    #Async Database Appender (Store business message)
    log4j.appender.DB
    =org.springside.examples.showcase.log.appender.QueueAppender
    log4j.appender.DB.QueueName
    =dblog

    #Demo level with Async Database appender 
    log4j.logger.DBLogExample
    =INFO,Console,DB
    log4j.additivity.DBLogExample
    =false

    其中org.springside.examples.showcase.log.appender.QueueAppender就是對ssLog4j日志的一個擴展,而日志的event(里面是日志的內容)是存放在一個BlockingQueue中,當有多個日志需要分別存入不同的地方時,就根據QueryName來區分。
    2、接下來看一下org.springside.examples.showcase.log.appender.QueueAppender里面的內容:
    /**
     * Copyright (c) 2005-2009 springside.org.cn
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * 
     * $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
     
    */

    package org.springside.examples.showcase.log.appender;

    import java.util.concurrent.BlockingQueue;

    import org.apache.log4j.helpers.LogLog;
    import org.apache.log4j.spi.LoggingEvent;
    import org.springside.examples.showcase.queue.QueuesHolder;

    /**
     * 輕量級的Log4j異步Appender.
     * 
     * 將所有消息放入QueueManager所管理的Blocking Queue中.
     * 
     * 
    @see QueuesHolder
     * 
     * 
    @author calvin
     
    */

    public class QueueAppender extends org.apache.log4j.AppenderSkeleton {

        
    protected String queueName;

        
    protected BlockingQueue<LoggingEvent> queue;

        
    /**
         * AppenderSkeleton回調函數, 事件到達時將時間放入Queue.
         
    */

        @Override
        
    public void append(LoggingEvent event) {
            
    if (queue == null{
                queue 
    = QueuesHolder.getQueue(queueName);
            }


            
    boolean sucess = queue.offer(event);

            
    if (sucess) {
                LogLog.debug(
    "put event to queue success:" + new LoggingEventWrapper(event).convertToString());

            }
     else {
                LogLog.error(
    "Put event to queue fail:" + new LoggingEventWrapper(event).convertToString());
            }

        }


        
    /**
         * AppenderSkeleton回調函數,關閉Logger時的清理動作.
         
    */

        
    public void close() {
        }


        
    /**
         * AppenderSkeleton回調函數, 設置是否需要定義Layout.
         
    */

        
    public boolean requiresLayout() {
            
    return false;
        }


        
    /**
         * Log4j根據getter/setter從log4j.properties中注入同名參數.
         
    */

        
    public String getQueueName() {
            
    return queueName;
        }


        
    /**
         * 
    @see #getQueueName()
         
    */

        
    public void setQueueName(String queueName) {
            
    this.queueName = queueName;
        }

    }

    這是對Log4j擴展的標準做法,繼承abstract class AppenderSkeleton,實現它的abstract  protected   void append(LoggingEvent event) 方法。
    而這個方法的實現很簡單,就是根據queueName從queuesHolder中取出一個BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到這個BlockingQueue中去,關于queuesHolder,下面會講到。到這為止,log4j的活就完成了,下面的都是concurrent的事了。
    3、讓我們轉到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:
        <!-- 消息Queue管理器-->
        
    <bean class="org.springside.examples.showcase.queue.QueuesHolder">
            
    <property name="queueSize" value="1000" />
        
    </bean>

        
    <!-- 讀出Queue中日志消息寫入數據庫的任務 -->
        
    <bean id="jdbcLogWriter" class="org.springside.examples.showcase.log.appender.JdbcLogWriter">
            
    <property name="queueName" value="dblog" />
            
    <property name="batchSize" value="10" />
            
    <property name="sql">
                
    <value>
                    insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
                    values(:thread_name,:logger_name,:log_time,:level,:message)
                
    </value>
            
    </property>
        
    </bean>

    我們先從簡單的下手,先看QueuesHolder:
    private static ConcurrentMap<String, BlockingQueue> queueMap = new MapMaker().concurrencyLevel(32).makeMap();//消息隊列

    /**
         * 根據queueName獲得消息隊列的靜態函數.
         * 如消息隊列還不存在, 會自動進行創建.
         
    */

        
    public static <T> BlockingQueue<T> getQueue(String queueName) {
            BlockingQueue queue 
    = queueMap.get(queueName);

            
    if (queue == null{
                BlockingQueue newQueue 
    = new LinkedBlockingQueue(queueSize);

                
    //如果之前消息隊列還不存在,放入新隊列并返回Null.否則返回之前的值.
                queue = queueMap.putIfAbsent(queueName, newQueue);
                
    if (queue == null{
                    queue 
    = newQueue;
                }

            }

            
    return queue;
        }
    其實這個類很簡單,就是一個map,key就是上面log4j配置文件中的queueName,value就是一個BlockingQueue,這樣就可以存放多個日志queue,做不同的處理。
    4、下面這個是重頭戲,先把JdbcLogWriter的代碼全貼出來:
    /**
     * Copyright (c) 2005-2009 springside.org.cn
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * 
     * $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
     
    */

    package org.springside.examples.showcase.log.appender;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import javax.annotation.Resource;
    import javax.sql.DataSource;

    import org.apache.log4j.spi.LoggingEvent;
    import org.springframework.dao.DataAccessException;
    import org.springframework.dao.DataAccessResourceFailureException;
    import org.springframework.jdbc.core.namedparam.SqlParameterSource;
    import org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
    import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.TransactionCallbackWithoutResult;
    import org.springframework.transaction.support.TransactionTemplate;
    import org.springside.examples.showcase.queue.BlockingConsumer;

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    /**
     * 將Queue中的log4j event寫入數據庫的消費者任務.
     * 
     * 即時阻塞的讀取Queue中的事件,達到緩存上限后使用Jdbc批量寫入模式.
     * 如需換為定時讀取模式,繼承于PeriodConsumer稍加改造即可.
     * 
     * 
    @see BlockingConsumer
     * 
     * 
    @author calvin
     
    */

    public class JdbcLogWriter extends BlockingConsumer {

        
    protected String sql;
        
    protected int batchSize = 10;

        
    protected List<LoggingEvent> eventsBuffer = Lists.newArrayList();
        
    protected SimpleJdbcTemplate jdbcTemplate;
        
    protected TransactionTemplate transactionTemplate;

        
    /**
         * 帶Named Parameter的insert sql.
         * 
         * Named Parameter的名稱見AppenderUtils中的常量定義.
         
    */

        
    public void setSql(String sql) {
            
    this.sql = sql;
        }


        
    /**
         * 批量讀取事件數量, 默認為10.
         
    */

        
    public void setBatchSize(int batchSize) {
            
    this.batchSize = batchSize;
        }


        
    /**
         * 根據注入的DataSource創建jdbcTemplate.
         
    */

        @Resource
        
    public void setDataSource(DataSource dataSource) {
            jdbcTemplate 
    = new SimpleJdbcTemplate(dataSource);
        }


        
    /**
         * 根據注入的PlatformTransactionManager創建transactionTemplate.
         
    */

        @Resource
        
    public void setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager) {
            transactionTemplate 
    = new TransactionTemplate(defaultTransactionManager);
        }


        
    /**
         * 消息處理函數,將消息放入buffer,當buffer達到batchSize時執行批量更新函數.
         
    */

        @Override
        
    protected void processMessage(Object message) {
            LoggingEvent event 
    = (LoggingEvent) message;
            eventsBuffer.add(event);

            
    if (logger.isDebugEnabled()) {
                logger.debug(
    "get event: {}"new LoggingEventWrapper(event).convertToString());
            }


            
    //已到達BufferSize則執行批量插入操作
            if (eventsBuffer.size() >= batchSize) {
                updateBatch();
            }

        }


        
    /**
         * 將Buffer中的事件列表批量插入數據庫.
         
    */

        @SuppressWarnings(
    "unchecked")
        
    public void updateBatch() {
            
    try {
                
    //分析事件列表, 轉換為jdbc批處理參數.
                int i = 0;
                Map[] paramMapArray 
    = new HashMap[eventsBuffer.size()];
                
    for (LoggingEvent event : eventsBuffer) {
                    paramMapArray[i
    ++= parseEvent(event);
                }

                
    final SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(paramMapArray);

                
    //執行批量插入,如果失敗調用失敗處理函數.
                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    @Override
                    
    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        
    try {
                            jdbcTemplate.batchUpdate(getActualSql(), batchParams);
                            
    if (logger.isDebugEnabled()) {
                                
    for (LoggingEvent event : eventsBuffer) {
                                    logger.debug(
    "saved event: {}"new LoggingEventWrapper(event).convertToString());
                                }

                            }

                        }
     catch (DataAccessException e) {
                            status.setRollbackOnly();
                            handleDataAccessException(e, eventsBuffer);
                        }

                    }

                }
    );

                
    //清除已完成的Buffer
                eventsBuffer.clear();
            }
     catch (Exception e) {
                logger.error(
    "批量提交任務時發生錯誤.", e);
            }

        }


        
    /**
         * 退出清理函數,完成buffer中未完成的消息.
         
    */

        @Override
        
    protected void clean() {
            
    if (!eventsBuffer.isEmpty()) {
                updateBatch();
            }

            logger.debug(
    "cleaned task {}"this);
        }


        
    /**
         * 分析Event, 建立Parameter Map, 用于綁定sql中的Named Parameter.
         
    */

        
    protected Map<String, Object> parseEvent(LoggingEvent event) {
            Map
    <String, Object> parameterMap = Maps.newHashMap();
            LoggingEventWrapper eventWrapper 
    = new LoggingEventWrapper(event);

            parameterMap.put(
    "thread_name", eventWrapper.getThreadName());
            parameterMap.put(
    "logger_name", eventWrapper.getLoggerName());
            parameterMap.put(
    "log_time", eventWrapper.getDate());
            parameterMap.put(
    "level", eventWrapper.getLevel());
            parameterMap.put(
    "message", eventWrapper.getMessage());
            
    return parameterMap;
        }


        
    /**
         * 可被子類重載的數據訪問錯誤處理函數,如將出錯的事件持久化到文件.
         
    */

        
    protected void handleDataAccessException(DataAccessException e, List<LoggingEvent> errorEventBatch) {
            
    if (e instanceof DataAccessResourceFailureException) {
                logger.error(
    "database connection error", e);
            }
     else {
                logger.error(
    "other database error", e);
            }


            
    for (LoggingEvent event : errorEventBatch) {
                logger.error(
    "event insert to database error, ignore it: "
                        
    + new LoggingEventWrapper(event).convertToString(), e);
            }

        }


        
    /**
         * 可被子類重載的sql提供函數,可對sql語句進行特殊處理,如日志表的表名可帶日期后綴 LOG_2009_02_31.
         
    */

        
    protected String getActualSql() {
            
    return sql;
        }

    }

    這個類的作用有
    1)當沒有處理的日志在1000以內時,不停執行日志的處理(設置在QueuesHolder中),超過1000,就報錯(見QueueAppender的append方法).
    2)每次都把一條日志放到buffer中,達到10條時開始批量的把日志入數據庫,條數和入庫的sql都寫在上面的spring配置文件中。
    可以看到,主要的方法就是processMessage。那么,這個processMessage方法是在哪里被調用的呢?
    在上面的JdbcLogWriter類的代碼中可以看到,它繼承自BlockingConsumer,我們看看BlockingConsumer里面有些什么:
    /**
     * Copyright (c) 2005-2009 springside.org.cn
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * 
     * $Id$
     
    */

    package org.springside.examples.showcase.queue;

    /**
     * 采用即時阻塞讀取Queue中消息策略的Consumer.
     
    */

    public abstract class BlockingConsumer extends QueueConsumer {

        
    /**
         * 線程執行函數,阻塞獲取消息并調用processMessage()進行處理.
         
    */

        
    public void run() {
            
    //循環阻塞獲取消息直到線程被中斷.
            try {
                
    while (!Thread.currentThread().isInterrupted()) {
                    Object message 
    = queue.take();
                    processMessage(message);
                }

            }
     catch (InterruptedException e) {
                
    // Ignore.
            }
     finally {
                
    //退出線程前調用清理函數.
                clean();
            }

        }


        
    /**
         * 消息處理函數.
         
    */

        
    protected abstract void processMessage(Object message);

        
    /**
         * 退出清理函數.
         
    */

        
    protected abstract void clean();
    }
    很明顯,BlockingConsumer肯定是繼承自Thread類或者實現于Runnable接口的線程類,在線程啟動的時候processMessage方法被調用。當我們需要別的需要處理日志內容時,就可以繼承BlockingConsumer寫自己的processMessage來處理日志了。
    5、下面,讓我們看看這個線程類是怎么啟動的吧。看一下BlockingConsumer就知道,它其實還繼承于另外一個類QueueConsumer:
    /**
     * Copyright (c) 2005-2009 springside.org.cn
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * 
     * $Id$
     
    */

    package org.springside.examples.showcase.queue;

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.EOFException;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springside.modules.utils.ThreadUtils;
    import org.springside.modules.utils.ThreadUtils.CustomizableThreadFactory;

    /**
     * 單線程消費Queue中消息的任務基類.
     * 
     * 定義了QueueConsumer的啟動關閉流程.
     * 
     * TODO:支持多線程執行.
     * 
     * 
    @see QueuesHolder
     * 
     * 
    @author calvin
     
    */

    @SuppressWarnings(
    "unchecked")
    public abstract class QueueConsumer implements Runnable {

        
    protected Logger logger = LoggerFactory.getLogger(getClass());

        
    protected String queueName;
        
    protected int shutdownTimeout = Integer.MAX_VALUE;

        
    protected boolean persistence = true;
        
    protected String persistencePath = System.getProperty("java.io.tmpdir"+ File.separator + "queue";
        
    protected Object persistenceLock = new Object(); //用于在backup與restore間等待的鎖.

        
    protected BlockingQueue queue;
        
    protected ExecutorService executor;

        
    /**
         * 任務所消費的隊列名稱.
         
    */

        
    public void setQueueName(String queueName) {
            
    this.queueName = queueName;
        }


        
    /**
         * 停止任務時最多等待的時間, 單位為毫秒.
         
    */

        
    public void setShutdownTimeout(int shutdownTimeout) {
            
    this.shutdownTimeout = shutdownTimeout;
        }


        
    /**
         * 在JVM關閉時是否需要持久化未完成的消息到文件.
         
    */

        
    public void setPersistence(boolean persistence) {
            
    this.persistence = persistence;
        }


        
    /**
         * 系統關閉時將隊列中未處理的消息持久化到文件的目錄,默認為系統臨時文件夾下的queue目錄.
         
    */

        
    public void setPersistencePath(String persistencePath) {
            
    this.persistencePath = persistencePath;
        }


        
    /**
         * 任務初始化函數.
         
    */

        @PostConstruct
        
    public void start() throws IOException, ClassNotFoundException, InterruptedException {

            queue 
    = QueuesHolder.getQueue(queueName);

            
    if (persistence) {
                
    synchronized (persistenceLock) {
                    restoreQueue();
                }

            }


            executor 
    = Executors.newSingleThreadExecutor(new CustomizableThreadFactory("Queue Consumer-" + queueName));
            executor.execute(
    this);
        }


        
    /**
         * 任務結束函數.
         
    */

        @PreDestroy
        
    public void stop() throws IOException {
            
    try {
                ThreadUtils.normalShutdown(executor, shutdownTimeout, TimeUnit.MILLISECONDS);
            }
     finally {
                
    if (persistence) {
                    
    synchronized (persistenceLock) {
                        backupQueue();
                    }

                }

            }


        }


        
    /**
         * 保存隊列中的消息到文件.
         
    */

        
    protected void backupQueue() throws IOException {
            List list 
    = new ArrayList();
            queue.drainTo(list);

            
    if (!list.isEmpty()) {
                ObjectOutputStream oos 
    = null;
                
    try {
                    File file 
    = new File(getPersistenceDir(), getPersistenceFileName());
                    oos 
    = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
                    
    for (Object message : list) {
                        oos.writeObject(message);
                    }

                    logger.info(
    "隊列{}已持久化{}個消息到{}"new Object[] { queueName, list.size(), file.getAbsolutePath() });
                }
     finally {
                    
    if (oos != null{
                        oos.close();
                    }

                }

            }
     else {
                logger.debug(
    "隊列{}為空,不需要持久化 .", queueName);
            }

        }


        
    /**
         * 載入持久化文件中的消息到隊列.
         
    */

        
    protected void restoreQueue() throws ClassNotFoundException, IOException, InterruptedException {
            ObjectInputStream ois 
    = null;
            File file 
    = new File(getPersistenceDir(), getPersistenceFileName());

            
    if (file.exists()) {
                
    try {
                    ois 
    = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)));
                    
    int count = 0;
                    
    while (true{
                        
    try {
                            Object message 
    = ois.readObject();
                            queue.put(message);
                            count
    ++;
                        }
     catch (EOFException e) {
                            
    break;
                        }

                    }

                    logger.info(
    "隊列{}已從{}中恢復{}個消息."new Object[] { queueName, file.getAbsolutePath(), count });
                }
     finally {
                    
    if (ois != null{
                        ois.close();
                    }

                }

                file.delete();
            }
     else {
                logger.debug(
    "隊列{}的持久化文件{}不存在", queueName, file.getAbsolutePath());
            }

        }


        
    /**
         * 獲取持久化文件路徑.
         * 持久化文件默認路徑為java.io.tmpdir/queue/隊列名.
         * 如果java.io.tmpdir/queue/目錄不存在,會進行創建.
         
    */

        
    protected File getPersistenceDir() {
            File parentDir 
    = new File(persistencePath + File.separator);
            
    if (!parentDir.exists()) {
                parentDir.mkdirs();
            }

            
    return parentDir;
        }


        
    /**
         * 獲取持久化文件的名稱,默認為queueName,可重載.
         
    */

        
    protected String getPersistenceFileName() {
            
    return queueName;
        }

    }

    這里終于可以確信JdbcLogWriter是一個實現了Runnable的線程類了。我們先略過那些保存日志到文件的方法,關注它的啟動方法start()。在start方法中,用到了concurrent包的Executors來執行線程任務。所以整個的過程是這樣的:
    1、spring隨應用啟動,創建QueuesHolder靜態類用于存放多種queueName的日志queue;創建JdbcLogWriter開始啟動線程,不停循環處理日志。
    2、log4j隨應用啟動,并產生日志,把日志存到queue中(使用offer方法)。
    3、JdbcLogWriter不停的把日志從queue中移出來(使用take方法)。
    3、每當有10條日志生成,JdbcLogWriter的updateBatch方法就把日志批量入庫,這個工作在processMesage方法里面。

    這就是springside日志入庫的整個過程了,茲以為記。
    4、



    我的微博 http://t.sina.com.cn/1401900445

    posted on 2011-02-13 21:20 月掛夜中央 閱讀(2209) 評論(0)  編輯  收藏 所屬分類: java咖啡杯

    主站蜘蛛池模板: 色欲色欲天天天www亚洲伊| 免费无毒a网站在线观看| 毛片A级毛片免费播放| 国产亚洲精品精品精品| 亚洲成AV人片在线观看ww| 日韩人妻无码精品久久免费一| 亚洲第一网站免费视频| 在线观看免费为成年视频| 一级毛片不卡免费看老司机| 亚洲理论电影在线观看| 97在线线免费观看视频在线观看| 国产精品亚洲一区二区三区在线观看| 久久精品亚洲乱码伦伦中文| 69av免费观看| 成年网站免费入口在线观看| 亚洲好看的理论片电影| 在线观看免费为成年视频| 91在线免费视频| 国产成人精品亚洲| 2020天堂在线亚洲精品专区| 久久青青草原亚洲av无码| 最新中文字幕免费视频| 久久午夜伦鲁片免费无码| 国产精品亚洲精品爽爽| 亚洲一级特黄特黄的大片| 亚洲国产精品自在线一区二区| 国产人成免费视频| 国语成本人片免费av无码| 免费网站观看WWW在线观看| 一级特级aaaa毛片免费观看| 亚洲免费综合色在线视频| 亚洲另类视频在线观看| 久久亚洲精品无码AV红樱桃| 亚洲人成色77777| 亚洲一区二区三区无码影院| 又粗又黄又猛又爽大片免费| 九九精品免费视频| 无码人妻一区二区三区免费| 波多野结衣免费在线| av免费不卡国产观看| 亚洲黄色免费电影|