<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 56,  comments - 12,  trackbacks - 0

    StorageWrapper 的作用:把文件片斷進一步切割為子片斷,并且為這些子片斷發送 request 消息。在獲得子片斷后,將數據寫入磁盤。

    請結合 Storage 類的分析來看。

     

    幾點說明:

    1、  為了獲取傳輸性能, BT 把文件片斷切割為多個子片斷。

    2、  BT 為獲取一個子片斷,需要向擁有該子片斷的 peer 發送 request 消息(關于 request 消息,參見《 BT 協議規范》)。

    3、 例如一個 256k 大小的片斷,索引號是 10 ,被劃分為 16 16k 大小的子片斷。那么需要為這 16 個子片斷分別產生一個 request 消息。這些 request 消息在發出之前,以 list 的形式保存在 inactive_requests 這個 list 中。例如對這個片斷,就保存在 inactive_requests 下標為 10 (片斷的索引號)的地方,值是如下的 list [(0,16k),(16k, 16k), (32k, 16k), (48k, 16k), (64k, 16k), (80k, 16k), (96k, 16k), (112k, 16k), (128k, 16k), (144k, 16k), (160k, 16k), (176k, 16k), (192k, 16k), (208k, 16k), (224k, 16k), (240k, 16k)] 。這個處理過程在 _make_inactive() 函數中。因為這些 request 還沒有發送出去,所以叫做 inactive request (未激活的請求)。如果一個 request 發送出去了,那么叫做 active request 。為每個片斷已經發送出去的 request 個數記錄在 numactive 中。如果收到一個子片斷,那么 active request 個數就要減 1 。 amount_inactive 記錄了尚沒有發出 request 的子片斷總的大小。

    4、 每當獲得一個子片段,都要寫入磁盤。如果子片斷所屬的片斷在磁盤上還沒有分配空間,那么首先需要為整個片斷分配空間。如何為片斷分配空間?這正是 StorageWrapper 類中最難理解的一部分代碼。這個“空間分配算法”說起來很簡單,但是在沒有任何注釋的情況下去看代碼,耗費了我好幾天的時間。具體的算法分析,請看 _piece_came_in() 的注釋。

     

     

    class StorageWrapper:

    def __init__(self, storage, request_size, hashes,

                piece_size, finished, failed,

                statusfunc = dummy_status, flag = Event(), check_hashes = True,

    data_flunked = dummy_data_flunked):

     

            self.storage = storage              # Storage 對象

            self.request_size = request_size # 子片斷大小

            self.hashes = hashes        # 文件片斷摘要信息

            self.piece_size = piece_size  # 片斷大小

            self.data_flunked = data_flunked        # 一個函數,用來檢查片斷的完整性

            self.total_length = storage.get_total_length()         # 文件總大小

            self.amount_left = self.total_length        # 未下載完的文件大小

           

                  # 文件總大小的有效性檢查

                  # 因為最后一個片斷長度可能小于 piece_size

            if self.total_length <= piece_size * (len(hashes) - 1):

                raise Error, 'bad data from tracker - total too small'

            if self.total_length > piece_size * len(hashes):

                raise Error, 'bad data from tracker - total too big'

     

            # 兩個事件,分布在下載完成和下載失敗的時候設置

            self.finished = finished

            self.failed = failed

            

                 

    這幾個變量的作用在前面已經介紹過了。

            self.numactive = [0] * len(hashes)

                  inactive_request

    inactive_requests 的值全部被初始化為 1 ,這表示每個片斷都需要發送 request 。后面在對磁盤文件檢查之后,那些已經獲得的片斷,在 inactive_requests 中對應的是 None ,表示不需要再為這些片斷發送 request 了。

            self.inactive_requests = [1] * len(hashes)

            self.amount_inactive = self.total_length

     

    # 是否進入 EndGame 模式?關于 endgame 模式,在《 Incentives Build Robustness in BitTorrent 》的“片斷選擇算法”中有介紹。后面可以看到,在為最后一個“子片斷”產生請求后,進入 endgame 模式。

            self.endgame = False 

           

            self.have = Bitfield(len(hashes))

            # 該片是否檢查了完整性

            self.waschecked = [check_hashes] * len(hashes)

           

    這兩個變量用于“空間分配算法”

            self.places = { }

            self.holes = [ ]

     

            if len(hashes) == 0:

                finished()

                return

     

            targets = {}

            total = len(hashes)

     

    # 檢查每一個片斷,,,

            for i in xrange(len(hashes)):

     

    # 如果磁盤上,還沒有完全為這個片斷分配空間,那么這個片斷需要被下載,在 targets 字典中添加一項(如果已經存在,就不用添加了),它的關鍵字( key )是該片斷的摘要值,它的值( )是一個列表, 這個片斷的索引號被添加到這個列表中。

    這里一度讓我非常迷惑,因為一直以為不同的文件片斷肯定具有不同的摘要值。后來才想明白了,那就是:兩個不同的文件片斷,可能擁有相同的摘要值。不是么?只要這兩個片斷的內容是一樣的。

    這一點,對后面的分析非常重要。

     

    if not self._waspre(i):

                         targets.setdefault(hashes[i], []).append(i)

                    total -= 1

     

            numchecked = 0.0

            if total and check_hashes:

                statusfunc({"activity" : 'checking existing file', "fractionDone" : 0})

     

     

     

    # 這是一個內嵌在函數中的函數。在 c++ 中,可以有內部類,不過好像沒有內部函數的說法。這個函數只能在 __init__() 內部使用。

    這個函數在一個片段被確認獲得后調用

    # piece: 片斷的索引號

    # pos: 這個片斷在磁盤上存儲的位置

    例如,片斷 5 可能存儲在片斷 2 的位置上。請參看后面的“空間分配算法”

     

            def markgot(piece, pos, self = self, check_hashes = check_hashes):

                self.places[piece] = pos

                self.have[piece] = True

                self.amount_left -= self._piecelen(piece)

    self.amount_inactive -= self._piecelen(piece)

    不用再為這個片斷發送 request 消息了

                self.inactive_requests[piece] = None

                self.waschecked[piece] = check_hashes

           

            lastlen = self._piecelen(len(hashes) - 1)        # 最后一個片斷的長度

     

    # 對每一個片斷

            for i in xrange(len(hashes)):

    # 如果磁盤上,還沒有完全為這個片斷分配空間,那么在 holes 中添加該片斷的索引號。

    if not self._waspre(i):

                    self.holes.append(i)

                        

    # 否則,也就是空間已經分配。但是還是不能保證這個片斷已經完全獲得了,正如分析 Storage 時提到的那樣,可能存在“空洞”

                        

    # 如果不需要進行有效性檢查,那么簡單調用 markgot() 表示已經獲得了該片斷。這顯然是一種不負責任的做法。

                elif not check_hashes:

                    markgot(i, i)

     

    # 如果需要進行有效性檢查

    else:

    sha python 內置的模塊,它封裝了 SHA-1 摘要算法。 SHA-1 摘要算法對一段任意長的數據進行計算,得出一個 160bit (也就是 20 個字節)長的消息摘要。在 torrent 文件中,保存了每個片斷的消息摘要。接收方在收到一個文件片斷之后,再計算一次消息摘要,然后跟 torrent 文件中對應的值進行比較,如果結果不一致,那么說明數據在傳輸過程中發生了變化,這樣的數據應該被丟棄。

     

    這里,首先,根據片斷 i 的起始位置開始, lastlen 長的一段數據構造一個 sha 對象。

                    sh = sha(self.storage.read(piece_size * i, lastlen))

    計算這段數據的消息摘要

                    sp = sh.digest()

    然后,更新 sh 這個 sha 對象,注意,是根據片斷 i 剩下的數據來更新的。關于 sha::update() 的功能,請看 python 的幫助。如果有兩段數據 a b ,那么

    sh = sha(a)

    sh.update(b) ,等效于

    sh = sha(a+b)

    所以,下面這個表達式等于

    sh.update(self.storage.read(piece_size*i, self._piecelen(i)))

     

                    sh.update(self.storage.read(piece_size * i + lastlen, self._piecelen(i) - lastlen))

    所以,這次計算出來的就是片斷 i 的摘要

    (原來的困惑:為什么不直接計算 i 的摘要,要這么繞一下了?后來分析清楚“空間分配算法”之后,這后面一段代碼也就沒有什么問題了。)

                    s = sh.digest()

    如果計算出來的摘要和 hashes[i] 一致(后者是從 torrent 文件中獲得的),那么,這個片斷有效且已經存在于磁盤上。

                    if s == hashes[i]:

    markgot(i, i)

                               

                    elif targets.get(s)

    and self._piecelen(i) == self._piecelen(targets[s][-1]):

                        markgot(targets[s].pop(), i)

                    elif not self.have[len(hashes) - 1]

    and sp == hashes[-1]

    and (i == len(hashes) - 1 or not self._waspre(len(hashes) - 1)):

    markgot(len(hashes) - 1, i)

     

                                else:

                        self.places[i] = i

                     if flag.isSet():

                        return

                    numchecked += 1

                    statusfunc({'fractionDone': 1 - float(self.amount_left) / self.total_length})

     

    # 如果所有片斷都下載完了,那么結束。

            if self.amount_left == 0:

                finished()

     

     

    # 檢查某個片斷,是否已經在磁盤上分配了空間,調用的是 Storage:: was_preallocated()

    def _waspre(self, piece):

            return self.storage.was_preallocated(piece * self.piece_size,

     self._piecelen(piece))

     

        # 獲取指定片斷的長度,只有最后一個片斷大小可能小于 piece_size

        def _piecelen(self, piece):

            if piece < len(self.hashes) - 1:

                return self.piece_size

            else:

    return self.total_length - piece * self.piece_size

     

     

           # 返回剩余文件的大小

        def get_amount_left(self):

            return self.amount_left

     

           # 判斷是否已經獲得了一些文件片斷

        def do_I_have_anything(self):

            return self.amount_left < self.total_length

     

           # 將指定片斷切割為“子片斷”

    def _make_inactive(self, index):

     

           # 先獲取該片斷的長度

            length = min(self.piece_size, self.total_length - self.piece_size * index)

            l = []

            x = 0

     

    # 為了獲得更好的傳輸性能, BT 把每個文件片斷又分為更小的“子片斷”,我們可以在 download.py 文件中 default 變量中,找到“子片斷”大小的定義:

    'download_slice_size', 2 ** 14,      "How many bytes to query for per request."

    這里定義的“子片斷”大小是 16k 。

    下面這個循環,就是將一個片斷進一步切割為“子片斷”的過程。

     

            while x + self.request_size < length:

                l.append((x, self.request_size))

    x += self.request_size

            l.append((x, length - x))

     

    # l 保存到 inactive_requests 這個列表中

            self.inactive_requests[index] = l

     

    # 是否處于 endgame 模式,關于 endgame 模式,參加《 Incentives Build Robustness in BitTorrent

        def is_endgame(self):

            return self.endgame

     

        def get_have_list(self):

            return self.have.tostring()

     

        def do_I_have(self, index):

            return self.have[index]

     

    # 判斷指定的片斷,是否還有 request 沒有發出?如果有,那么返回 true ,否則返回 false 。

        def do_I_have_requests(self, index):

            return not not self.inactive_requests[index]

     

    為指定片斷創建一個 request 消息,返回的是一個二元組,例如( 32k, 16k ),表示“子片斷”的起始位置是 32k ,大小是 16k 。

        def new_request(self, index):

            # returns (begin, length)

    # 如果還沒有為該片斷創建 request 。,那么調用 _make_inactive() 創建 request 列表。( inactive_requests[index] 初始化的值是 1

            if self.inactive_requests[index] == 1:

    self._make_inactive(index)

                 

    # numactive[index] 記錄了已經為該片斷發出了多少個 request 。

            self.numactive[index] += 1

            rs = self.inactive_requests[index]

     

           # inactive_request 中移出最小的那個 request (也就是起始位置最小)。

            r = min(rs)

            rs.remove(r)

     

           # amount_inactive 記錄了尚沒有發出 request 的子片斷總的大小。

            self.amount_inactive -= r[1]

     

           # 如果這是最后一個“子片斷”,那么進入 endgame 模式

            if self.amount_inactive == 0:

    self.endgame = T.rue

           # 返回這個 request

            return r

     

        def piece_came_in(self, index, begin, piece):

            try:

                return self._piece_came_in(index, begin, piece)

            except IOError, e:

                self.failed('IO Error ' + str(e))

                return True

     

    如果獲得了某個“子片斷”,那么調用這個函數。

    index :“子片斷”所在片斷的索引號,

    begin :“子片斷”在片斷中的起始位置,

    piece :實際數據

    def _piece_came_in(self, index, begin, piece):

          

    # 如果之前沒有獲得過該片斷中任何“子片斷”,那么首先需要在磁盤上為整個片斷分配空間。

    空間分配的算法如下:

    假設一共是 6 個片斷,現在已經為 0 、 1 、 4 三個片斷分配了空間,那么

    holes [2, 3, 5]

    places {0:0, 1:1, 4:4}

    現在要為片斷 5 分配空間,思路是把片斷 5 的空間暫時先分配在片斷 2 應該在的空間上。這樣分配以后,

    holes [3, 5]

    places: {0:0, 1:1, 4:4, 5:2}

    假設下一步為片斷 2 分配空間,因為 2 的空間已經被 5 占用,所以把 5 的數據轉移到 3 上, 2 才可以使用自己的空間。這樣分配之后,

    holes [5]

    places {0:0, 1:1, 2:2, 4:4, 5:3}

    最后,為 3 分配空間,因為 3 的空間被 5 占用,所以把 5 的數據轉移到 5 自己的空間上, 3 就可以使用自己的空間了。這樣分配之后,

    holes []

    places {0:0, 1:1, 2:2, 3:3, 4:4, 5:5}

     

    下面這段比較晦澀的代碼,實現的就是這種空間分配算法。

     

            if not self.places.has_key(index):

    n = self.holes.pop(0)

     

                if self.places.has_key(n):

                    oldpos = self.places[n]

                    old = self.storage.read(self.piece_size * oldpos, self._piecelen(n))

                    if self.have[n] and sha(old).digest() != self.hashes[n]:

                        self.failed('data corrupted on disk - maybe you have two copies running?')

                        return True

                    self.storage.write(self.piece_size * n, old)

                    self.places[n] = n

                    if index == oldpos or index in self.holes:

                        self.places[index] = oldpos

                    else:

                        for p, v in self.places.items():

                            if v == index:

                                break

                        self.places[index] = index

                        self.places[p] = oldpos

                        old = self.storage.read(self.piece_size * index, self.piece_size)

    self.storage.write(self.piece_size * oldpos, old)

                        

    elif index in self.holes or index == n:

                    if not self._waspre(n):

    self.storage.write(self.piece_size * n,

    self._piecelen(n) * chr(0xFF))

                    self.places[index] = n

                else:

                    for p, v in self.places.items():

                        if v == index:

                            break

                    self.places[index] = index

                    self.places[p] = n

                    old = self.storage.read(self.piece_size * index, self._piecelen(n))

                    self.storage.write(self.piece_size * n, old)

           

           # 調用 Stoarge::write() 將這個子片斷寫入磁盤,注意是寫到 places[index] 所在的空間上。

    self.storage.write(self.places[index] * self.piece_size + begin, piece)

     

    # 既然獲得了一個子片斷,那么發出的 request 個數顯然要減少一個。

            self.numactive[index] -= 1

           

    # 如果既沒有尚未發出的 request ,而且也沒有已發出的 request (每當獲得一個子片斷, numactive[index] 減少 1 numactive[index] 0 ,說明所有發出的 request 都已經接收到了響應的數據),那么顯然整個片斷已經全部獲得了。

    if not self.inactive_requests[index] and not self.numactive[index]:

           檢查整個片斷的有效性,如果通過檢查

    if sha(self.storage.read(self.piece_size * self.places[index],

     self._piecelen(index))).digest() == self.hashes[index]:

                        

    # “我”已經擁有了這個片斷

                    self.have[index] = True

                    self.inactive_requests[index] = None

                         # 也檢查過了有效性

                    self.waschecked[index] = True

                               

                    self.amount_left -= self._piecelen(index)

                    if self.amount_left == 0:

    self.finished()

                  如果沒有通過有效性檢查

                else:

                    self.data_flunked(self._piecelen(index))

                         得丟棄這個片斷

                    self.inactive_requests[index] = 1

                    self.amount_inactive += self._piecelen(index)

                    return False

            return True

     

    # 如果向某個 peer 發送的獲取“子片斷”的請求丟失了,那么調用此函數

        def request_lost(self, index, begin, length):

            self.inactive_requests[index].append((begin, length))

            self.amount_inactive += length

            self.numactive[index] -= 1

     

        def get_piece(self, index, begin, length):

            try:

                return self._get_piece(index, begin, length)

            except IOError, e:

                self.failed('IO Error ' + str(e))

                return None

     

        def _get_piece(self, index, begin, length):

            if not self.have[index]:

                return None

     

            if not self.waschecked[index]:

                  # 檢查片斷的 hash 值,如果錯誤,返回 None

    if sha(self.storage.read(self.piece_size * self.places[index],

    self._piecelen(index))).digest() != self.hashes[index]:

                    self.failed('told file complete on start-up, but piece failed hash check')

                    return None

                  # 通過 hash 檢查

    self.waschecked[index] = True

     

           # 檢查一下“子片斷”長度是否越界

            if begin + length > self._piecelen(index):

    return None

                 

    # 調用 Storage::read() ,將該“子片斷”數據從磁盤上讀出來,返回值就是這段數據。

            return self.storage.read(self.piece_size * self.places[index] + begin, length)
    posted on 2007-01-19 00:21 苦笑枯 閱讀(325) 評論(0)  編輯  收藏 所屬分類: P2P
    收藏來自互聯網,僅供學習。若有侵權,請與我聯系!

    <2007年1月>
    31123456
    78910111213
    14151617181920
    21222324252627
    28293031123
    45678910

    常用鏈接

    留言簿(2)

    隨筆分類(56)

    隨筆檔案(56)

    搜索

    •  

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 久久久久亚洲国产AV麻豆| 免费电视剧在线观看| 鲁啊鲁在线视频免费播放| 91亚洲va在线天线va天堂va国产| 又粗又大又猛又爽免费视频| 亚洲精品动漫免费二区| 国产激情免费视频在线观看 | 中文字幕免费在线看线人| 国产三级在线免费观看| 日韩亚洲人成在线综合| 色在线亚洲视频www| 久久国产亚洲精品无码| 日韩亚洲一区二区三区| 国产精品亚洲视频| 亚洲区不卡顿区在线观看| 国产精品另类激情久久久免费| 亚洲精品免费在线视频| 最近免费中文字幕大全免费| 日本免费在线中文字幕| 13小箩利洗澡无码视频网站免费| 特a级免费高清黄色片| 小说专区亚洲春色校园| 亚洲第一街区偷拍街拍| 亚洲中文无码亚洲人成影院| 亚洲精品一二三区| 色老板亚洲视频免在线观| 亚洲第一香蕉视频| 亚洲国产美女在线观看| 亚洲国产韩国一区二区| 7777久久亚洲中文字幕蜜桃| 久久亚洲精品无码VA大香大香| 久久久久亚洲AV片无码下载蜜桃| 久久久久久亚洲av成人无码国产| 亚洲精品无码永久在线观看你懂的| 久久国产成人亚洲精品影院| 最新国产AV无码专区亚洲| 亚洲一级特黄大片在线观看| 亚洲人成网站在线观看青青| 国产精品亚洲二区在线观看| 亚洲AV午夜福利精品一区二区| 久久精品国产亚洲麻豆|