概要
上一節我們分析了BT客戶端主程序的框架:一個循環的服務器程序。在這個循環過程中,客戶端要完成如下任務:
? 與 tracker 服? 務器通信;匯報自己的狀態,同? 時獲取其它 peers 的信息;
? 接受其它 peers 的連接請求;
? 主動向某些 peers 發起連接請求;
? 對BT對等協議的分析處理;
? 啟用片斷選擇算法,? 通過這些主動或被動的連接去下載所需要的片斷;
? 為其它 peers 提供片斷上傳;啟用阻塞算法,? 阻塞某些 peers 的上傳請求;
? 將下載的片斷寫入磁盤;
? 其它任務;
這一節我們分析BT客戶端與 tracker 服務器的通信過程。在整個下載過程中,客戶端必須不斷的與 tracker
通信,報告自己的狀態,同時也從 tracker 那里獲取當前參與下載的其它 peers 的信息(ip地址、端口等)。一旦不能與 tracker
通信,則下載過程無法繼續下去。
客戶端與 tracker 之間采用 HTTP 協議通信,客戶端把狀態信息放在
HTTP協議的GET命令的參數中傳遞給 tracker,tracker 則把 peers列表等信息放在 中傳遞給
客戶端。這種設計非常簡捷巧妙。采用HTTP協議也更使得穿越防火墻的阻攔更容易(不過實際應用中,tracker通常不會使用 80 端口)。
請務必參考服務器端源碼分析的幾篇相關文章(論壇中有)。
Rerequester 類全權負責與 tracker 的通信任務,我們首先看客戶端主程序 download.py中相應的代碼:
【download.py】
①rerequest = Rerequester(response['announce'], config['rerequest_interval'],
rawserver.add_task, connecter.how_many_connections,
config['min_peers'], encoder.start_connection,
rawserver.add_task, storagewrapper.get_amount_left,
upmeasure.get_total, downmeasure.get_total, listen_port,
config['ip'], myid, infohash, config['http_timeout'], errorfunc,
config['max_initiate'], doneflag, upmeasure.get_rate,
downmeasure.get_rate,
encoder.ever_got_incoming
②rerequest.begin()
Rerequester
類的構造函數中傳遞了一大堆參數,重點是 rawserver.add_task和
encoder.start_connection(rawserver和encoder
這兩個對象,在上一節中,我們已經看到了它們的構造過程),分別對應RawServer::add_task() 和
Encoder::start_connection(),稍后會看到它們的作用。
一切從 Rerequest::begin() 開始。
【rerequester.py】
class Rerequester:
# 這個構造函數傳遞了一大堆參數,現在沒必要搞的那么清楚,關鍵是 sched 和 connect 這兩個參數,對照前面的代碼,很容易知道它們是什么。
def __init__(self, url, interval, sched, howmany, minpeers,
connect, externalsched, amount_left, up, down,
port, ip, myid, infohash, timeout, errorfunc, maxpeers, doneflag,
upratefunc, downratefunc, ever_got_incoming):
self.url = ('%s?info_hash=%s&peer_id=%s&port=%s&key=%s' %
(url, quote(infohash), quote(myid), str(port),
b2a_hex(''.join([chr(randrange(256)) for i in xrange(4)]))))
if ip != '':
self.url += '&ip=' + quote(ip)
self.interval = interval
self.last = None
self.trackerid = None
self.announce_interval = 30 * 60
self.sched = sched # -》RawServer::add_task()
self.howmany = howmany # 當前有多少個連接 peer
self.minpeers = minpeers
self.connect = connect #-》 Encoder::start_connection()
self.externalsched = externalsched
self.amount_left = amount_left
self.up = up
self.down = down
self.timeout = timeout
self.errorfunc = errorfunc
self.maxpeers = maxpeers
self.doneflag = doneflag
self.upratefunc = upratefunc
self.downratefunc = downratefunc
self.ever_got_incoming = ever_got_incoming
self.last_failed = True
self.last_time = 0
④ def c(self):
# 再調用一次 add_task(),把自身加入到客戶端的任務隊列中,這樣就形成了任務循環,每隔一段時間,就會執行這個函數。從而,客戶端與 tracker 的通信能夠持續的進行下去。
self.sched(self.c, self.interval)
if self.ever_got_incoming():
# 如果曾經接受到過外來的連接,那么說明這個客戶端是可以接受外來連接的,也就是說很可能處于“公網”之中,所以并非很迫切的需要從 tracker
那里獲取 peers 列表。(既可以主動連接別人,也可以接受別人的連接,所以有點不緊不慢的味道)。
getmore = self.howmany() <= self.minpeers / 3
else:
# 如果從來沒有接受到過外來的連接,那么很有可能處于內網之中,根本無法接受外來的連接。這種情況下,只有主動的去連接外部的 peers,所以要盡可能的向 tracker 請求更多的 peers。(只能去連接別人,所以更迫切一些)。
getmore = self.howmany() < self.minpeers
if getmore or time() - self.last_time > self.announce_interval:
# 如果有必要從 tracker 那里獲取 peers 列表,且任務執行時間已到,則調用 announce() 開始與 tracker 通信
self.announce()
③ def begin(self):
# 調用前面提到的 RawServer::add_task(),把 Rerequester::c() 加入到 rawserver對象的任務隊列中
self.sched(self.c, self.interval)
# 宣布與 tracker 的通信開始。。。。
self.announce(0)
⑤ def announce(self, event = None):
self.last_time = time()
# 傳遞給 tracker 的信息,放在 HTTP GET 命令的參數中。所以,首先就是構造這個參數。
# 傳遞給 tracker 的信息,必須包括 uploaded、downloaded、left 信息。其它入 last、trackerid、numwant、compact、event 是可選的。這些參數的解釋請看 BT協議規范。
s = ('%s&uploaded=%s&downloaded=%s&left=%s' %
(self.url, str(self.up()), str(self.down()),
str(self.amount_left())))
if self.last is not None:
s += '&last=' + quote(str(self.last))
if self.trackerid is not None:
s += '&trackerid=' + quote(str(self.trackerid))
if self.howmany() >= self.maxpeers:
s += '&numwant=0'
else:
s += '&compact=1'
if event != None:
s += '&event=' + ['started', 'completed', 'stopped'][event]
set = SetOnce().set
# 函數中可以嵌套定義函數,這是 python 語法特別的地方。
# 調用 RawServer::add_task() 把 checkfail() 加入到任務隊列中。在 timeout 時間之后,檢查向 tracker 發的GET命令是否失敗。
def checkfail(self = self, set = set):
if set():
if self.last_failed and self.upratefunc() < 100 and self.downratefunc() < 100:
self.errorfunc('Problem connecting to tracker - timeout exceeded')
self.last_failed = True
self.sched(checkfail, self.timeout)
# 創建一個線程,執行 Rerequester::rerequest()
# 可見,客戶端與 tracker之間的通信是由單獨的線程完成的
Thread(target = self.rerequest, args = [s, set]).start()
⑥ def rerequest(self, url, set):
try:
# 調用 urlopen() ,向 tracker 發送命令
h = urlopen(url)
# read() 返回 tracker 的響應數據。
r = h.read()
h.close()
if set():
def add(self = self, r = r):
self.last_failed = False
# 從 tracker 響應回來的數據,由 postrequest() 處理。
self.postrequest(r)
# externalsched() 同樣是調用 RawServer::add_task(),這樣,由主線程調用 postrequest() 函數,處理 tracker 響應的數據。(不要忘記,我們現在是處在一個單獨的子線程中。)
self.externalsched(add, 0)
except (IOError, error), e:
if set():
def fail(self = self, r = 'Problem connecting to tracker - ' + str(e)):
if self.last_failed:
self.errorfunc(r)
self.last_failed = True
self.externalsched(fail, 0)
⑦ def postrequest(self, data):
try:
# 對服務器響應的數據解碼。請參看BT協議規范。
r = bdecode(data)
# 檢查 peers 有效性?
check_peers(r)
if r.has_key('failure reason'):
self.errorfunc('rejected by tracker - ' + r['failure reason'])
else:
if r.has_key('warning message'):
self.errorfunc('warning from tracker - ' + r['warning message'])
# 根據 tracker 的響應,調整BT客戶端的參數。
self.announce_interval = r.get('interval', self.announce_interval)
self.interval = r.get('min interval', self.interval)
self.trackerid = r.get('tracker id', self.trackerid)
self.last = r.get('last')
# 將其它下載者的信息保存到 peers 數組中。
p = r['peers']
peers = []
if type(p) == type(''):
for x in xrange(0, len(p), 6):
ip = '.'.join([str(ord(i)) for i in p[x:x+4]])
port = (ord(p[x+4]) << 8) | ord(p[x+5])
peers.append((ip, port, None))
else:
for x in p:
peers.append((x['ip'], x['port'], x.get('peer id')))
ps = len(peers) + self.howmany()
if ps < self.maxpeers:
if self.doneflag.isSet():
if r.get('num peers', 1000) - r.get('done peers', 0) > ps * 1.2:
self.last = None
else:
if r.get('num peers', 1000) > ps * 1.2:
self.last = None
# 這里的 connect,即前面的Encoder::start_connection(),
# 至此,已經成功的完成了一次與 tracker 服務器的通信,并且從它那里獲取了 peers 列表;下面就與這些 peers 挨個建立連接;至于建立連接的過程,就要追蹤到 Encoder 類中了,且聽下回分解。
for x in peers:
# x[0]:ip, x[1]:port, x[2]:id
self.connect((x[0], x[1]), x[2])
except Error, e:
if data != '':
self.errorfunc('bad data from tracker - ' + str(e))
小結:
這篇文章,我們重點分析了BT客戶端與 tracker 服務器通信的過程。我們知道了,客戶端要每隔一段時間,就去連接 tracker
一次,它是以一個單獨的線程執行的。這個線程在接受到 tracker 的響應數據后,交給主線程(既主程序)來進行分析。主程序從響應數據中,獲得
peers 列表。然后調用 Encoder::start_connection() 挨個與這些 peers 嘗試建立連接。
posted on 2007-01-19 00:23
苦笑枯 閱讀(529)
評論(0) 編輯 收藏 所屬分類:
P2P