<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    JAVA—咖啡館

    ——歡迎訪問rogerfan的博客,常來《JAVA——咖啡館》坐坐,喝杯濃香的咖啡,彼此探討一下JAVA技術(shù),交流工作經(jīng)驗,分享JAVA帶來的快樂!本網(wǎng)站部分轉(zhuǎn)載文章,如果有版權(quán)問題請與我聯(lián)系。

    BlogJava 首頁 新隨筆 聯(lián)系 聚合 管理
      447 Posts :: 145 Stories :: 368 Comments :: 0 Trackbacks
    最近在做一個銀行的生產(chǎn)數(shù)據(jù)脫敏系統(tǒng),今天寫代碼時遇到了一個“瓶頸”,脫敏系統(tǒng)需要將生產(chǎn)環(huán)境上Infoxmix里的數(shù)據(jù)原封不動的Copy到另一臺 Oracle數(shù)據(jù)庫服務(wù)器上,然后對Copy后的數(shù)據(jù)作些漂白處理。為了將人為干預(yù)的因素降到最低,在系統(tǒng)設(shè)計時采用Java代碼對數(shù)據(jù)作Copy,思路 如圖:



        首 先在代碼與生產(chǎn)庫間建立一個Connection,將讀取到的數(shù)據(jù)放在ResultSet對象,然后再與開發(fā)庫建立一個Connection。從 ResultSet取出數(shù)據(jù)后通過TestConnection插入到開發(fā)庫,以此來實現(xiàn)Copy。代碼寫完后運行程序,速度太慢了,一秒鐘只能Copy 一千條數(shù)據(jù),生產(chǎn)庫上有上億條數(shù)據(jù),按照這個速度同步完要到猴年馬月呀,用PreparedStatement批處理速度也沒有提交多少。我想能不能用多 線程處理,多個人干活總比一個人干活速度要快。
        假設(shè)生產(chǎn)庫有1萬條數(shù)據(jù),我開5個線程,每個線程分2000條數(shù)據(jù),同時向開發(fā)庫里插數(shù)據(jù),Oracle支持高并發(fā)這樣的話速度至少會提高好多倍,按照這 個思路重新進行了編碼,批處理設(shè)置為1萬條一提交,統(tǒng)計插入數(shù)量的變量使用 java.util.concurrent.atomic.AtomicLong,程序一運行,傳輸速度飛快CPU利用率在70%~90%,現(xiàn)在一秒鐘可 以拷貝50萬條記錄,沒過幾分鐘上億條數(shù)據(jù)一條不落地全部Copy到目標庫。

    在查詢的時候我用了如下語句
    1. String queryStr = "SELECT * FROM xx";  
    2. ResultSet coreRs = PreparedStatement.executeQuery(queryStr);  

    實習(xí)生問如果xx表里有上千萬條記錄,你全部查詢出來放到ResultSet, 那內(nèi)存不溢出了么?Java在設(shè)計的時候已經(jīng)考慮到這個問題了,并沒有查詢出所有的數(shù)據(jù),而是只查詢了一部分數(shù)據(jù)放到ResultSet,數(shù)據(jù)“用完”它 會自動查詢下一批數(shù)據(jù),你可以用setFetchSize(int rows)方法設(shè)置一個建議值給ResultSet,告訴它每次從數(shù)據(jù)庫Fetch多少條數(shù)據(jù)。但我不贊成,因為JDBC驅(qū)動會根據(jù)實際情況自動調(diào)整 Fetch的數(shù)量。另外性能也與網(wǎng)線的帶寬有直接的關(guān)系。
    相關(guān)代碼
      1 package com.dlbank.domain;  
      2   
      3 import java.sql.Connection;  
      4 import java.sql.PreparedStatement;  
      5 import java.sql.ResultSet;  
      6 import java.sql.Statement;  
      7 import java.util.List;  
      8 import java.util.concurrent.atomic.AtomicLong;  
      9   
     10 import org.apache.log4j.Logger;  
     11   
     12 /** 
     13  *<p>title: 數(shù)據(jù)同步類 </p>   
     14  *<p>Description: 該類用于將生產(chǎn)核心庫數(shù)據(jù)同步到開發(fā)庫</p>   
     15  *@author Tank Zhang  
     16  */  
     17 public class CoreDataSyncImpl implements CoreDataSync {  
     18       
     19     private List<String> coreTBNames; //要同步的核心庫表名  
     20     private ConnectionFactory connectionFactory;  
     21     private Logger log = Logger.getLogger(getClass());  
     22       
     23     private AtomicLong currentSynCount = new AtomicLong(0L); //當(dāng)前已同步的條數(shù)  
     24       
     25     private int syncThreadNum;  //同步的線程數(shù)  
     26   
     27     @Override  
     28     public void syncData(int businessType) throws Exception {  
     29           
     30         for (String tmpTBName : coreTBNames) {  
     31             log.info("開始同步核心庫" + tmpTBName + "表數(shù)據(jù)");  
     32             // 獲得核心庫連接  
     33             Connection coreConnection = connectionFactory.getDMSConnection(4);  
     34             Statement coreStmt = coreConnection.createStatement();  
     35             //為每個線程分配結(jié)果集  
     36             ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM "+tmpTBName);  
     37             coreRs.next();  
     38             //總共處理的數(shù)量  
     39             long totalNum = coreRs.getLong(1);  
     40             //每個線程處理的數(shù)量  
     41             long ownerRecordNum =(long) Math.ceil((totalNum / syncThreadNum));   
     42             log.info("共需要同步的數(shù)據(jù)量:"+totalNum);  
     43             log.info("同步線程數(shù)量:"+syncThreadNum);  
     44             log.info("每個線程可處理的數(shù)量:"+ownerRecordNum);  
     45             // 開啟五個線程向目標庫同步數(shù)據(jù)  
     46             for(int i=0; i < syncThreadNum; i ++){  
     47                 StringBuilder sqlBuilder = new StringBuilder();  
     48                 //拼裝后SQL示例  
     49                 //Select * From dms_core_ds Where id between 1 And 657398  
     50                 //Select * From dms_core_ds Where id between 657399 And 1314796  
     51                 //Select * From dms_core_ds Where id between 1314797 And 1972194  
     52                 //Select * From dms_core_ds Where id between 1972195 And 2629592  
     53                 //Select * From dms_core_ds Where id between 2629593 And 3286990  
     54                 //..  
     55                 sqlBuilder.append("Select * From ").append(tmpTBName)  
     56                         .append(" Where id between " ).append(i * ownerRecordNum +1)  
     57                         .append( " And ")  
     58                         .append((i * ownerRecordNum + ownerRecordNum));  
     59                 Thread workThread = new Thread(  
     60                         new WorkerHandler(sqlBuilder.toString(),businessType,tmpTBName));  
     61                 workThread.setName("SyncThread-"+i);  
     62                 workThread.start();  
     63             }  
     64             while (currentSynCount.get() < totalNum);  
     65             //休眠一會兒讓數(shù)據(jù)庫有機會commit剩余的批處理(只針對JUnit單元測試,因為單元測試完成后會關(guān)閉虛擬器,使線程里的代碼沒有機會作提交操作);  
     66             //Thread.sleep(1000 * 3);  
     67             log.info( "核心庫"+tmpTBName+"表數(shù)據(jù)同步完成,共同步了" + currentSynCount.get() + "條數(shù)據(jù)");  
     68         }  
     69     }// end for loop  
     70       
     71     public void setCoreTBNames(List<String> coreTBNames) {  
     72         this.coreTBNames = coreTBNames;  
     73     }  
     74   
     75     public void setConnectionFactory(ConnectionFactory connectionFactory) {  
     76         this.connectionFactory = connectionFactory;  
     77     }  
     78       
     79     public void setSyncThreadNum(int syncThreadNum) {  
     80         this.syncThreadNum = syncThreadNum;  
     81     }  
     82       
     83     //數(shù)據(jù)同步線程  
     84     final class WorkerHandler implements Runnable {  
     85         ResultSet coreRs;  
     86         String queryStr;  
     87         int businessType;  
     88         String targetTBName;  
     89         public WorkerHandler(String queryStr,int businessType,String targetTBName) {  
     90             this.queryStr = queryStr;  
     91             this.businessType = businessType;  
     92             this.targetTBName = targetTBName;  
     93         }  
     94         @Override  
     95         public void run() {  
     96             try {  
     97                 //開始同步  
     98                 launchSyncData();  
     99             } catch(Exception e){  
    100                 log.error(e);  
    101                 e.printStackTrace();  
    102             }  
    103         }  
    104         //同步數(shù)據(jù)方法  
    105         void launchSyncData() throws Exception{  
    106             // 獲得核心庫連接  
    107             Connection coreConnection = connectionFactory.getDMSConnection(4);  
    108             Statement coreStmt = coreConnection.createStatement();  
    109             // 獲得目標庫連接  
    110             Connection targetConn = connectionFactory.getDMSConnection(businessType);  
    111             targetConn.setAutoCommit(false);// 設(shè)置手動提交  
    112             PreparedStatement targetPstmt = targetConn.prepareStatement("INSERT INTO " + targetTBName+" VALUES (?,?,?,?,?)");  
    113             ResultSet coreRs = coreStmt.executeQuery(queryStr);  
    114             log.info(Thread.currentThread().getName()+"'s Query SQL::"+queryStr);  
    115             int batchCounter = 0//累加的批處理數(shù)量  
    116             while (coreRs.next()) {  
    117                 targetPstmt.setString(1, coreRs.getString(2));  
    118                 targetPstmt.setString(2, coreRs.getString(3));  
    119                 targetPstmt.setString(3, coreRs.getString(4));  
    120                 targetPstmt.setString(4, coreRs.getString(5));  
    121                 targetPstmt.setString(5, coreRs.getString(6));  
    122                 targetPstmt.addBatch();  
    123                 batchCounter++;  
    124                 currentSynCount.incrementAndGet();//遞增  
    125                 if (batchCounter % 10000 == 0) { //1萬條數(shù)據(jù)一提交  
    126                     targetPstmt.executeBatch();  
    127                     targetPstmt.clearBatch();  
    128                     targetConn.commit();  
    129                 }  
    130             }  
    131             //提交剩余的批處理  
    132             targetPstmt.executeBatch();  
    133             targetPstmt.clearBatch();  
    134             targetConn.commit();  
    135             //釋放連接   
    136             connectionFactory.release(targetConn, targetPstmt,coreRs);  
    137         }  
    138     }  
    139 }  

    posted on 2010-11-26 14:11 rogerfan 閱讀(982) 評論(0)  編輯  收藏 所屬分類: 【Java知識】
    主站蜘蛛池模板: 在线观看免费人成视频色9| 91精品免费高清在线| 国产精品va无码免费麻豆| 国产色在线|亚洲| 91精品成人免费国产片| 久久水蜜桃亚洲av无码精品麻豆| a级毛片毛片免费观看久潮喷| 亚洲中文字幕无码永久在线| 成年免费a级毛片| 国产精品亚洲美女久久久| 又大又硬又粗又黄的视频免费看| 全亚洲最新黄色特级网站 | 爱情岛论坛亚洲品质自拍视频网站| 最近的免费中文字幕视频| 亚洲综合无码一区二区痴汉| 毛色毛片免费观看| 亚洲成a∧人片在线观看无码| 高清国语自产拍免费视频国产 | 牛牛在线精品免费视频观看| 亚洲国产精品成人久久蜜臀| 国产97视频人人做人人爱免费| 久久久久久久综合日本亚洲| 久久久久久毛片免费播放| 亚洲an日韩专区在线| 国产一级大片免费看| 97在线免费观看视频| 亚洲精品美女久久久久| 女人与禽交视频免费看| 色哟哟国产精品免费观看| 久久国产亚洲观看| 无码中文在线二区免费| 深夜久久AAAAA级毛片免费看| 亚洲大尺度无码专区尤物| 国产精品久久久久免费a∨ | 男人的好免费观看在线视频| 精品亚洲av无码一区二区柚蜜| 久久激情亚洲精品无码?V| 91精品国产免费久久国语蜜臀| 亚洲精品无码mⅴ在线观看| 精品亚洲成α人无码成α在线观看 | 真实国产乱子伦精品免费|