Encoder 是一種 Handler 類(關于 Handler類,請參看前面的分析文章)。它在 download.py 中被初始化。它與 Connection類一起,完成“BT對等連接”的建立,以及“BT對等協議”的分析。
為了有助于理解,我添加了一些用圓圈括起來的序號,建議你按照這個順序去閱讀。
class Connection:
②def __init__(self, Encoder, connection, id, is_local):
self.encoder = Encoder
self.connection = connection
如果是本地發起連接,那么 id 是對方的 id,否則 id 為 None
self.id = id
如果連接是由本地發起的,那么 is_local 為 True,否則為 False
self.locally_initiated = is_local
self.complete = False
self.closed = False
self.buffer = StringIO()
self.next_len = 1
self.next_func = self.read_header_len
如果連接是由本地主動發起建立的,那么需要向對方發送一個握手消息。(如果不是由本地主動發起的,那么就是被動建立的,那么不能在這里發送握手消息,而必須在分析完對方的握手消息之后,再去回應一個握手西消息,請看read_download_id() 中的處理。
if self.locally_initiated:
connection.write(chr(len(protocol_name)) + protocol_name +
(chr(0) * 8) + self.encoder.download_id)
if self.id is not None:
connection.write(self.encoder.my_id)
def get_ip(self):
return self.connection.get_ip()
def get_id(self):
return self.id
def is_locally_initiated(self):
return self.locally_initiated
def is_flushed(self):
return self.connection.is_flushed()
⑦def read_header_len(self, s):
if ord(s) != len(protocol_name):
return None
return len(protocol_name), self.read_header
def read_header(self, s):
if s != protocol_name:
return None
return 8, self.read_reserved
def read_reserved(self, s):
return 20, self.read_download_id
def read_download_id(self, s):
if s != self.encoder.download_id:
return None
這一步很重要, 如果連接是由對方發起的,那么,給對方發送一個握手消息。為什么不在讀完了 peer id 之后才發送這個消息了?這是因為 peer id 是可選的,所以只要分析完 download id 之后,就要立即發送握手消息。
if not self.locally_initiated:
self.connection.write(chr(len(protocol_name)) +
protocol_name +
(chr(0) * 8) +
self.encoder.download_id + self.encoder.my_id)
return 20, self.read_peer_id
def read_peer_id(self, s):
if not self.id:
如果 peer id 是自己,那么出錯了
if s == self.encoder.my_id:
return None
for v in self.encoder.connections.s():
如果已經跟該 peer 建立了連接了,那么也出錯了
if s and v.id == s:
return None
self.id = s
if self.locally_initiated:
self.connection.write(self.encoder.my_id)
else:
self.encoder.everinc = True
else:
如果 peer id 和 xxx 不符,那么出錯了。
if s != self.id:
return None
“BT對等連接”的握手過程正式宣告完成,此后,雙方就可以通過這個連接互相發送消息了。
self.complete = True
調用Connecter::connection_made(),這個函數的意義,我們到分析 Connecter 類的時候,再記得分析。
self.encoder.connecter.connection_made(self)
下面進入 BT 消息的處理過程。
return 4, self.read_len
def read_len(self, s):
l = toint(s)
if l > self.encoder.max_len:
return None
return l, self.read_message
消息處理,交給了 Connecter::got_message(),所以下一篇我們要分析 Connecter 類。
def read_message(self, s):
if s != '':
self.encoder.connecter.got_message(self, s)
return 4, self.read_len
def read_dead(self, s):
return None
def close(self):
if not self.closed:
self.connection.close()
self.sever()
def sever(self):
self.closed = True
del self.encoder.connections[self.connection]
if self.complete:
self.encoder.connecter.connection_lost(self)
def send_message(self, message):
self.connection.write(tobinary(len(message)) + message)
⑤在 Encoder::data_came_in() 中調用下面這個函數,表示某個連接上有數據可讀。如果有數據可讀,那么我們就按照 BT 對等協議的規范來進行分析。。。
def data_came_in(self, s):
進入協議分析循環。。。
while True:
if self.closed:
return
self.next_len表示按照BT對等協議規范,下一段要分析的數據的長度
self.buffer.tell() 表示緩沖區中剩下數據的長度
那么 i 就表示:為了完成接下來的協議分析,還需要多少數據?
i = self.next_len - self.buffer.tell()
如果 i 大于所讀到的數據的長度,那表示數據還沒有讀夠,無法繼續協議分析,需要等讀到足夠多的數據才能繼續,所以只能退出。
if i > len(s):
self.buffer.write(s)
return
否則表示這次讀到的數據已經足夠完成一步協議分析。
只取滿足這一步協議分析的數據放入 buffer 中(因為 buffer中可能還有上一步協議分析后留下的一些數據,要加在一起),剩下的數據保留在 s 中。
self.buffer.write(s[:i])
s = s[i:]
從 buffer 中取出數據,這些數據就是這一步協議分析所需要的數據。然后把 buffer 清空。
m = self.buffer.get()
self.buffer.reset()
self.buffer.truncate()
next_func 就是用于這一步協議分析的函數。
返回的 x 是一個二元組,包含了下一步協議分析的數據長度和協議分析函數。這樣,就形成了一個協議分析循環。
try:
x = self.next_func(m)
except:
self.next_len, self.next_func = 1, self.read_dead
raise
if x is None:
self.close()
return
從 x 中分解出 next_len和 next_func。
self.next_len, self.next_func = x
⑥那么BT對等協議分析的第一步是什么了?
請看初始化函數:
self.next_len = 1
self.next_func = self.read_header_len
顯然,第一步協議分析是由 read_header_len() 來完成的。
在BT源碼中,有多處采用了這種協議分析的處理方式。
class Encoder:
def __init__(self, connecter, raw_server, my_id, max_len,
schedulefunc, keepalive_delay, download_id,
max_initiate = 40):
self.raw_server = raw_server
self.connecter = connecter
self.my_id = my_id
self.max_len = max_len
self.schedulefunc = schedulefunc
self.keepalive_delay = keepalive_delay
self.download_id = download_id
最大發起的連接數
self.max_initiate = max_initiate
self.everinc = False
self.connections = {}
self.spares = []
schedulefunc(self.send_keepalives, keepalive_delay)
為了保持連接不因為超時而被關閉,所以可能需要隨機的發送一些空消息,它的目的純粹是為了保證連接的“活力”
def send_keepalives(self):
self.schedulefunc(self.send_keepalives, self.keepalive_delay)
for c in self.connections.s():
if c.complete:
c.send_message('')
③主動向對方發起一個連接,這個函數什么時候調用?
請看 download.py 中 Rerequester 類的初始化函數,其中傳遞的一個參數是 encoder.start_connection。
再看 Rerequester.py 中,Rerequester::postrequest() 的最后,
for x in peers:
self.connect((x[0], x[1]), x[2])
這里調用的 connect() 就是初始化的時候傳遞進來的 encoder.start_connection,也就是下面這個函數了。
也就是說,當客戶端從 tracker 服務器那里獲取了 peers 列表之后,就逐一向這些 peers 主動發起連接。
def start_connection(self, dns, id):
if id:
跟自己不用建立連接。
if id == self.my_id:
return
如果已經與對方建立起連接,也不再建立連接
for v in self.connections.s():
if v.id == id:
return
如果當前連接數,已經超過設定的“最大發起連接數”,那么就暫時不建立連接。
if len(self.connections) >= self.max_initiate:
如果空閑連接數還小于 “最大發起連接數”,那么把對方的 ip 先放到spares中,等以后網絡稍微空閑一點的時候,再從 spares 中取出來,實際去建立連接。
if len(self.spares) < self.max_initiate and dns not in self.spares:
self.spares.append(dns)
return
try:
調用 RawServer::start_connection 與對方建立TCP連接
c = self.raw_server.start_connection(dns)
創建 Connection 對象,加入到 connections 字典中,注意,最后一個參數是 True,表示是這個連接是由本地主動發起的。這樣,在 Connection 的初始化函數中,會與對方進行 BT 對等協議的握手。
self.connections[c] = Connection(self, c, id, True)
except socketerror:
pass
這個內部函數好像沒有用到
def _start_connection(self, dns, id):
def foo(self=self, dns=dns, id=id):
self.start_connection(dns, id)
self.schedulefunc(foo, 0)
def got_id(self, connection):
for v in self.connections.s():
if connection is not v and connection.id == v.id:
connection.close()
return
self.connecter.connection_made(connection)
def ever_got_incoming(self):
return self.everinc
①在 RawServer 中,當從外部發起的一個TCP成功建立后,調用此函數。
這里傳遞進來的參數 connection 是 SingleSocket 類型
def external_connection_made(self, connection):
創建一個 Connection 對象,加入到 connections 字典中。
self.connections[connection] = Connection(self, connection, None, False)
def connection_flushed(self, connection):
c = self.connections[connection]
if c.complete:
self.connecter.connection_flushed(c)
關閉連接的時候調用此函數
def connection_lost(self, connection):
self.connections[connection].sever()
關閉一個連接之后,連接數量可能就沒有達到“最大連接數”,所以如果 spares 中有一些等待建立的 ip ,現在可以取出來,主動向對方發起連接。
while len(self.connections) < self.max_initiate and self.spares:
self.start_connection(self.spares.pop(), None)
④某個連接上(無論該連接上主動建立還是被動建立的)有數據可讀的時候,調用此函數。在 RawServer 中被調用。轉而去調 Connection::data_came_in()。
def data_came_in(self, connection, data):
self.connections[connection].data_came_in(data)
posted on 2007-01-19 00:22
苦笑枯 閱讀(353)
評論(0) 編輯 收藏 所屬分類:
P2P