<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    小明思考

    Just a software engineer
    posts - 124, comments - 36, trackbacks - 0, articles - 0
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    leveldb研究6- Level和Compaction

    Posted on 2012-03-15 17:28 小明 閱讀(7893) 評論(0)  編輯  收藏 所屬分類: 分布式計算
    leveldb之所以使用level作為數據庫名稱,精華就在于level的設計。



    本質是一種歸并排序算法。
    這樣設計的好處主要是可以減少compaction的次數和每次的文件個數。


    Compaction

    • 為什么要compaction?
     compaction可以提高數據的查詢效率,沒有經過compaction,需要從很多SST file去查找,而做過compaction后,只需要從有限的SST文件去查找,大大的提高了隨機查詢的效率,另外也可以刪除過期數據。

    • 什么時候可能進行compaction?
     1. database open的時候
     2. write的時候
     3. read的時候?

    <db/dbimpl.cc>

    //是否要進行compaction
    void DBImpl::MaybeScheduleCompaction() {
      mutex_.AssertHeld();
      
    if (bg_compaction_scheduled_) { //已經在進行
      } else if (shutting_down_.Acquire_Load()) {
      } 
    else if (imm_ == NULL &&
                 manual_compaction_ 
    == NULL &&
                 
    !versions_->NeedsCompaction()) {
        
    //imm_為NULL:沒有memtable需要flush
        
    //manual_compaction_:手動compaction
      } else {
        bg_compaction_scheduled_ 
    = true;
        env_
    ->Schedule(&DBImpl::BGWork, this);
      }
    }

    <db/version_set.h>
    bool NeedsCompaction() const {
        Version
    * v = current_;
        
    return (v->compaction_score_ >= 1|| (v->file_to_compact_ != NULL);
      }

    如何計算這個compaction_score呢?看下面的代碼:
    <db/version_set.cc>
    void VersionSet::Finalize(Version* v) {
      
    int best_level = -1;
      
    double best_score = -1;

    //遍歷所有的level  
    for (int level = 0; level < config::kNumLevels-1; level++) {
        
    double score;
        
    if (level == 0) {
          
    //對于level 0,計算當前文件個數和預定義的compaction trigger value(Default:4)之比
          score = v->files_[level].size() /
              static_cast
    <double>(config::kL0_CompactionTrigger);
        } 
    else {
          
    //對于其他level,計算level文件大小和level應有的大小(10^N MB)
          const uint64_t level_bytes = TotalFileSize(v->files_[level]);
          score 
    = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
        }
         
    //找出最需要compaction的level
        if (score > best_score) {
          best_level 
    = level;
          best_score 
    = score;
        }
      }

      v
    ->compaction_level_ = best_level;
      v
    ->compaction_score_ = best_score;
    }

    • 如何做compaction?
    leveldb 運行會啟動一個background thread,會執行一些background task,compaction就在這個線程中執行。


    首先來看看compaction對象如何定義的
    <db/version_set.h>
    //關于compaction的一些信息
    class Compaction {
     
    public:
      
    ~Compaction();

      
    //compaction Level:會將N層N+1層合并生成N+1文件
      int level() const { return level_; }

      
    //返回VersionEdit,用于記錄到manifest
      VersionEdit* edit() { return &edit_; }

      
    //返回N層或者N+1層的文件個數,which = 0,1
      int num_input_files(int which) const { return inputs_[which].size(); }

      
    //返回具體的文件信息,which:level
      FileMetaData* input(int which, int i) const { return inputs_[which][i]; }

      
    //本次compaction最大輸出字節
      uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

      
    //是否只需要移動文件進行compaction,不需要merge和split
      bool IsTrivialMove() const;

      
    //把input都當成delete寫入edit
      void AddInputDeletions(VersionEdit* edit);

      
    // Returns true if the information we have available guarantees that
      
    // the compaction is producing data in "level+1" for which no data exists
      
    // in levels greater than "level+1".
      bool IsBaseLevelForKey(const Slice& user_key);

      
    // Returns true iff we should stop building the current output
      
    // before processing "internal_key".
      bool ShouldStopBefore(const Slice& internal_key);

      
    // Release the input version for the compaction, once the compaction
      
    // is successful.
      void ReleaseInputs();

     
    private:
      friend 
    class Version;
      friend 
    class VersionSet;

      
    explicit Compaction(int level);

      
    int level_;
      uint64_t max_output_file_size_;
      Version
    * input_version_;
      VersionEdit edit_;

      
    // Each compaction reads inputs from "level_" and "level_+1"
      std::vector<FileMetaData*> inputs_[2];      // The two sets of inputs

      
    // State used to check for number of of overlapping grandparent files
      
    // (parent == level_ + 1, grandparent == level_ + 2)
      std::vector<FileMetaData*> grandparents_;
      size_t grandparent_index_;  
    // Index in grandparent_starts_
      bool seen_key_;             // Some output key has been seen
      int64_t overlapped_bytes_;  // Bytes of overlap between current output
                                  
    // and grandparent files

      
    // State for implementing IsBaseLevelForKey

      
    // level_ptrs_ holds indices into input_version_->levels_: our state
      
    // is that we are positioned at one of the file ranges for each
      
    // higher level than the ones involved in this compaction (i.e. for
      
    // all L >= level_ + 2).
      size_t level_ptrs_[config::kNumLevels];
    };

    Compaction Thread
    <db/dbimpl.cc>
    void DBImpl::BackgroundCompaction() {
      mutex_.AssertHeld();

      
    //把memtable flush到sstable
      if (imm_ != NULL) {
        CompactMemTable();
        
    return;
      }

      Compaction
    * c;
      
    bool is_manual = (manual_compaction_ != NULL);
      InternalKey manual_end;
      
    if (is_manual) { //手動compaction
        ManualCompaction* m = manual_compaction_;
        
    //根據range來做compaction
        c = versions_->CompactRange(m->level, m->begin, m->end);
        m
    ->done = (c == NULL);
        
    if (c != NULL) {
          manual_end 
    = c->input(0, c->num_input_files(0- 1)->largest;
        }
        Log(options_.info_log,
            
    "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
            m
    ->level,
            (m
    ->begin ? m->begin->DebugString().c_str() : "(begin)"),
            (m
    ->end ? m->end->DebugString().c_str() : "(end)"),
            (m
    ->done ? "(end)" : manual_end.DebugString().c_str()));
      } 
    else {
        
    //找到需要compaction的level&file
        c = versions_->PickCompaction();
      }

      Status status;
      
    if (c == NULL) {
        
    // Nothing to do
      } else if (!is_manual && c->IsTrivialMove()) { //只需要移動sst file
        
    // Move file to next level
        assert(c->num_input_files(0== 1);
        FileMetaData
    * f = c->input(00);
        c
    ->edit()->DeleteFile(c->level(), f->number);
        c
    ->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                           f
    ->smallest, f->largest);
        status 
    = versions_->LogAndApply(c->edit(), &mutex_);
        VersionSet::LevelSummaryStorage tmp;
        Log(options_.info_log, 
    "Moved #%lld to level-%d %lld bytes %s: %s\n",
            static_cast
    <unsigned long long>(f->number),
            c
    ->level() + 1,
            static_cast
    <unsigned long long>(f->file_size),
            status.ToString().c_str(),
            versions_
    ->LevelSummary(&tmp));
      } 
    else {//完成compaction
        CompactionState* compact = new CompactionState(c);
        status 
    = DoCompactionWork(compact);
        CleanupCompaction(compact);
        c
    ->ReleaseInputs();
        DeleteObsoleteFiles();
      }
      delete c;

      
    if (status.ok()) {
        
    // Done
      } else if (shutting_down_.Acquire_Load()) {
        
    // Ignore compaction errors found during shutting down
      } else {
        Log(options_.info_log,
            
    "Compaction error: %s", status.ToString().c_str());
        
    if (options_.paranoid_checks && bg_error_.ok()) {
          bg_error_ 
    = status;
        }
      }

      
    if (is_manual) {
        ManualCompaction
    * m = manual_compaction_;
        
    if (!status.ok()) {
          m
    ->done = true;
        }
        
    if (!m->done) {
          
    // We only compacted part of the requested range.  Update *m
          
    // to the range that is left to be compacted.
          m->tmp_storage = manual_end;
          m
    ->begin = &m->tmp_storage;
        }
        manual_compaction_ 
    = NULL;
      }
    }

    compaction memtable:寫一個level0文件,并寫入manifest log

    Status DBImpl::CompactMemTable() {
      mutex_.AssertHeld();
      assert(imm_ 
    != NULL);

      VersionEdit edit;
      Version
    * base = versions_->current();
      
    base->Ref();
      
    //寫入level0 sst table
      Status s = WriteLevel0Table(imm_, &edit, base);
      
    base->Unref();

      
    if (s.ok() && shutting_down_.Acquire_Load()) {
        s 
    = Status::IOError("Deleting DB during memtable compaction");
      }

      
    // Replace immutable memtable with the generated Table
      if (s.ok()) {
        edit.SetPrevLogNumber(
    0);
        edit.SetLogNumber(logfile_number_);  
    // Earlier logs no longer needed
        
    //生成edit并計入manifest log
        s = versions_->LogAndApply(&edit, &mutex_);
      }

      
    if (s.ok()) {
        
    // Commit to the new state
        imm_->Unref();
        imm_ 
    = NULL;
        has_imm_.Release_Store(NULL);
        DeleteObsoleteFiles();
      }

      
    return s;
    }

    下面來看看compaction已有文件:

    找出要compaction的文件:

    <db/version_set.cc>
    Compaction* VersionSet::PickCompaction() {
      Compaction
    * c;
      
    int level;

    //是否需要compaction,有兩種compaction,一種基于size大小,另外一種基于被seek的次數 
    const bool size_compaction = (current_->compaction_score_ >= 1);
      
    const bool seek_compaction = (current_->file_to_compact_ != NULL);
      
    if (size_compaction) {
        level 
    = current_->compaction_level_;
        assert(level 
    >= 0);
        assert(level
    +1 < config::kNumLevels);
        c 
    = new Compaction(level);

        
    //每一層有一個compact_pointer,用于記錄compaction key,這樣可以進行循環compaction
        for (size_t i = 0; i < current_->files_[level].size(); i++) {
          FileMetaData
    * f = current_->files_[level][i];
          
    if (compact_pointer_[level].empty() ||
              icmp_.Compare(f
    ->largest.Encode(), compact_pointer_[level]) > 0) {
            
    //找到一個文件就可以了
            c->inputs_[0].push_back(f);
            
    break;
          }
        }
        
    if (c->inputs_[0].empty()) {
          
    // Wrap-around to the beginning of the key space
          c->inputs_[0].push_back(current_->files_[level][0]);
        }
      } 
    else if (seek_compaction) {
        level 
    = current_->file_to_compact_level_;
        c 
    = new Compaction(level);
        c
    ->inputs_[0].push_back(current_->file_to_compact_);
      } 
    else {
        
    return NULL;
      }

      c
    ->input_version_ = current_;
      c
    ->input_version_->Ref();

      
    // level 0:特殊處理,因為可能有key 重疊,把所有重疊都找出來,一起做compaction
      if (level == 0) {
        InternalKey smallest, largest;
        GetRange(c
    ->inputs_[0], &smallest, &largest);
        
    // Note that the next call will discard the file we placed in
        
    // c->inputs_[0] earlier and replace it with an overlapping set
        
    // which will include the picked file.
        current_->GetOverlappingInputs(0&smallest, &largest, &c->inputs_[0]);
        assert(
    !c->inputs_[0].empty());
      }

      
    //找到level N+1需要compaction的文件
      SetupOtherInputs(c);

      
    return c;
    }

    <db/version_set.cc>
    void VersionSet::SetupOtherInputs(Compaction* c) {
      
    const int level = c->level();
      InternalKey smallest, largest;
      GetRange(c
    ->inputs_[0], &smallest, &largest);

      
    //找到所有在Level N+1層有重疊的文件
       current_->GetOverlappingInputs(level+1&smallest, &largest, &c->inputs_[1]);

      
    //取出key的范圍
      InternalKey all_start, all_limit;
      GetRange2(c
    ->inputs_[0], c->inputs_[1], &all_start, &all_limit);

      
    //檢查是否能從Level N找到更多的文件
      if (!c->inputs_[1].empty()) {
        std::vector
    <FileMetaData*> expanded0;
        current_
    ->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
        
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
        
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
        
    const int64_t expanded0_size = TotalFileSize(expanded0);
        
    if (expanded0.size() > c->inputs_[0].size() &&
            inputs1_size 
    + expanded0_size < kExpandedCompactionByteSizeLimit) {
          InternalKey new_start, new_limit;
          GetRange(expanded0, 
    &new_start, &new_limit);
          std::vector
    <FileMetaData*> expanded1;
          current_
    ->GetOverlappingInputs(level+1&new_start, &new_limit,
                                         
    &expanded1);
          
    if (expanded1.size() == c->inputs_[1].size()) {
            Log(options_
    ->info_log,
                
    "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
                level,
                
    int(c->inputs_[0].size()),
                
    int(c->inputs_[1].size()),
                
    long(inputs0_size), long(inputs1_size),
                
    int(expanded0.size()),
                
    int(expanded1.size()),
                
    long(expanded0_size), long(inputs1_size));
            smallest 
    = new_start;
            largest 
    = new_limit;
            c
    ->inputs_[0= expanded0;
            c
    ->inputs_[1= expanded1;
            GetRange2(c
    ->inputs_[0], c->inputs_[1], &all_start, &all_limit);
          }
        }
      }

      
    // Compute the set of grandparent files that overlap this compaction
      
    // (parent == level+1; grandparent == level+2)
      if (level + 2 < config::kNumLevels) {
        current_
    ->GetOverlappingInputs(level + 2&all_start, &all_limit,
                                       
    &c->grandparents_);
      }

      
    if (false) {
        Log(options_
    ->info_log, "Compacting %d '%s' .. '%s'",
            level,
            smallest.DebugString().c_str(),
            largest.DebugString().c_str());
      }

      
    //設置新的compact_pointer
      compact_pointer_[level] = largest.Encode().ToString();
      c
    ->edit_.SetCompactPointer(level, largest);
    }

    do compaction task:
    Status DBImpl::DoCompactionWork(CompactionState* compact) {
      
    const uint64_t start_micros = env_->NowMicros();
      int64_t imm_micros 
    = 0;  // Micros spent doing imm_ compactions

      Log(options_.info_log,  
    "Compacting %d@%d + %d@%d files",
          compact
    ->compaction->num_input_files(0),
          compact
    ->compaction->level(),
          compact
    ->compaction->num_input_files(1),
          compact
    ->compaction->level() + 1);

      assert(versions_
    ->NumLevelFiles(compact->compaction->level()) > 0);
      assert(compact
    ->builder == NULL);
      assert(compact
    ->outfile == NULL);
      
    if (snapshots_.empty()) {
        compact
    ->smallest_snapshot = versions_->LastSequence();
      } 
    else {
        compact
    ->smallest_snapshot = snapshots_.oldest()->number_;
      }

      
    // Release mutex while we're actually doing the compaction work
      mutex_.Unlock();

      
    //生成iterator:遍歷要compaction的數據
      Iterator* input = versions_->MakeInputIterator(compact->compaction);
      input
    ->SeekToFirst();
      Status status;
      ParsedInternalKey ikey;
      std::
    string current_user_key;
      
    bool has_current_user_key = false;
      SequenceNumber last_sequence_for_key 
    = kMaxSequenceNumber;
      
    for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
        
    // 如果有memtable要compaction:優先去做
        if (has_imm_.NoBarrier_Load() != NULL) {
          
    const uint64_t imm_start = env_->NowMicros();
          mutex_.Lock();
          
    if (imm_ != NULL) {
            CompactMemTable();
            bg_cv_.SignalAll();  
    // Wakeup MakeRoomForWrite() if necessary
          }
          mutex_.Unlock();
          imm_micros 
    += (env_->NowMicros() - imm_start);
        }

        Slice key 
    = input->key();
        
    //檢查是不是中途輸出compaction的結果,避免compaction結果和level N+2 files有過多的重疊
        if (compact->compaction->ShouldStopBefore(key) &&
            compact
    ->builder != NULL) {
          status 
    = FinishCompactionOutputFile(compact, input);
          
    if (!status.ok()) {
            
    break;
          }
        }

        
    // Handle key/value, add to state, etc.
        bool drop = false;
        
    if (!ParseInternalKey(key, &ikey)) {
          
    // Do not hide error keys
          current_user_key.clear();
          has_current_user_key 
    = false;
          last_sequence_for_key 
    = kMaxSequenceNumber;
        } 
    else {
          
    if (!has_current_user_key ||
              user_comparator()
    ->Compare(ikey.user_key,
                                         Slice(current_user_key)) 
    != 0) {
            
    // First occurrence of this user key
            current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
            has_current_user_key 
    = true;
            last_sequence_for_key 
    = kMaxSequenceNumber;
          }

          
    if (last_sequence_for_key <= compact->smallest_snapshot) {
            
    // Hidden by an newer entry for same user key
            drop = true;    // (A)
          } else if (ikey.type == kTypeDeletion &&
                     ikey.sequence 
    <= compact->smallest_snapshot &&
                     compact
    ->compaction->IsBaseLevelForKey(ikey.user_key)) {
            
    // For this user key:
            
    // (1) there is no data in higher levels
            
    // (2) data in lower levels will have larger sequence numbers
            
    // (3) data in layers that are being compacted here and have
            
    //     smaller sequence numbers will be dropped in the next
            
    //     few iterations of this loop (by rule (A) above).
            
    // Therefore this deletion marker is obsolete and can be dropped.
            drop = true;
          }

          last_sequence_for_key 
    = ikey.sequence;
        }

        
    if (!drop) {
          
    // Open output file if necessary
          if (compact->builder == NULL) {
            status 
    = OpenCompactionOutputFile(compact);
            
    if (!status.ok()) {
              
    break;
            }
          }
          
    if (compact->builder->NumEntries() == 0) {
            compact
    ->current_output()->smallest.DecodeFrom(key);
          }
          compact
    ->current_output()->largest.DecodeFrom(key);
          compact
    ->builder->Add(key, input->value());

          
    // 達到sst文件大小,重新寫文件
          if (compact->builder->FileSize() >=
              compact
    ->compaction->MaxOutputFileSize()) {
            status 
    = FinishCompactionOutputFile(compact, input);
            
    if (!status.ok()) {
              
    break;
            }
          }
        }

        input
    ->Next();
      }

      
    if (status.ok() && shutting_down_.Acquire_Load()) {
        status 
    = Status::IOError("Deleting DB during compaction");
      }
      
    if (status.ok() && compact->builder != NULL) {
        status 
    = FinishCompactionOutputFile(compact, input);
      }
      
    if (status.ok()) {
        status 
    = input->status();
      }
      delete input;
      input 
    = NULL;

     
    //更新compaction的一些統計數據
      CompactionStats stats;
      stats.micros 
    = env_->NowMicros() - start_micros - imm_micros;
      
    for (int which = 0; which < 2; which++) {
        
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
          stats.bytes_read 
    += compact->compaction->input(which, i)->file_size;
        }
      }
      
    for (size_t i = 0; i < compact->outputs.size(); i++) {
        stats.bytes_written 
    += compact->outputs[i].file_size;
      }

      mutex_.Lock();
      stats_[compact
    ->compaction->level() + 1].Add(stats);

      
    if (status.ok()) {
        status 
    = InstallCompactionResults(compact);
      }
      VersionSet::LevelSummaryStorage tmp;
      Log(options_.info_log,
          
    "compacted to: %s", versions_->LevelSummary(&tmp));
      
    return status;
    }





    主站蜘蛛池模板: 国产中文在线亚洲精品官网| 国产情侣激情在线视频免费看| 美女的胸又黄又www网站免费| 亚洲国产熟亚洲女视频| 亚洲AV综合色区无码二区爱AV| 亚洲成电影在线观看青青| 亚洲国产精品一区二区久| 亚洲另类春色国产精品| 亚洲一区二区三区国产精华液| 亚洲日本天堂在线| 亚洲s码欧洲m码吹潮| 男女超爽视频免费播放| 免费精品国产自产拍在线观看| 一级做a爱片特黄在线观看免费看| 国产伦精品一区二区免费| 嫩草在线视频www免费观看| 日韩免费在线观看视频| 中文字幕视频免费| 性做久久久久久免费观看| 欧洲美熟女乱又伦免费视频 | 国产精品亚洲а∨无码播放不卡 | 亚洲人成电影福利在线播放| 久久久久久亚洲Av无码精品专口 | 亚洲色精品aⅴ一区区三区| 亚洲AV无码欧洲AV无码网站| 亚洲熟妇av一区| 亚洲日韩一区精品射精| 免费一级毛片在线播放放视频| 91免费在线视频| 亚洲一级毛片免费在线观看| 日本特黄特色aa大片免费| 亚洲情侣偷拍精品| 亚洲国产天堂久久综合网站| 亚洲高清有码中文字| 免费国产在线精品一区| 青青青国产手机频在线免费观看| 91精品免费久久久久久久久| 日本xxwwxxww在线视频免费 | 日韩免费无砖专区2020狼| 久久精品国产亚洲一区二区三区| 亚洲AV日韩精品久久久久久|