<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 167,  comments - 30,  trackbacks - 0
    #!/usr/bin/env python
    #
     -*- coding: utf-8 -*-
    #
    ===============================================================================
    #
    #
     Copyright (c) 2015 Letv.com, Inc. All Rights Reserved
    #
    #
     python2.6版本安裝:
    #
               yum install python-futures
    #
               yum install python-qpid
    #
               yum install supervisor  由supervisor來管理進程,當進程掛掉后自動監控并重啟
    #
     author: david
    #
     date: 2015/10/14 16:42:02
    #
    #
    ===============================================================================
    from qpid.messaging import *
    import time, sys
    import urllib2
    import hashlib
    import json
    import traceback
    from concurrent.futures import *
    import logging
    import logging.handlers

    LOG_DIR = "/home/ldw/logs/geturl/online"
    LOG_MID_FAIL=LOG_DIR + "/update_fail.log"
    LOG_FILE_NAME=LOG_DIR + "/geturl_update.log"

    # default test broker
    QPID_QUEUE_NAME_TEST = "ldw.update_v.queue"
    SERVERS_TEST = ['10.3.3.3']
    SERVERS_PRO = ['10.1.1.1.''10.2.2.2']
    BROKER_TEST = "xyz/xyz@10.11.1.1:5672"
    BROKER_PRO = "xyz/xyz@10.11.1.2:5672"
    RECONNECTION_URLS = ['']
    SERVERS = SERVERS_TEST
    BROKER = BROKER_TEST
    QUEUE_NAME = QPID_QUEUE_NAME_TEST
    THREAD_COUNT = 16

    # 獲取當前時間
    def getNowTime():
        ISOTIMEFORMAT='%Y-%m-%d %H:%M:%S'
        return time.strftime(ISOTIMEFORMAT, time.localtime())

    # 記錄處理失敗日志
    def fail2log(id):
        # 使用with關鍵字,自動關閉文件流
        with  open(LOG_MID_FAIL,'a') as log:
            log.write('%s\n'%(id))

    # 記錄日志, 可以提出來共用
    def init_logger():
        logging.basicConfig()
        logger = logging.getLogger("__name__")
        logger.setLevel(logging.INFO)
        when:S:Seconds M:Minutes D:Days H:Hours interval:  backupCount:0 not deleted
        # 1天更換一次文件日志,7為保留日志文件個數
        logger_fh = logging.handlers.TimedRotatingFileHandler(LOG_FILE_NAME, 'D', 1, 7)
        logger_fh.suffix = "%Y%m%d-%H%M.log"
        logger_fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        logger.addHandler(logger_fh)
        return logger
    _logger = init_logger()

    # 初始化環境變量,當在局部函數中修改全局變量時,使用global關鍵字
    def init_env(env):
        global BROKER
        # 其他變量類似處理
        if env != 'PRO':
            BROKER = BROKER_PRO
        _logger.info('env:%s,broker:%s,queue_name:%s,reconnnection_urls:%s, servers: %s, thread_pool_count:%s'
                     % (env,BROKER,QUEUE_NAME,str(RECONNECTION_URLS),str(SERVERS), str(THREAD_COUNT)))

    # 處理業務邏輯
    def process(id, ip):
        key = 'xyz';
        action = 'delete';
        tm = str(int(time.time()))
        sig = hashlib.md5(tm+action+key).hexdigest()
        url = "http://ip/test?sig="+sig
        try:
            req = urllib2.Request(url)
            # 綁定Host
            # req.add_header('Host', 'www.ldw.com')
            response = urllib2.urlopen(req, timeout = 1)
            result = response.read()
            if result.find("400")>=0:
                fail2log(id)
                _logger.error('flush cache fail, url :%s' % (url))
            else:
                _logger.info('flush cache OK, IP :%s' % (ip))
        except Exception:
            fail2log(id)
            _logger.error('flush cbase cache error, url: %s' % (url))

    # init qpid connection
    def qpid_get_conn():
        conn = Connection(BROKER, heartbeat=6, reconnect=True, reconnect_limit=60, reconnect_interval=4, reconnect_urls=RECONNECTION_URLS)
        conn.open()
        sess = conn.session()
        rec = sess.receiver(QUEUE_NAME)
        return conn, sess, rec

    # receive message of qpid queue, multithread implements
    def qpid_receive(rec):
        try:
            epool = ThreadPoolExecutor(max_workers=THREAD_COUNT)
            while True:
                try:
                    message = rec.fetch()
                    data = json.loads(message.content)
                    _logger.info(' receive message:%s' % (message.content))
                    mid = data['mid']
                    
                    for ip in SERVERS:
                        try:
                            epool.submit(process(mid, ip))
                        except Exception,e:
                            pass
                            fail2log(mid)
                            exstr = traceback.format_exc()
                            _logger.error(' thread pool process message error:%s' % (exstr))
                except Exception, e:
                    pass
                    fail2log(mid)
                    _logger.error(' fetch message error, msg:%s' % (message.content))
                sess.acknowledge() # message ack
        except Exception, e:
            pass
            exstr = traceback.format_exc()
            _logger.error(' start receive message error:%s' % (exstr))

    if __name__ ==   '__main__':
        if(len(sys.argv) < 2):
            print "Usage: \"python input args error\""
            sys.exit()
        env = sys.argv[1]
        if env != 'TEST' and env != 'PRO':
            print "Usage: \"input evn args error\""
            sys.exit()

        try:
            init_env(env)
            conn, sess, rec =  qpid_get_conn()
            qpid_receive(rec)
            for c in [ conn, sess, rec]:
                 if c: c.close()
        except Exception, e:
            pass
            exstr = traceback.format_exc()
            _logger.error('init qpid connection message error:%s' % (exstr))
    posted on 2015-10-30 20:20 David1228 閱讀(2879) 評論(0)  編輯  收藏 所屬分類: 動態語言Python

    <2015年10月>
    27282930123
    45678910
    11121314151617
    18192021222324
    25262728293031
    1234567

    常用鏈接

    留言簿(4)

    隨筆分類

    隨筆檔案

    文章檔案

    新聞分類

    新聞檔案

    相冊

    收藏夾

    Java

    Linux知識相關

    Spring相關

    云計算/Linux/虛擬化技術/

    友情博客

    多線程并發編程

    開源技術

    持久層技術相關

    搜索

    •  

    積分與排名

    • 積分 - 358547
    • 排名 - 154

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 一道本在线免费视频| xxxxx免费视频| 中文字幕亚洲综合久久| 日本妇人成熟免费中文字幕| 野花视频在线官网免费1| 国产亚洲精AA在线观看SEE| 丁香花免费高清视频完整版| 欧洲乱码伦视频免费国产| 色拍自拍亚洲综合图区| 国产精品二区三区免费播放心| 国产成人无码免费看片软件| 亚洲一本之道高清乱码| 精品国产亚洲一区二区在线观看 | 国产成人无码综合亚洲日韩| 综合在线免费视频| 国产精品小视频免费无限app | 亚洲中文字幕乱码熟女在线| 亚洲精品无码久久久久去q | 99久在线国内在线播放免费观看| 亚洲精品第一国产综合亚AV| 亚洲AV无码国产精品麻豆天美| 免费国内精品久久久久影院| 亚洲视频免费播放| 成年女人A毛片免费视频| 亚洲综合一区无码精品| 亚洲国产日韩在线视频| 免费大片黄手机在线观看| 国产成人精品免费视频网页大全| 国产成人无码免费网站| 亚洲精品无码一区二区| 亚洲男女性高爱潮网站| 亚洲女久久久噜噜噜熟女| 国产美女无遮挡免费网站| 67pao强力打造高清免费| 日韩电影免费在线观看网站| 菠萝菠萝蜜在线免费视频| 亚洲国产综合精品中文第一| 亚洲精品一区二区三区四区乱码 | 亚洲日韩精品A∨片无码加勒比| 亚洲国产成人久久精品影视| 国产精品xxxx国产喷水亚洲国产精品无码久久一区 |