<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 56,  comments - 12,  trackbacks - 0

    概要
     上一節我們分析了BT客戶端主程序的框架:一個循環的服務器程序。在這個循環過程中,客戶端要完成如下任務:
    ? 與 tracker 服? 務器通信;匯報自己的狀態,同? 時獲取其它 peers 的信息;
    ? 接受其它 peers 的連接請求;
    ? 主動向某些 peers 發起連接請求;
    ? 對BT對等協議的分析處理;
    ? 啟用片斷選擇算法,? 通過這些主動或被動的連接去下載所需要的片斷;
    ? 為其它 peers 提供片斷上傳;啟用阻塞算法,? 阻塞某些 peers 的上傳請求;
    ? 將下載的片斷寫入磁盤;
    ? 其它任務;

      這一節我們分析BT客戶端與 tracker 服務器的通信過程。在整個下載過程中,客戶端必須不斷的與 tracker 通信,報告自己的狀態,同時也從 tracker 那里獲取當前參與下載的其它 peers 的信息(ip地址、端口等)。一旦不能與 tracker 通信,則下載過程無法繼續下去。
     客戶端與 tracker 之間采用 HTTP 協議通信,客戶端把狀態信息放在 HTTP協議的GET命令的參數中傳遞給 tracker,tracker 則把 peers列表等信息放在 中傳遞給 客戶端。這種設計非常簡捷巧妙。采用HTTP協議也更使得穿越防火墻的阻攔更容易(不過實際應用中,tracker通常不會使用 80 端口)。
     請務必參考服務器端源碼分析的幾篇相關文章(論壇中有)。
     Rerequester 類全權負責與 tracker 的通信任務,我們首先看客戶端主程序 download.py中相應的代碼:

    【download.py】
    ①rerequest = Rerequester(response['announce'], config['rerequest_interval'],
            rawserver.add_task, connecter.how_many_connections,
            config['min_peers'], encoder.start_connection,
            rawserver.add_task, storagewrapper.get_amount_left,
            upmeasure.get_total, downmeasure.get_total, listen_port,
            config['ip'], myid, infohash, config['http_timeout'], errorfunc,
            config['max_initiate'], doneflag, upmeasure.get_rate,
    downmeasure.get_rate,
            encoder.ever_got_incoming
    ②rerequest.begin()

    Rerequester 類的構造函數中傳遞了一大堆參數,重點是 rawserver.add_task和 encoder.start_connection(rawserver和encoder 這兩個對象,在上一節中,我們已經看到了它們的構造過程),分別對應RawServer::add_task() 和 Encoder::start_connection(),稍后會看到它們的作用。
    一切從 Rerequest::begin() 開始。

    【rerequester.py】

    class Rerequester:
     # 這個構造函數傳遞了一大堆參數,現在沒必要搞的那么清楚,關鍵是 sched 和 connect 這兩個參數,對照前面的代碼,很容易知道它們是什么。
        def __init__(self, url, interval, sched, howmany, minpeers,
                connect, externalsched, amount_left, up, down,
                port, ip, myid, infohash, timeout, errorfunc, maxpeers, doneflag,
                upratefunc, downratefunc, ever_got_incoming):
            self.url = ('%s?info_hash=%s&peer_id=%s&port=%s&key=%s' %
                (url, quote(infohash), quote(myid), str(port),
                b2a_hex(''.join([chr(randrange(256)) for i in xrange(4)]))))
            if ip != '':
                self.url += '&ip=' + quote(ip)
            self.interval = interval
            self.last = None
            self.trackerid = None
            self.announce_interval = 30 * 60
            self.sched = sched # -》RawServer::add_task()
            self.howmany = howmany # 當前有多少個連接 peer
            self.minpeers = minpeers
            self.connect = connect #-》 Encoder::start_connection()
            self.externalsched = externalsched
            self.amount_left = amount_left
            self.up = up
            self.down = down
            self.timeout = timeout
            self.errorfunc = errorfunc
            self.maxpeers = maxpeers
            self.doneflag = doneflag
            self.upratefunc = upratefunc
            self.downratefunc = downratefunc
            self.ever_got_incoming = ever_got_incoming
            self.last_failed = True
            self.last_time = 0

    ④ def c(self):
      # 再調用一次 add_task(),把自身加入到客戶端的任務隊列中,這樣就形成了任務循環,每隔一段時間,就會執行這個函數。從而,客戶端與 tracker 的通信能夠持續的進行下去。
            self.sched(self.c, self.interval)
           
            if self.ever_got_incoming():
                # 如果曾經接受到過外來的連接,那么說明這個客戶端是可以接受外來連接的,也就是說很可能處于“公網”之中,所以并非很迫切的需要從 tracker 那里獲取 peers 列表。(既可以主動連接別人,也可以接受別人的連接,所以有點不緊不慢的味道)。
                getmore = self.howmany() <= self.minpeers / 3
            else:
                # 如果從來沒有接受到過外來的連接,那么很有可能處于內網之中,根本無法接受外來的連接。這種情況下,只有主動的去連接外部的 peers,所以要盡可能的向 tracker 請求更多的 peers。(只能去連接別人,所以更迫切一些)。
                getmore = self.howmany() < self.minpeers
            if getmore or time() - self.last_time > self.announce_interval:
                # 如果有必要從 tracker 那里獲取 peers 列表,且任務執行時間已到,則調用 announce() 開始與 tracker 通信
                self.announce()

    ③    def begin(self):
            # 調用前面提到的 RawServer::add_task(),把 Rerequester::c() 加入到 rawserver對象的任務隊列中
            self.sched(self.c, self.interval)
      # 宣布與 tracker 的通信開始。。。。
            self.announce(0)

    ⑤    def announce(self, event = None):
            self.last_time = time()
      # 傳遞給 tracker 的信息,放在 HTTP GET 命令的參數中。所以,首先就是構造這個參數。
      # 傳遞給 tracker 的信息,必須包括 uploaded、downloaded、left 信息。其它入 last、trackerid、numwant、compact、event 是可選的。這些參數的解釋請看 BT協議規范。
            s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
                (self.url, str(self.up()), str(self.down()),
                str(self.amount_left())))
            if self.last is not None:
                s += '&last=' + quote(str(self.last))
            if self.trackerid is not None:
                s += '&trackerid=' + quote(str(self.trackerid))
            if self.howmany() >= self.maxpeers:
                s += '&numwant=0'
            else:
                s += '&compact=1'
            if event != None:
                s += '&event=' + ['started', 'completed', 'stopped'][event]
            set = SetOnce().set

      # 函數中可以嵌套定義函數,這是 python 語法特別的地方。
      # 調用 RawServer::add_task() 把 checkfail() 加入到任務隊列中。在 timeout 時間之后,檢查向 tracker 發的GET命令是否失敗。
            def checkfail(self = self, set = set):
                if set():
                    if self.last_failed and self.upratefunc() < 100 and self.downratefunc() < 100:
                        self.errorfunc('Problem connecting to tracker - timeout exceeded')
                    self.last_failed = True
            self.sched(checkfail, self.timeout)

            # 創建一個線程,執行 Rerequester::rerequest()
      # 可見,客戶端與 tracker之間的通信是由單獨的線程完成的
            Thread(target = self.rerequest, args = [s, set]).start()

    ⑥    def rerequest(self, url, set):
            try:
       # 調用 urlopen() ,向 tracker 發送命令
    h = urlopen(url)
    # read() 返回 tracker 的響應數據。
                r = h.read()
                h.close()
                if set():
                    def add(self = self, r = r):
    self.last_failed = False
    # 從 tracker 響應回來的數據,由 postrequest() 處理。
    self.postrequest(r)
    # externalsched() 同樣是調用 RawServer::add_task(),這樣,由主線程調用 postrequest() 函數,處理 tracker 響應的數據。(不要忘記,我們現在是處在一個單獨的子線程中。)
                    self.externalsched(add, 0)
            except (IOError, error), e:
                if set():
                    def fail(self = self, r = 'Problem connecting to tracker - ' + str(e)):
                        if self.last_failed:
                            self.errorfunc(r)
                        self.last_failed = True
                    self.externalsched(fail, 0)

    ⑦    def postrequest(self, data):
            try:
       # 對服務器響應的數據解碼。請參看BT協議規范。
    r = bdecode(data)
    # 檢查 peers 有效性?
                check_peers(r)
                if r.has_key('failure reason'):
                    self.errorfunc('rejected by tracker - ' + r['failure reason'])
                else:
                    if r.has_key('warning message'):
    self.errorfunc('warning from tracker - ' + r['warning message'])

     # 根據 tracker 的響應,調整BT客戶端的參數。
                    self.announce_interval = r.get('interval', self.announce_interval)
                    self.interval = r.get('min interval', self.interval)
                    self.trackerid = r.get('tracker id', self.trackerid)
                    self.last = r.get('last')

        # 將其它下載者的信息保存到 peers 數組中。
                    p = r['peers']
                    peers = []
                    if type(p) == type(''):
                        for x in xrange(0, len(p), 6):
                            ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
                            port = (ord(p[x+4]) << 8) | ord(p[x+5])
                            peers.append((ip, port, None))
                    else:
                        for x in p:
                            peers.append((x['ip'], x['port'], x.get('peer id')))
                    ps = len(peers) + self.howmany()
                    if ps < self.maxpeers:
                        if self.doneflag.isSet():
                            if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
                                self.last = None
                        else:
                            if r.get('num peers', 1000) > ps * 1.2:
                                self.last = None
                    # 這里的 connect,即前面的Encoder::start_connection(),
        # 至此,已經成功的完成了一次與 tracker 服務器的通信,并且從它那里獲取了 peers 列表;下面就與這些 peers 挨個建立連接;至于建立連接的過程,就要追蹤到 Encoder 類中了,且聽下回分解。
                    for x in peers:
                        # x[0]:ip, x[1]:port, x[2]:id
                        self.connect((x[0], x[1]), x[2])
            except Error, e:
                if data != '':
                    self.errorfunc('bad data from tracker - ' + str(e))


    小結:
      這篇文章,我們重點分析了BT客戶端與 tracker 服務器通信的過程。我們知道了,客戶端要每隔一段時間,就去連接 tracker 一次,它是以一個單獨的線程執行的。這個線程在接受到 tracker 的響應數據后,交給主線程(既主程序)來進行分析。主程序從響應數據中,獲得 peers 列表。然后調用 Encoder::start_connection() 挨個與這些 peers 嘗試建立連接。

    posted on 2007-01-19 00:23 苦笑枯 閱讀(529) 評論(0)  編輯  收藏 所屬分類: P2P
    收藏來自互聯網,僅供學習。若有侵權,請與我聯系!

    <2007年1月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿(2)

    隨筆分類(56)

    隨筆檔案(56)

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 免费的一级片网站| 成人无码区免费视频观看| 亚洲综合国产精品| 日韩免费高清视频| 亚洲视频免费在线播放| eeuss草民免费| 看Aⅴ免费毛片手机播放| 亚洲综合色一区二区三区小说| 亚洲人成色7777在线观看不卡| 2019中文字幕免费电影在线播放 | 国产福利电影一区二区三区,免费久久久久久久精 | 亚洲色成人四虎在线观看| 中文亚洲AV片不卡在线观看| 大学生高清一级毛片免费| 99re在线精品视频免费| 久久中文字幕免费视频| 日日摸日日碰夜夜爽亚洲| 亚洲三级视频在线观看| 亚洲人成网址在线观看| 国产AV无码专区亚洲AV男同 | 粉色视频免费入口| 国产青草亚洲香蕉精品久久 | 成人免费看吃奶视频网站| 在线观看免费人成视频色| 无码人妻一区二区三区免费| 97热久久免费频精品99| 啦啦啦在线免费视频| 国产成人aaa在线视频免费观看| 青青青国产免费一夜七次郎| 又粗又黄又猛又爽大片免费 | 午夜精品一区二区三区免费视频| 久久久久久AV无码免费网站下载| 5g影院5g天天爽永久免费影院| 在免费jizzjizz在线播| 国产91久久久久久久免费| 在线亚洲午夜理论AV大片| 久久亚洲国产精品成人AV秋霞| 国产精品亚洲片在线va| www成人免费视频| 91精品免费国产高清在线| 亚洲区日韩区无码区|