StorageWrapper
的作用:把文件片斷進一步切割為子片斷,并且為這些子片斷發送
request
消息。在獲得子片斷后,將數據寫入磁盤。
請結合
Storage
類的分析來看。
幾點說明:
1、
為了獲取傳輸性能,
BT
把文件片斷切割為多個子片斷。
2、
BT
為獲取一個子片斷,需要向擁有該子片斷的
peer
發送
request
消息(關于
request
消息,參見《
BT
協議規范》)。
3、
例如一個
256k
大小的片斷,索引號是
10
,被劃分為
16
個
16k
大小的子片斷。那么需要為這
16
個子片斷分別產生一個
request
消息。這些
request
消息在發出之前,以
list
的形式保存在
inactive_requests
這個
list
中。例如對這個片斷,就保存在
inactive_requests
下標為
10
(片斷的索引號)的地方,值是如下的
list
:
[(0,16k),(16k,
16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k),
(112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k,
16k), (208k, 16k), (224k, 16k), (240k, 16k)]
。這個處理過程在
_make_inactive()
函數中。因為這些
request
還沒有發送出去,所以叫做
inactive request
(未激活的請求)。如果一個
request
發送出去了,那么叫做
active request
。為每個片斷已經發送出去的
request
個數記錄在
numactive
中。如果收到一個子片斷,那么
active request
個數就要減
1
。
amount_inactive
記錄了尚沒有發出
request
的子片斷總的大小。
4、
每當獲得一個子片段,都要寫入磁盤。如果子片斷所屬的片斷在磁盤上還沒有分配空間,那么首先需要為整個片斷分配空間。如何為片斷分配空間?這正是
StorageWrapper
類中最難理解的一部分代碼。這個“空間分配算法”說起來很簡單,但是在沒有任何注釋的情況下去看代碼,耗費了我好幾天的時間。具體的算法分析,請看
_piece_came_in()
的注釋。
class
StorageWrapper:
def
__init__(self, storage, request_size, hashes,
piece_size, finished, failed,
statusfunc = dummy_status, flag = Event(), check_hashes = True,
data_flunked = dummy_data_flunked):
self.storage = storage # Storage
對象
self.request_size = request_size #
子片斷大小
self.hashes = hashes #
文件片斷摘要信息
self.piece_size = piece_size #
片斷大小
self.data_flunked = data_flunked #
一個函數,用來檢查片斷的完整性
self.total_length = storage.get_total_length() #
文件總大小
self.amount_left = self.total_length #
未下載完的文件大小
#
文件總大小的有效性檢查
#
因為最后一個片斷長度可能小于
piece_size
if self.total_length <= piece_size * (len(hashes) - 1):
raise Error, 'bad data from tracker - total too small'
if self.total_length > piece_size * len(hashes):
raise Error, 'bad data from tracker - total too big'
#
兩個事件,分布在下載完成和下載失敗的時候設置
self.finished = finished
self.failed = failed
這幾個變量的作用在前面已經介紹過了。
self.numactive = [0] * len(hashes)
inactive_request
inactive_requests
的值全部被初始化為
1
,這表示每個片斷都需要發送
request
。后面在對磁盤文件檢查之后,那些已經獲得的片斷,在
inactive_requests
中對應的是
None
,表示不需要再為這些片斷發送
request
了。
self.inactive_requests = [1] * len(hashes)
self.amount_inactive = self.total_length
#
是否進入
EndGame
模式?關于
endgame
模式,在《
Incentives Build Robustness in BitTorrent
》的“片斷選擇算法”中有介紹。后面可以看到,在為最后一個“子片斷”產生請求后,進入
endgame
模式。
self.endgame = False
self.have = Bitfield(len(hashes))
#
該片是否檢查了完整性
self.waschecked = [check_hashes] * len(hashes)
這兩個變量用于“空間分配算法”
self.places = { }
self.holes = [ ]
if len(hashes) == 0:
finished()
return
targets = {}
total = len(hashes)
#
檢查每一個片斷,,,
for i in xrange(len(hashes)):
#
如果磁盤上,還沒有完全為這個片斷分配空間,那么這個片斷需要被下載,在
targets
字典中添加一項(如果已經存在,就不用添加了),它的關鍵字(
key
)是該片斷的摘要值,它的值(
)是一個列表,
這個片斷的索引號被添加到這個列表中。
這里一度讓我非常迷惑,因為一直以為不同的文件片斷肯定具有不同的摘要值。后來才想明白了,那就是:兩個不同的文件片斷,可能擁有相同的摘要值。不是么?只要這兩個片斷的內容是一樣的。
這一點,對后面的分析非常重要。
if not
self._waspre(i):
targets.setdefault(hashes[i], []).append(i)
total -= 1
numchecked = 0.0
if total and check_hashes:
statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})
#
這是一個內嵌在函數中的函數。在
c++
中,可以有內部類,不過好像沒有內部函數的說法。這個函數只能在
__init__()
內部使用。
這個函數在一個片段被確認獲得后調用
# piece:
片斷的索引號
# pos:
這個片斷在磁盤上存儲的位置
例如,片斷
5
可能存儲在片斷
2
的位置上。請參看后面的“空間分配算法”
def
markgot(piece, pos, self = self, check_hashes = check_hashes):
self.places[piece] = pos
self.have[piece] = True
self.amount_left -= self._piecelen(piece)
self.amount_inactive -= self._piecelen(piece)
不用再為這個片斷發送
request
消息了
self.inactive_requests[piece] = None
self.waschecked[piece] = check_hashes
lastlen = self._piecelen(len(hashes) - 1) #
最后一個片斷的長度
#
對每一個片斷
for i in xrange(len(hashes)):
#
如果磁盤上,還沒有完全為這個片斷分配空間,那么在
holes
中添加該片斷的索引號。
if not
self._waspre(i):
self.holes.append(i)
#
否則,也就是空間已經分配。但是還是不能保證這個片斷已經完全獲得了,正如分析
Storage
時提到的那樣,可能存在“空洞”
#
如果不需要進行有效性檢查,那么簡單調用
markgot()
表示已經獲得了該片斷。這顯然是一種不負責任的做法。
elif not check_hashes:
markgot(i, i)
#
如果需要進行有效性檢查
else:
sha
是
python
內置的模塊,它封裝了
SHA-1
摘要算法。
SHA-1
摘要算法對一段任意長的數據進行計算,得出一個
160bit
(也就是
20
個字節)長的消息摘要。在
torrent
文件中,保存了每個片斷的消息摘要。接收方在收到一個文件片斷之后,再計算一次消息摘要,然后跟
torrent
文件中對應的值進行比較,如果結果不一致,那么說明數據在傳輸過程中發生了變化,這樣的數據應該被丟棄。
這里,首先,根據片斷
i
的起始位置開始,
lastlen
長的一段數據構造一個
sha
對象。
sh = sha(self.storage.read(piece_size * i, lastlen))
計算這段數據的消息摘要
sp = sh.digest()
然后,更新
sh
這個
sha
對象,注意,是根據片斷
i
剩下的數據來更新的。關于
sha::update()
的功能,請看
python
的幫助。如果有兩段數據
a
和
b
,那么
sh = sha(a)
sh.update(b)
,等效于
sh = sha(a+b)
所以,下面這個表達式等于
sh.update(self.storage.read(piece_size*i, self._piecelen(i)))
sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))
所以,這次計算出來的就是片斷
i
的摘要
(原來的困惑:為什么不直接計算
i
的摘要,要這么繞一下了?后來分析清楚“空間分配算法”之后,這后面一段代碼也就沒有什么問題了。)
s = sh.digest()
如果計算出來的摘要和
hashes[i]
一致(后者是從
torrent
文件中獲得的),那么,這個片斷有效且已經存在于磁盤上。
if s == hashes[i]:
markgot(i, i)
elif targets.get(s)
and
self._piecelen(i) == self._piecelen(targets[s][-1]):
markgot(targets[s].pop(), i)
elif not self.have[len(hashes) - 1]
and
sp == hashes[-1]
and
(i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):
markgot(len(hashes) - 1, i)
else:
self.places[i] = i
if flag.isSet():
return
numchecked += 1
statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})
#
如果所有片斷都下載完了,那么結束。
if self.amount_left == 0:
finished()
#
檢查某個片斷,是否已經在磁盤上分配了空間,調用的是
Storage:: was_preallocated()
def
_waspre(self, piece):
return self.storage.was_preallocated(piece * self.piece_size,
self._piecelen(piece))
#
獲取指定片斷的長度,只有最后一個片斷大小可能小于
piece_size
def
_piecelen(self, piece):
if piece < len(self.hashes) - 1:
return self.piece_size
else:
return
self.total_length - piece * self.piece_size
#
返回剩余文件的大小
def
get_amount_left(self):
return self.amount_left
#
判斷是否已經獲得了一些文件片斷
def
do_I_have_anything(self):
return self.amount_left < self.total_length
#
將指定片斷切割為“子片斷”
def
_make_inactive(self, index):
#
先獲取該片斷的長度
length = min(self.piece_size, self.total_length - self.piece_size * index)
l = []
x = 0
#
為了獲得更好的傳輸性能,
BT
把每個文件片斷又分為更小的“子片斷”,我們可以在
download.py
文件中
default
變量中,找到“子片斷”大小的定義:
'download_slice_size', 2 ** 14, "How many bytes to query for per request."
這里定義的“子片斷”大小是
16k
。
下面這個循環,就是將一個片斷進一步切割為“子片斷”的過程。
while x + self.request_size < length:
l.append((x, self.request_size))
x += self.request_size
l.append((x, length - x))
#
將
l
保存到
inactive_requests
這個列表中
self.inactive_requests[index] = l
#
是否處于
endgame
模式,關于
endgame
模式,參加《
Incentives Build Robustness in BitTorrent
》
def
is_endgame(self):
return self.endgame
def
get_have_list(self):
return self.have.tostring()
def
do_I_have(self, index):
return self.have[index]
#
判斷指定的片斷,是否還有
request
沒有發出?如果有,那么返回
true
,否則返回
false
。
def
do_I_have_requests(self, index):
return not not self.inactive_requests[index]
為指定片斷創建一個
request
消息,返回的是一個二元組,例如(
32k, 16k
),表示“子片斷”的起始位置是
32k
,大小是
16k
。
def
new_request(self, index):
# returns (begin, length)
#
如果還沒有為該片斷創建
request
。,那么調用
_make_inactive()
創建
request
列表。(
inactive_requests[index]
初始化的值是
1
)
if self.inactive_requests[index] == 1:
self._make_inactive(index)
# numactive[index]
記錄了已經為該片斷發出了多少個
request
。
self.numactive[index] += 1
rs = self.inactive_requests[index]
#
從
inactive_request
中移出最小的那個
request
(也就是起始位置最小)。
r = min(rs)
rs.remove(r)
# amount_inactive
記錄了尚沒有發出
request
的子片斷總的大小。
self.amount_inactive -= r[1]
#
如果這是最后一個“子片斷”,那么進入
endgame
模式
if self.amount_inactive == 0:
self.endgame = T.rue
#
返回這個
request
return r
def
piece_came_in(self, index, begin, piece):
try:
return self._piece_came_in(index, begin, piece)
except IOError, e:
self.failed('IO Error ' + str(e))
return True
如果獲得了某個“子片斷”,那么調用這個函數。
index
:“子片斷”所在片斷的索引號,
begin
:“子片斷”在片斷中的起始位置,
piece
:實際數據
def
_piece_came_in(self, index, begin, piece):
#
如果之前沒有獲得過該片斷中任何“子片斷”,那么首先需要在磁盤上為整個片斷分配空間。
空間分配的算法如下:
假設一共是
6
個片斷,現在已經為
0
、
1
、
4
三個片斷分配了空間,那么
holes
:
[2, 3, 5]
places
:
{0:0, 1:1, 4:4}
現在要為片斷
5
分配空間,思路是把片斷
5
的空間暫時先分配在片斷
2
應該在的空間上。這樣分配以后,
holes
:
[3, 5]
places: {0:0, 1:1, 4:4, 5:2}
假設下一步為片斷
2
分配空間,因為
2
的空間已經被
5
占用,所以把
5
的數據轉移到
3
上,
2
才可以使用自己的空間。這樣分配之后,
holes
:
[5]
places
:
{0:0, 1:1, 2:2, 4:4, 5:3}
最后,為
3
分配空間,因為
3
的空間被
5
占用,所以把
5
的數據轉移到
5
自己的空間上,
3
就可以使用自己的空間了。這樣分配之后,
holes
:
[]
places
:
{0:0, 1:1, 2:2, 3:3, 4:4, 5:5}
下面這段比較晦澀的代碼,實現的就是這種空間分配算法。
if not self.places.has_key(index):
n = self.holes.pop(0)
if self.places.has_key(n):
oldpos = self.places[n]
old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))
if self.have[n] and sha(old).digest() != self.hashes[n]:
self.failed('data corrupted on disk - maybe you have two copies running?')
return True
self.storage.write(self.piece_size * n, old)
self.places[n] = n
if index == oldpos or index in self.holes:
self.places[index] = oldpos
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = oldpos
old = self.storage.read(self.piece_size * index, self.piece_size)
self.storage.write(self.piece_size * oldpos, old)
elif
index in self.holes or index == n:
if not self._waspre(n):
self.storage.write(self.piece_size * n,
self._piecelen(n) * chr(0xFF))
self.places[index] = n
else:
for p, v in self.places.items():
if v == index:
break
self.places[index] = index
self.places[p] = n
old = self.storage.read(self.piece_size * index, self._piecelen(n))
self.storage.write(self.piece_size * n, old)
#
調用
Stoarge::write()
將這個子片斷寫入磁盤,注意是寫到
places[index]
所在的空間上。
self.storage.write(self.places[index] * self.piece_size + begin, piece)
#
既然獲得了一個子片斷,那么發出的
request
個數顯然要減少一個。
self.numactive[index] -= 1
#
如果既沒有尚未發出的
request
,而且也沒有已發出的
request
(每當獲得一個子片斷,
numactive[index]
減少
1
,
numactive[index]
為
0
,說明所有發出的
request
都已經接收到了響應的數據),那么顯然整個片斷已經全部獲得了。
if not
self.inactive_requests[index] and not self.numactive[index]:
檢查整個片斷的有效性,如果通過檢查
if
sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() == self.hashes[index]:
#
“我”已經擁有了這個片斷
self.have[index] = True
self.inactive_requests[index] = None
#
也檢查過了有效性
self.waschecked[index] = True
self.amount_left -= self._piecelen(index)
if self.amount_left == 0:
self.finished()
如果沒有通過有效性檢查
else:
self.data_flunked(self._piecelen(index))
得丟棄這個片斷
self.inactive_requests[index] = 1
self.amount_inactive += self._piecelen(index)
return False
return True
#
如果向某個
peer
發送的獲取“子片斷”的請求丟失了,那么調用此函數
def
request_lost(self, index, begin, length):
self.inactive_requests[index].append((begin, length))
self.amount_inactive += length
self.numactive[index] -= 1
def
get_piece(self, index, begin, length):
try:
return self._get_piece(index, begin, length)
except IOError, e:
self.failed('IO Error ' + str(e))
return None
def
_get_piece(self, index, begin, length):
if not self.have[index]:
return None
if not self.waschecked[index]:
#
檢查片斷的
hash
值,如果錯誤,返回
None
if
sha(self.storage.read(self.piece_size * self.places[index],
self._piecelen(index))).digest() != self.hashes[index]:
self.failed('told file complete on start-up, but piece failed hash check')
return None
#
通過
hash
檢查
self.waschecked[index] = True
#
檢查一下“子片斷”長度是否越界
if begin + length > self._piecelen(index):
return
None
#
調用
Storage::read()
,將該“子片斷”數據從磁盤上讀出來,返回值就是這段數據。
return self.storage.read(self.piece_size * self.places[index] + begin, length)
posted on 2007-01-19 00:21
苦笑枯 閱讀(325)
評論(0) 編輯 收藏 所屬分類:
P2P