<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    隨筆 - 42, 文章 - 1, 評論 - 0, 引用 - 0
    數據加載中……

    [IBM]構建輕量級 Batch 框架處理 DB2 Content Manager 8.3 大量數據導入

    【來源】http://www.ibm.com/developerworks/cn/data/library/techarticles/dm-0908luyx/

    本文介紹了如何使用多線程來構建輕量級 Batch 框架,將大量的數據遷移到 IBM DB2 Content Manager 8.3 中。通過本文的學習,讀者可以了解如何通過使用多線程調用 IBM DB2 Content Manager API 構建的框架來啟動,暫停,恢復,停止,放緩等操作。

    在用 API 導入大量數據的過程中,如果沒有框架很難有效的對整個過程控制,僅僅通過日志來分析解決問題總是很浪費時間,并且效率不太理想。

    本文的內容放在了如何使用多線程和配置文件來構建 Batch 框架來處理大數量導入的問題。

    隨著 IBM DB2 Content Manager(簡稱 IBM CM)產品的不斷成熟,越來越多的內容管理系統需要遷移到 IBM CM 中來,這些需要遷移的數據通常首先把結構化的內容導到文本文件中,與之相對應的圖像和 pdf 文件通常放在對應的文件夾中,圖像和 pdf 對應的文件夾路徑也通常存放在文本文件中,然后遷移程序遍歷文本文件,把對應的 Item 遷移到 IBM CM 中。這些需要遷移的數據通常都有幾百 G,如何有效的控制遷移過程是一個很大的挑戰,因此我們必須構建一個輕量級的 batch 處理框架來控制整個數據的遷移周期,記錄處理過程中的錯誤,保證數據的一致性。

    同時,在用 API 導入數據的過程中,被導入數據總是千邊萬化,無效的映射導入數據和 DB2 Content Manager 的項,導致工作變得復雜,同時使的設計和代碼冗余,并且使重用,維護和擴展履步為艱難。

    為了克服所提到的挑戰,這個 batch 框架必須要有以下功能:

    • 用戶出于不影響生產環境性能的考慮,可以暫時停止數據的遷移,或者減緩遷移處理的頻率,即框架必須具有 suspend 和 slowdown 功能。
    • 用戶可以讓暫停處理的系統繼續處理,即框架必須具有 resume 功能。
    • 用戶可以讓系統停止處理,修改某些配置,然后繼續處理,即框架必須有 re-start 功能。
    • 處理過程中發生的錯誤,警告系統必須記錄下來,用戶可以根據這些記錄來修正數據。
    • 通過配置文件建立規則來解決數據千邊萬化的問題。

    構建框架

    構建交互性

    要使框架有交互性,我們必須有三個個線程:客戶端線程,服務端線程,工作線程。客戶端線程負責發出工作指令,服務端線程接受這些指令并調用工作線程來做實際的工作。對于客戶端和服務器交互,在沒有 web 服務器支持的情況下,我們可以采用一種古老但是很有效的做法:socket 編程。 Java socket 對象的 accept 方法會一直阻塞直到客戶端有程序輸入,當客戶端有新的命令輸入的時候,服務器端從 socket 中讀出命令,然后執行命令。下面是示例程序,Client.java 代表客戶端程序,Server.java 代表服務器端程序,Worker.java 代表工作程序 ,Config.java 代表系統中一些參數配置。


    清單 1. 客戶端程序
    package com.ibm.batch.sample;
    
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.net.Socket;
    import org.apache.log4j.Logger;
    
    public class Client {
    	private Config config = null;
    	public void setConfig(Config config) {
    		this.config = config;
    	}
    	private Logger logger = Logger.getLogger(Client.class);
    	public void sendCommand(String command) {
    		Socket socket = null;
    		OutputStream out = null;
    		BufferedWriter writer = null;
    		try {
    			// establish the connection with server.
    			socket = new Socket(config.getHost(), config.getSocketPort());
    			out = socket.getOutputStream();
    			writer = new BufferedWriter(new OutputStreamWriter(out));
    			// send the command to server
    			writer.write(command);
    			writer.flush();
    		} catch (IOException e) {
    			logger.error(e.getMessage(), e);
    			throw new RuntimeException(e);
    		}
    	}
    }
    


    清單 2. 服務器端程序
    package com.ibm.batch.sample;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    import com.ibm.batch.sample.util.ResourceUtils;
    
    public class Server {
    	private Config config = null;
    	private boolean processing = true;	
    	private Worker worker = null;
    	public void setConfig(Config config) {
    		this.config = config;
    	}
    	public static void main(String[] args) {
    		Server server = new Server();
    		// create the work thread
    		Worker worker = server.createWorker(args);
    		worker.start();		
    		server.receiveAndExecuteCommand();
    	}
    	private Worker createWorker(String[] args) {
    		Worker worker = new Worker();
    		this.worker = worker;
    		return worker;
    	}
    	/**
    	 * receive the command from client and execute the command. the method is
    	 * keeping running until client send the 'stop' command.
    	 * 
    	 * @throws Exception
    	 */
    	public void receiveAndExecuteCommand() {
    		ServerSocket serverSocket = buildSocketConnection();
    		// loop until client send 'stop' command
    		while (processing) {
    			Socket socket = null;
    			try {
    			socket = serverSocket.accept();
    			String commandLine = readCommandFromSocket(socket);
                    executeCommand(commandLine);
    			} catch (Exception e) {
    				throw new RuntimeException(e);
    			} finally {
    				ResourceUtils.closeSocket(socket);
    			}
    		}
    	}
    	private void executeCommand(String commandLine) {
    		// TODO Auto-generated method stub
    	}
    
    	/**
    	 * read the command from the socket
    	 * 
    	 * @param socket
    	 * @return
    	 */
    	private String readCommandFromSocket(Socket socket) {
    		InputStream in = null;
    		BufferedReader bufferedReader = null;
    		String commandLine = null;
    		try {
    			in = socket.getInputStream();
    			bufferedReader = new BufferedReader(new InputStreamReader(in));
    			commandLine = bufferedReader.readLine();
    		} catch (IOException e) {
    			throw new RuntimeException(e);
    		} finally {
    			ResourceUtils.closeInputStream(in);
    			ResourceUtils.closeReader(bufferedReader);
    		}
    		return commandLine;
    	}
    	/**
    	 * build the socket.
    	 * 
    	 * @return
    	 */
    	private ServerSocket buildSocketConnection() {
    		// prepare the socket for client to connect.
    		ServerSocket serverSocket;
    		try {
    			serverSocket = new ServerSocket(config.getSocketPort());
    		} catch (java.net.BindException e1) {
    			throw new RuntimeException("Socket port already in use.", e1);
    		} catch (IOException ioe) {
    			throw new RuntimeException(ioe);
    		}
    		return serverSocket;
    	}
    }
    


    清單 3. 工作程序
    package com.ibm.batch.sample;
    
    import org.apache.log4j.Logger;
    
    public class Worker extends Thread {
    	Logger logger = Logger.getLogger(Worker.class);
    	/**
    	 * the main method for create item function.
    	 */
    	public void run() {
    		createItem();
    	}
    	/**
    	 * do the real job
    	 */
    	private void createItem() {	 		
    	}
    }
    

    添加 suspend 和 slowdown 處理命令

    大數量的數據遷移一般是在周末或者晚上進行,但是如果客戶的歷史數據太大,在周末或者晚上數據可能處理不完,為了不影響生產環境的性能,我們必須能夠在客戶的工作時間暫緩處理或者降低處理的頻率,把 cpu 等資源讓給客戶程序,也就是說處理線程 worker 的工作可以 suspend 或者 slowdow 。為了讓 worker 線程知道需要 suspend 當前處理,我們可以在 worker 內部設置一個布爾變量 isSuspend,當程序在循環創建 CM item 的時候,我們每次都判斷一下這個布爾變量 isSuspend,當其為 ture 的時候,程序就調用線程的 wait 方法中斷當前線程的處理,wait 方法還可以接受一個以微秒為單位的時間參數,當時間到達 wait 指定的時間的時候,程序繼續創建 CM Item 。為了多線程之間的變量可見性,我們必須把 worker 的 isSuspend 變量和 suspendTime 設置為 volatile 。同理我們設置一個布爾變量 isSlowdown 以及 slowdowTime 。示例程序如下:


    清單 4. 工作程序
    package com.ibm.batch.sample;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import org.apache.log4j.Logger;
    import com.ibm.batch.sample.util.ResourceUtils;
    
    public class Worker extends Thread {
    	Logger logger = Logger.getLogger(Worker.class);	
    	private volatile boolean isSlowdown = false;
    	private volatile Double slowdownTime;
    	private volatile boolean isSuspend;
    	private volatile Double suspendTime;
    	public void setSlowdown(boolean isSlowdown) {
    		this.isSlowdown = isSlowdown;
    	}
    	public void setSlowdownTime(Double slowdownTime) {
    		this.slowdownTime = slowdownTime;
    	}
    	public void setSuspend(boolean isSuspend) {
    		this.isSuspend = isSuspend;
    	}
    	public void setSuspendTime(Double suspendTime) {
    		this.suspendTime = suspendTime;
    	}
    	public boolean isSlowdown() {
    		return isSlowdown;
    	}
    	public Double getSlowdownTime() {
    		return slowdownTime;
    	}
    	public boolean isSuspend() {
    		return isSuspend;
    	}
    	public Double getSuspendTime() {
    		return suspendTime;
    	}
    	protected Object semaphore = new Object();
    	private Config config;
    	public void setConfig(Config config) {
    		this.config = config;
    	}
    


    清單 5. 主方法
    /**
    	 * the main method for create item function.
    	 */
    	public void run() {
    		BufferedReader reader = null;
    		try {
    			reader = getFileReader();
    			String oneLine = null;
    			while ((oneLine = reader.readLine()) != null) {
    				if (isSlowdown()) {
    					sleep4GivenTime();
    				}
    				if (isSuspend()) {
    					suspend4SomeTime();
    				}
    				createItem(oneLine);
    			}
    		} catch (Exception e) {
    			throw new RuntimeException(e);
    		} finally {
    			ResourceUtils.closeReader(reader);
    		}
    	}
    
    	/**
    	 * current thread sleep for some time,the unit is minute.
    	 */
    	protected void sleep4GivenTime() {
    		try {
    			Thread.sleep((long) (slowdownTime.doubleValue() * 1000));
    		} catch (InterruptedException e) {
    			// do nothing
    		}
    	}
    


    清單 6.Suspend 方法
    			
    /**
     * suspend working for given time.
     */
    protected void suspend4SomeTime() {
    	synchronized (semaphore) {
    	try {
    	Double suspendTime = getSuspendTime();
    	if (suspendTime != null) {
    		double suspendTimeDouble = suspendTime.doubleValue() * 60 * 1000;
    		semaphore.wait((long) suspendTimeDouble);
    	} else {
    	                     semaphore.wait();
    	}
    	} catch (InterruptedException e) {
    	// tell user that the processing started
    	logger.info("suspend is over,system is continue processing .");
    	}
    	}
    	}
    	/**
    	 * do the real job
    	 * 
    	 * @throws Exception
    	 */
    	private void createItem(String oneLine) throws Exception {
    	}
    	private BufferedReader getFileReader() throws FileNotFoundException {
    		String fileName = config.getFileName();
    		File processingFile = new File(fileName);
    		BufferedReader reader = new BufferedReader(new FileReader(
    				processingFile));
    		return reader;
    	}
    }
    

    添加 resume 功能

    在程序暫停處理以后,我們可以提前終止 suspend,讓框架繼續處理,也就是框架必須有 resume 功能。我們調用 Worker.java 對象上的 notify 方法來實現這個功能,示例如下:


    清單 7.Resume
    public class Worker extends Thread {
    	/**
    	 * resume the working.
    	 */
    	public void continueWorking() {
    		cleanSuspend();
    		synchronized (semaphore) {
    			semaphore.notify();
    		}
    	}
    }
    
     

    添加 stop 和 re-start 功能

    有時候用戶因為一些原因(例如修改配置文件)想停止程序的執行,所以框架必須有 stop 的功能,但是 stop 的時候我們必須注意記錄程序處理到的行數,這樣客戶再開始執行的時候能夠從上次執行的斷點繼續執行,也就是框架具備了 re-start 功能,這是 batch 程序必須具備的一種很重要的功能,re-start 功能有多種實現方法,我們這里采取一種簡單的方法,在 stop 的時候,把當前處理的記錄到一個文本文件中去,下次啟動的時候從上次最后處理的對象開始進行處理。所以我們在 Worker.java 中增加一個 keepProcessing 布爾變量,在循環創建 CM Item 的時候 , 我們每次都判斷一下這個值是否為 true,如果為 false 的話,我們就停止循環處理,在 Worker.java 中還要增加一個 moveReaderToLastProcess 方法,把 reader 重新定向到上次處理點。


    清單 8. 停止和重啟
    public class Worker extends Thread {
    	private volatile boolean keepProcessing;
    	public boolean isKeepProcessing() {
    		return keepProcessing;
    	}
    	public void setKeepProcessing(boolean keepProcessing) {
    		this.keepProcessing = keepProcessing;
    	}
    	/**
    	 * the main method for create item function.
    	 */
    	public void run() {
    		BufferedReader reader = null;
    		try {
    			long lastProcessedRow = config.getLastProcessedRow();	
    			reader = moveReaderToLastProcess(lastProcessedRow);
    			String oneLine = null;
    			connectToCM();
    			while (((oneLine = reader.readLine()) != null)
    					&& isKeepProcessing()) {
    				if (isSlowdown()) {
    					sleep4GivenTime();
    				}
    				if (isSuspend()) {
    					suspend4SomeTime();
    				}
    				createItem(oneLine);
    				lastProcessedRow++;
    			}
    			logCurrentProcessingLine(lastProcessedRow);
    		} catch (Exception e) {
    			throw new RuntimeException(e);
    		} finally {
    			ResourceUtils.closeReader(reader);
    		}
    	}	
    	private void logCurrentProcessingLine(long lastProcessedRow) {
    		config.setLastProcessedRow(lastProcessedRow);
    	}	
    	/**
    	 * move current reader position to last process postion
    	 * @return
    	 * @throws IOException
    	 */
    private BufferedReader moveReaderToLastProcess(long lastProcessedRow) 
             throws IOException {
    		// get the file reader
    		BufferedReader reader = getFileReader();	
    			
    		// move the reader to the start row -1.
    		int count = 0;
    		while (count < lastProcessedRow-1) {
    			reader.readLine();			
    			count++;
    		}
    		return reader;
    	}
    }
    

    添加錯誤處理功能

    剛才我們調用的 createItem 方法是直接拋出異常的,但是這樣的處理實際上是錯誤的,因為在 batch 處理過程中,我們不希望在處理某一個 item 出錯導致剩余的 item 不再處理,所以我們在 catch 里面對異常進行分類處理,我們 catch 住非檢查異常(runtime exception),通常非檢查異常是不可以恢復的,所以我們直接拋出,讓程序結束處理。對于其余的異常,我們只是在日志中記錄下來,并不拋出。在全部處理結束以后,用戶可以檢查日志來進行相應的處理。示例代碼如下:


    清單 9. 錯誤處理
    public class Worker extends Thread {
    	/**
    	 * do the real job
    	 * 
    	 * @throws Exception
    	 */
    	private void createItem(String oneLine) throws Exception {
    		try {
    			//create the item from one line
    		}catch (RuntimeException e) {
    			throw e;
    		}catch (Exception e) {
    			logger.error(e.getMessage(),e);
    		}
    	}
    }
    

    添加創建 CM item 功能

    下面的內容放在了如何使用配置文件來處理導入的問題。

    通過調用和運行 API 來處理數據的導入,我們首先定義一個基本信息的配置文件,用來制定連接的信息,其他配置文件的目錄,工作的目錄等有關導入需要的參數。然后定義導入數據和 DB2 Content Manager 的項的映射配置文件。配置文件定義結束后,我們就可以調用API來啟動相應的導入流程,在程序運行過程中,可以動態的更改配置,從而有效的處理導入的任務。

    在開發過程中,您可以靈活地定義各種配置文件以便實現多種導入規則,同時在程序運行中進行數據校驗,以防止冗余和非法數據被錯誤導入。

    下面的一些配置和代碼示例,以此介紹了如何定義配置文件,然后管理 API 來完成導入的任務。

    定義基本信息配置文件:在該文件中,須先設定 IBM DB2 Content Manager 的一些連接參數, 如:

    contentManagerDatabase=iCMnlsdb // 定義調用的數據庫名字
     contentManagerUsername=iCMadmin // 定義用戶名
     contentManagerPassword= password // 定義連接密碼
     contentManagerSchema=ICMADMIN // 定義具體的 schema

    您可以在代碼中用以上參數來實現對 IBM DB2 Content Manager 的連接,代碼示例:

    DKDatastoreICM dsICM = new DKDatastoreICM(); 
    // 創建連接 dsICM.connect("iCMnlsdb", "iCMadmin", "password", "SCHEMA=ICMADMIN");

    還需指定哪個文件夾存放映射文件,以及需導入的數據文件,如:

    mappingFilePath=config/rapid/mapping // 映射文件路徑
     dataFileFolder=config/rapid/data // 數據文件路徑

    也可定義一些參數來增強該導入的流程控制,如:

    runPhase=2 
    // 指定是第二階段導入,在導入時需更新已有的數據

    定義映射文件:該配置文件主要用于將用戶想要導入的數據映射到 IBM DB2 Content Manager 的 Item Type 中,您可自由定制該文件,使用戶遵循您定義的規范順利完成數據遷移。如:

    C001.del=c01 
     C002.del=c01

    該定義中 C001.del 和 C002.del 是需要導入的數據文件,c01 是對應的 Item Type 名字。這種定義方法可實現將多個數據文件導入同一個 Item Type 中。

    具體的對應關系如下:

    position=1|name=COMPANYNAME 
     position=2|name=COMPANYID 
     position=3|name=INPUTVALUE 
     position=-1|name=SPECIALVALUE|value=C1

    這個映射關系反映了數據文件中列數和 Item Type 中 attribute 的關系,如第一列在 Item Type 中代表了名字為 COMPANYNAME 的 attribute 。您也可定義一些特殊規則,如將 position 設為負數,以便反映該列是一個特殊的 attribute, 它的值是固定的。 比如將 position 設為 -1 時,名為 SPECIALVALUE 的 attribute 的值總是為 C1 。

    若您想實現將一個數據文件導入多個 Item Type 中,可在數據文件中加入一個特殊列,在映射文件中指定該列的列數,以及當該列的值和多種 Item Type 的映射關系。如:

    C003.del(position:3)

    這樣,C003.del 就不是單一的對應一個 Item Type,而是先去取第三列 INPUTVALUE 的值,再去對應表中查找到關聯的 Item Type 。該對應表可設成:

    Value1=c01 
     Value2=c02

    若第三列 INPUTDOCID 的值為 Value1 時,其對應的 Item Type 為 c01,同樣的當值為 Value2 時,會將該行數據導入到 c02 的 Item Type 中。

    調用 API 完成操作的代碼示例:在編寫代碼過程中,需要調用 DB2 Content Manager 的 API 來完成 Item Type 以及它包含的 attribute 的創建。上文已給出了通過參數來連接 Content Manager 的方法,下面的示例代碼用得到的 DKDatastoreICM 來實現具體的操作:


    清單 10. API 調用
    // Create an item / DDO / Root Component
    DKDDO ddo = dsICM.createDDO("S_withChild", itemPropertyOrSemanticType); 
    //createDDO(<Overall Item Type>, <Item Property / Semantic Type>);
    
    // Adding Multivalue Attributes to DDOs, multiple type can be used, 
    //here just give some example
     ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_varchar"), 
    "this is a string value");  
      //string
    
    ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_date"), 
    java.sql.Date.valueOf("2001-08-12"));
     //date
    
    ddo.setData(ddo.dataId(DKConstant.DK_CM_NAMESPACE_ATTR,"S_double"), 
    new Double("123")); 
    //double
    

    結束語

    通過本文的介紹,相信您對多線程構建的 Batch 框架實現大量數據遷移的過程,和通過配置文件的管理的 API 實現數據導入的過程也有了一定的了解和學習。您可靈活地實現一對一,一對多,多對多等各種映射關系,您也可以利用多線程來實現其他的功能的開發,編寫出更加富有創造性的軟件。


    參考資料

    學習

    獲得產品和技術

    • 使用可直接從 developerWorks 下載的 IBM 試用軟件 構建您的下一個 Linux 開發項目。

    posted @ 2011-12-28 17:16 段旭 閱讀(444) | 評論 (0)編輯 收藏

    僅列出標題
    共2頁: 上一頁 1 2 
    主站蜘蛛池模板: 国产男女性潮高清免费网站| 无码一区二区三区AV免费| 久久久久亚洲爆乳少妇无| 黄色一级毛片免费看| 免费一级毛片清高播放| 爱情岛论坛免费视频| 日韩亚洲精品福利| 色噜噜狠狠色综合免费视频 | 亚洲成a人片在线观看日本| 一二三四在线观看免费中文在线观看| 粉色视频成年免费人15次| 亚洲M码 欧洲S码SSS222| 99久久精品毛片免费播放| 久久精品国产亚洲综合色| 最近中文字幕2019高清免费| 亚洲二区在线视频| 午夜爱爱免费视频| 一级做a爰黑人又硬又粗免费看51社区国产精品视| 免费永久在线观看黄网站| 成a人片亚洲日本久久| 中文字幕亚洲图片| 麻豆高清免费国产一区| 亚洲一区二区三区丝袜| 久久精品国产精品亚洲艾草网美妙| 亚洲一区免费视频| 日韩一级在线播放免费观看| 国产特黄一级一片免费| 精品日韩亚洲AV无码一区二区三区| 污视频网站免费观看| 国产亚洲美女精品久久久| 性xxxxx大片免费视频| 亚洲一级片在线播放| 免费国产a国产片高清| 久久青草免费91线频观看站街| 免费一级毛片一级毛片aa| 免费av一区二区三区| 亚洲熟妇丰满xxxxx| 亚洲日韩精品一区二区三区| av免费不卡国产观看| 一级毛片在播放免费| 久久久国产精品亚洲一区|