(文章本人原創,若轉載請注明出處)
下面看一下一些具體的實現,先看一下Sender接口的commitData方法的MySql實現,即SenderMySqlImp類:
public void commitData(String path, String file) {
......
Connection connect = null;
try {
//根據配置由Helper來確定是否用連接池,這只調用getConnection()即可
connect = Helper.getConnection();
connect.setAutoCommit(false);
FileInputStream fis = new FileInputStream(path + file);
//insert語句,有三個參數分別為id,image,filename字段。
ps = connect.prepareStatement(sql.toString());
ps.setString(1, UUID.randomUUID().toString());
//將圖片文件流化,用jdbc直接寫到數據庫
ps.setBinaryStream(2, fis, fis.available());
ps.setString(3, file);
ps.executeUpdate();
connect.commit();
count++;
Logger.writeLog("已處理文件數:"+count+",時間:"+new java.util.Date());
} catch (Exception e) {
........
}
很簡單吧,其實就是用Stream來做,另外在網上可以找到有關Oracle上傳blob的實現,一般都是先insert一條記錄,blob字段用empty_blob()函數插入一個空數據,然后再取出這個blob字段,最后按字節寫入blob,具體參考SenderOracleImp類吧。個人感覺還是在mysql的這個效率高些并且看起來簡單了很多。
然后來看看使用線程池的ProcessMulti類:
public class ProcessMulti implements Process{
private String path;
private Vector<String> files = new Vector<String>();
private Properties prop;
ProcessMulti() {
prop = ConfigMgr.getProperties(); //取config.properties中配置信息
this.path = prop.getProperty("path");
this.files = Helper.getFiles(prop.getProperty("filetype"), this.path);
}
public void doProcess() {
//正如前面兩篇所說,這里是線程池構建器,傳入相關參數
BlobSenderThreadPool tpe = new BlobSenderThreadPool(Integer
.valueOf(prop.getProperty("corePoolSize")), Integer
.valueOf(prop.getProperty("maxPoolSize")), Integer.valueOf(prop
.getProperty("keepAliveTime")), TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(Integer.valueOf(prop
.getProperty("maxPoolSize"))),
new BlobSenderThreadPool.DiscardPolicy(), files.size());
Logger.writeLogForce("開始處理
." + new java.util.Date());
for (int i = 0; i < this.files.size(); i++) {
//向線程池提交要處理的任務,線程池根據其配置進行處理
//Helper.getSender()會根據配置取得支持mysql或是oracel的寫入方法
tpe.execute(new Runner(path, files.get(i), Helper.getSender()));
Logger.writeLog("已提交第" + (int)(i+1) + "個文件" + ",時間為:"
+ new java.util.Date());
}
//可以在這里寫一個打印輸出,實際上程序很快就執行完上面的for,運行到這里,但是處理并沒有完成,
//主程序好像職業經理人,他的工作就是分配任務給下屬,自已就完成工作了,但下屬們還要接著忙活呵呵...
System.out.println("任務已分配...");
}
//線程池類
class BlobSenderThreadPool extends ThreadPoolExecutor {
volatile int planTask;//計劃任務,即計劃要寫的文件數
public BlobSenderThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler, int planTask) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.planTask = planTask;
}
@Override
//當某個線程處理完時會調用此方法
protected void afterExecute(Runnable r, Throwable t) {
Logger.writeLog("當前已完成任務數:" + (int)(this.getCompletedTaskCount()+1)
+ "計劃任務數:" + planTask);
//若已處理完任務和計劃的任務數相同說明所有線程已完成處理,終止線程池處理
if ((this.getCompletedTaskCount()+1)== planTask)
this.terminated();
super.afterExecute(r, t);
}
@Override
protected void terminated() {
Logger.writeLogForce("所有任務已完成 ,處理結束時間===>>" + new java.util.Date());
System.exit(0);
}
}
//要使用線程進行處理,類要實現Runable接口
class Runner implements Runnable {
String file;
String path;
Sender sender;
Runner(String path, String file, Sender sender) {
this.file = file;
this.path = path;
this.sender = sender;
}
//Runer的實例會被傳入線程池,線程被運行時會調用run方法
public void run() {
sender.commitData(path, file);
}
}
posted @
2009-03-21 10:40 依然Fantasy 閱讀(622) |
評論 (0) |
編輯 收藏
(文章本人原創,若轉載請注明出處)
在實際當中的情況是系統數據庫中需要上傳大量照片到數據庫中,數據量比較大,且不能在界面中通過操作逐個上傳,要批量自動進行。其實起來也比較簡單直接利用線程池將照片數據讀取成流再存入BLOB字段即可。但是在實現后些功能后又進入了一些改造,實現了線程池、單線程、是否使用用連接池、不同數據庫等不同的配置,這樣在不同配置下可以觀察到程序性能的不同。并且經過設計切換這些配置不需要修改程序。

使用DbAccess接口的getConnect()取得數據庫連接,DbImp和DbPoolingImp實現了不使用連接池和使用連接池的兩個版本。Sender接口的commitData()用來把BLOB數據寫到數據庫中,因為不同數據庫可能寫法有點不同所以這里SenderMySqlImp和SenderOracleImp分別是Mysql和Oracle的實現。Process接口的doProcess()是開始進行處理的方法,無論是單線程還是多線程。因此ProcessMulti和ProcessSingle是分別使用線程池以及單線程處理的類。ConfigMgr用于取得config.properties文件內配置信息,Logger是日志類,Helper中匯集了一些共用的靜態方法。最后DataSender是主程序的類:)
posted @
2009-03-21 10:25 依然Fantasy 閱讀(306) |
評論 (0) |
編輯 收藏
(文章本人原創,若轉載請注明出處)
在JDK1.5提供了一個線程池ThreadPoolExecutor,可以處理用戶提交過來的線程。如果把要處理的任務比作蓋一個大樓,那么每一個建筑工人就相當于一個線程,那么這個ThreadPoolExecutor就好像包工頭,它來控制蓋這個大樓需要多少個工人,何時招進新工人,何時辭退已經長時間沒有事做的工人,等等此類事務。也就是說用戶程序不斷提交新的線程,ThreadPoolExecutor執行提交線程的同時會控制目前總共同時執行的線程數,銷毀已執行完閑置的線程等控制行為,保留最少閑置線程數,并且可以配置不同的處理策略。
為什么要使用線程池呢,這與數據庫連接池的原理有點相仿,線程的創建是需要成本的,包括服務器CPU和內存資源,由于多線程是并行運行,程序運行過程中可能有的線程已經完成自身處理任務,處于閑置狀態,如果在這種情況下再不斷創建新任務就是在浪費服務器資源,此時應該盡量使用先前創建的好的并且是處理閑置狀態的線程來處理新任務,而線程池就可以有效的對此進行自動化管理,當然這個管理是可以由用戶配置的。
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
這是線程池的構建器,用戶程序通過這個構建器傳參數,corePoolSize是線程池中核心線程數,運行的線程數不能少于這個核心線程數,否則就新建線程。maximumPoolSize是充許最大的線程數。keepAliveTime設置除核心線程外其它線程的空閑時間,超過這個時間線程就自動終止。unit是指的keepAliveTime的時間單位。BlockingQueue接口按生產則消費者算法設計的一個線程池內部處理線程隊列的接口,有三種實現SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue,在實際運行程序時可以根據這三種實現得到不同的性能,比如有的實現可能在有新任務來時不新建線程,而是將其加入等待隊列,等有線程運行完時再分配給其使用。具體實現還是參看它們的JDK文檔吧,這里站在使用的角度它們是可以調整運行性能的開關。當最大線程和工作隊列容量都達到最大值時,再提交給線程池新任務就會被拒絕,此時線程池會調用RejectedExecutionHandler 接口進行處理,具體實現有四種策略。我們只需要選用其中一種在構建ThreadPoolExecutor時傳入即可。具體四種實現還是參看JDK文檔吧。關于ThreadPoolExecutor的JDK文檔。至此控制線程池運作的幾個參數都從構建器中傳入了。
posted @
2009-03-19 22:26 依然Fantasy 閱讀(846) |
評論 (0) |
編輯 收藏