<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    聶永的博客

    記錄工作/學習的點點滴滴。

    為什么批量請求要盡可能的合并操作

    前言

    線上情況:

    1. 線上Redis集群,多個Twemproxy代理(nutcracker),LVS DR路由均衡調度
    2. 客戶端使用Jedis操作Redis集群,一個程序進程實例使用原先1024個工作線程處理請求,若干個進程實例
    3. 一天超過22億次請求,網絡一般情況下,一天超過上萬個連接失敗異常
    4. 運維同學告知,LVS壓力較大

    改進工作:

    1. 工作線程由原先1024改用16個
    2. 每個線程每次最多操作1000個Redis命令批量提交

    實際效果:

    1. 一天不到一億次的請求量
    2. LVS壓力大減
    3. CPU壓力降低到原先1/3以下
    4. 單個請求抽樣調研平均減少1-90毫秒時間(尤其是跨機房處理)

    Redis支持批量提交

    原生支持批量操作方式

    一般命令前綴若添加上m字符串,表示支持多個、批量命令提交了。

    顯式的...

    MSET key value [key value ...]
    MSETNX key value [key value ...]
    
    HMGET key field [field ...]
    HMSET key field value [field value ...]
    

    一般方式的...

    HDEL key field [field ...]
    SREM key member [member ...]
    RPUSH key value [value ...]
    ......
    

    更多,請參考:http://redis.cn/commands.html

    pipeline管道方式

    官方文檔:http://redis.io/topics/pipelining

    1. Redis Client把所有命令一起打包發送到Redis Server,然后阻塞等待處理結果
    2. Redis Server必須在處理完所有命令前先緩存起所有命令的處理結果
    3. 打包的命令越多,緩存消耗內存也越多
    4. 不是打包的命令越多越好
    5. 實際環境需要根據命令執行時間等各種因素選擇合并命令的個數,以及測試效果等

    Java隊列支持

    一般業務、接入前端請求量過大,生產者速度過快,這時候使用隊列暫時緩存會比較好一些,消費者直接直接從隊列獲取任務,通過隊列讓生產者和消費者進行分離這也是業界普通采用的方式。

    監控隊列

    有的時候,若可以監控一下隊列消費情況,可以監控一下,就很直觀。同事為隊列添加了一個監控線程,清晰明了了解隊列消費情況。

    示范

    示范使用了Redis Pipeline,線程池,準備數據,生產者-消費者隊列,隊列監控等,消費完畢,程序關閉。

    /**
     * 以下測試在Jedis 2.6下測試通過
     * 
     * @author nieyong
     * 
     */
    public class TestJedisPipeline {
        private static final int NUM = 512;
        private static final int MAX = 1000000; // 100W
    
        private static JedisPool redisPool;
        private static final ExecutorService pool = Executors.newCachedThreadPool();
        protected static final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(
                MAX); // 100W
        private static boolean finished = false;
    
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxActive(64);
            config.setMaxIdle(64);
    
            try {
                redisPool = new JedisPool(config, "192.168.192.8", 6379, 10000,
                        null, 0);
            } catch (Exception e) {
                System.err.println("Init msg redis factory error! " + e.toString());
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            System.out.println("prepare test data 100W");
            prepareTestData();
            System.out.println("prepare test data done!");
    
            // 生產者,模擬請求100W次
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < MAX; i++) {
                        if (i % 3 == 0) {
                            queue.offer("del_key key_" + i);
                        } else {
                            queue.offer("get_key key_" + i);
                        }
                    }
                }
            });
    
            // CPU核數*2 個工作者線程
            int threadNum = 2 * Runtime.getRuntime().availableProcessors();
    
            for (int i = 0; i < threadNum; i++)
                pool.execute(new ConsumerTask());
    
            pool.execute(new MonitorTask());
    
            Thread.sleep(10 * 1000);// 10sec
            System.out.println("going to shutdown server ...");
            setFinished(true);
            pool.shutdown();
    
            pool.awaitTermination(1, TimeUnit.MILLISECONDS);
    
            System.out.println("colse!");
        }
    
        private static void prepareTestData() {
            Jedis redis = redisPool.getResource();
            Pipeline pipeline = redis.pipelined();
    
            for (int i = 0; i < MAX; i++) {
                pipeline.set("key_" + i, (i * 2 + 1) + "");
    
                if (i % (NUM * 2) == 0) {
                    pipeline.sync();
                }
            }
            pipeline.sync();
            redisPool.returnResource(redis);
        }
    
        // queue monitor,生產者-消費隊列監控
        private static class MonitorTask implements Runnable {
    
            @Override
            public void run() {
                while (!Thread.interrupted() && !isFinished()) {
                    System.out.println("queue.size = " + queue.size());
                    try {
                        Thread.sleep(500); // 0.5 second
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        }
    
        // consumer,消費者
        private static class ConsumerTask implements Runnable {
            @Override
            public void run() {
                while (!Thread.interrupted() && !isFinished()) {
                    if (queue.isEmpty()) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                        }
    
                        continue;
                    }
    
                    List<String> tasks = new ArrayList<String>(NUM);
                    queue.drainTo(tasks, NUM);
                    if (tasks.isEmpty()) {
                        continue;
                    }
    
                    Jedis jedis = redisPool.getResource();
                    Pipeline pipeline = jedis.pipelined();
    
                    try {
                        List<Response<String>> resultList = new ArrayList<Response<String>>(
                                tasks.size());
    
                        List<String> waitDeleteList = new ArrayList<String>(
                                tasks.size());
    
                        for (String task : tasks) {
                            String key = task.split(" ")[1];
                            if (task.startsWith("get_key")) {
                                resultList.add(pipeline.get(key));
                                waitDeleteList.add(key);
                            } else if (task.startsWith("del_key")) {
                                pipeline.del(key);
                            }
                        }
    
                        pipeline.sync();
    
                        // 處理返回列表
                        for (int i = 0; i < resultList.size(); i++) {
                            resultList.get(i).get();
                            // handle value here ...
                            // System.out.println("get value " + value);
                        }
    
                        // 讀取完畢,直接刪除之
                        for (String key : waitDeleteList) {
                            pipeline.del(key);
                        }
    
                        pipeline.sync();
                    } catch (Exception e) {
                        redisPool.returnBrokenResource(jedis);
                    } finally {
                        redisPool.returnResource(jedis);
                    }
                }
            }
        }
    
        private static boolean isFinished(){
            return finished;
        }
    
        private static void setFinished(boolean bool){
            finished = bool;
        }
    }
    

    代碼作為示范。若線上則需要處理一些異常等。

    小結

    若能夠批量請求進行合并操作,自然可以節省很多的網絡帶寬、CPU等資源。有類似問題的同學,不妨考慮一下。

    posted on 2014-11-09 22:08 nieyong 閱讀(16192) 評論(17)  編輯  收藏 所屬分類: Socket

    評論

    # re: 為什么批量請求要盡可能的合并操作 2014-11-10 09:34 下巴長痘痘是什么原因

    經過樓主的講解,我現在才明白為什么批量請求要盡可能的合并操作  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作[未登錄] 2014-11-11 13:36 劉洋

    好專業啊...每個月有一諞  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2014-11-13 14:37 Stanley Xu

    線程數要根據cpu的情況而決定的,一臺4核的機器開40個線程就是蛋疼。同步、context switch的開銷已經超過了線程帶來的優勢。如果不合并,僅僅減少線程數,性能也會有所優化。  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2014-11-16 09:12 妞妞寶貝衣間

    支持博主分享,歡迎到我的小店、、、、  回復  更多評論   

    # gank開黑吧 2014-11-16 22:13 gank開黑吧

    gank開黑吧http://www.kaihei8.com 贊一下博主  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2014-11-16 23:37 網絡營銷技巧

    看了樓主的講解,我現在才大致明白  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2014-11-17 14:32 臉上長粉刺是什么原因

    不錯的文章,學習了  回復  更多評論   

    # 武岡SEO 2014-11-25 22:57 794680490@qq.com

    文章很實用,學習了,到時實踐下  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-01-24 15:09 黎洪鑫

    請教一下,我們使用pipeline的方式后,出現了在一些閑時,內存暴漲。然后kill掉twemproxy之后就降下來了。然后查了相關的資料,把pipeline的數量降到500,甚至20了,仍然出現。而且11臺機器中,有一些機器經常出現,但是最近經常出現的不出現,從沒出現的又出現這情況了。不知道您是否有遇到過,如果解決。  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-01-26 09:39 nieyong

    @黎洪鑫
    沒有遇見過類似問題,愛莫能助。
    因為pipeline是一個阻塞請求-響應過程,這一點很重要;另外網絡機房擁塞會導致非常大的延遲,具體情況就是請求發出去,等待很長時間響應。若是機房網絡延遲問題,可以考慮把pipeline異步提交,不要阻塞當前線程。
    以上都是建議,僅供參考!  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-01-26 10:46 黎洪鑫

    多謝了,我先做一下升級看看情況會不會改善。@nieyong
      回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-05-28 10:05 tinsang

    @nieyong
    可以考慮把pipeline異步提交,不要阻塞當前線程 ;

    這個異步是指?不是很明白  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-05-28 17:04 nieyong

    @tinsang
    把較為耗時任務委派到其它線程處理,當前業務線程繼續忙別的。  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-05-28 19:01 tinsang

    @nieyong
    那我明白你的意思了  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-05-28 19:02 tinsang

    @nieyong
    pipeline阻塞了,那其他請求redis不是一樣被阻塞了?  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2015-05-29 09:52 nieyong

    @tinsang
    針對單臺Redis而言,單線程模型。一旦pipeline阻塞了,其它請求會被阻塞住。可考慮單線程操作管道,一個一個批處理。  回復  更多評論   

    # re: 為什么批量請求要盡可能的合并操作 2016-05-16 17:45 zhouwei

    private static boolean finished = false;
    finished變量應該為volatile。
    樓主這么牛逼的人不應該犯這種小錯誤 ^_^
    好文章,學習了~  回復  更多評論   

    公告

    所有文章皆為原創,若轉載請標明出處,謝謝~

    新浪微博,歡迎關注:

    導航

    <2014年11月>
    2627282930311
    2345678
    9101112131415
    16171819202122
    23242526272829
    30123456

    統計

    常用鏈接

    留言簿(58)

    隨筆分類(130)

    隨筆檔案(151)

    個人收藏

    最新隨筆

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 91亚洲国产成人久久精品网站| 国产啪精品视频网免费| 国产午夜影视大全免费观看| 亚洲一区二区三区成人网站| 99re免费在线视频| 亚洲最新中文字幕| 可以免费看的卡一卡二| 亚洲av一本岛在线播放| 青娱分类视频精品免费2| 亚洲国产精品成人精品小说| 最近免费中文字幕视频高清在线看| 亚洲国产精品xo在线观看| 18禁止观看免费私人影院| 亚洲六月丁香婷婷综合| 在线视频免费观看www动漫| 亚洲精品无码av中文字幕| 麻豆国产入口在线观看免费| 男男gay做爽爽的视频免费| 国产偷国产偷亚洲高清日韩| 你懂的在线免费观看| 亚洲福利视频导航| 国产大片线上免费观看 | 97超高清在线观看免费视频| 亚洲AV无码欧洲AV无码网站| 2021在线永久免费视频| 亚洲综合无码一区二区痴汉 | 亚洲一区二区精品视频| CAOPORN国产精品免费视频| 亚洲欧洲日产韩国在线| 日本人护士免费xxxx视频| a级毛片黄免费a级毛片| 亚洲精品中文字幕无码AV| 免费黄色app网站| 成人精品视频99在线观看免费| 亚洲人成电影在在线观看网色 | 国产精品国产午夜免费福利看| 久久高潮一级毛片免费| 亚洲日本在线播放| 亚洲AV成人潮喷综合网| 未满十八18禁止免费无码网站| 99亚洲乱人伦aⅴ精品|