<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    小明思考

    Just a software engineer
    posts - 124, comments - 36, trackbacks - 0, articles - 0
      BlogJava :: 首頁 :: 新隨筆 :: 聯系 :: 聚合  :: 管理

    leveldb研究9- 流程分析:打開數據庫

    Posted on 2012-03-20 16:02 小明 閱讀(3439) 評論(0)  編輯  收藏 所屬分類: 分布式計算
    leveldb 是通過Open函數來打開/新建數據庫。
    static Status Open(const Options& options,
                         
    const std::string& name,
                         DB
    ** dbptr);

    其中options指定一些選項。
    struct Options {
      
    // -------------------
      
    // 影響行為的參數

      
    //comparator用于指定key的排列方式,默認按照字節排序
      const Comparator* comparator;

      
    //如果不存在則創建
      
    // Default: false
      bool create_if_missing;

      
    // 如果存在則失敗
      
    // Default: false
      bool error_if_exists;

      
    // 是否做嚴格的檢查
      
    // Default: false
      bool paranoid_checks;

      
    // env: os 封裝
      
    // Default: Env::Default()
      Env* env;

      
    // log file,默認和database相同路徑
      
    // Default: NULL
      Logger* info_log;

      
    // -------------------
      
    // 影響性能的參數

      
    // 寫緩沖大小,增加會提高寫的性能,但是會增加啟動的時間,因為有更多的數據需要恢復
      
    //
      
    // Default: 4MB
      size_t write_buffer_size;

      
    // 最大打開的文件個數,用于TableCache
      
    //
      
    // Default: 1000
      int max_open_files;

      
    // Control over blocks (user data is stored in a set of blocks, and
      
    // a block is the unit of reading from disk).

      
    // 指定Block cache,默認leveldb會自動創建8MB的internal cache
      
    // Default: NULL
      Cache* block_cache;

      
    //SST file中的Block size,為壓縮之前的數據
      
    //
      
    // Default: 4K
      size_t block_size;

      
    // SST file 中的restart pointer的間隔,參見SST的文件格式
      
    //
      
    // Default: 16
      int block_restart_interval;

      
    // 壓縮類型,默認為google的snappy壓縮
      CompressionType compression;

      
    // Create an Options object with default values for all fields.
      Options();
    };

    具體看看Open的實現:
    <db/dbimpl.cc>
    Status DB::Open(const Options& options, const std::string& dbname,
                    DB
    ** dbptr) {
      
    *dbptr = NULL;

      
    //實例化對象:DBImpl
      DBImpl* impl = new DBImpl(options, dbname);
      
    //加鎖
      impl->mutex_.Lock();
      VersionEdit edit;
      
    //從log中恢復數據,生成新的SST file
      Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
      if (s.ok()) { 
        
    //創建新的log file
        uint64_t new_log_number = impl->versions_->NewFileNumber();
        WritableFile
    * lfile;
        s 
    = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                         
    &lfile);
        
    if (s.ok()) {
          edit.SetLogNumber(new_log_number);
          impl
    ->logfile_ = lfile;
          impl
    ->logfile_number_ = new_log_number;
          impl
    ->log_ = new log::Writer(lfile);
          
    //生成新的manifest文件
          s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
        }
        
    if (s.ok()) {
          
    //刪除失效文件
          impl->DeleteObsoleteFiles();
          
    //進行compaction
          impl->MaybeScheduleCompaction();
        }
      }
      impl
    ->mutex_.Unlock();
      
    if (s.ok()) {
        
    *dbptr = impl;
      } 
    else {
        delete impl;
      }
      
    return s;
    }

    因為上次關閉數據庫的時候,內存的數據可能并沒有寫入SST文件,所以要從*.log中讀取記錄,并寫入新的SST文件。
    <db/dbimpl.cc>
    Status DBImpl::Recover(VersionEdit* edit) {
      mutex_.AssertHeld();

      
    //創建folder
      env_->CreateDir(dbname_);
      assert(db_lock_ 
    == NULL);
      
    //生成LOCK文件并鎖定
      Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
      
    if (!s.ok()) {
        
    return s;
      }

      
    if (!env_->FileExists(CurrentFileName(dbname_))) {
        
    if (options_.create_if_missing) {
          
    //新建database
          s = NewDB();
          
    if (!s.ok()) {
            
    return s;
          }
        } 
    else {
          
    return Status::InvalidArgument(
              dbname_, 
    "does not exist (create_if_missing is false)");
        }
      } 
    else {
        
    if (options_.error_if_exists) {
          
    return Status::InvalidArgument(
              dbname_, 
    "exists (error_if_exists is true)");
        }
      }

      
    //重建manifest信息
      s = versions_->Recover();
      
    if (s.ok()) {
        SequenceNumber max_sequence(
    0);

        
    //得到上次的log file
        const uint64_t min_log = versions_->LogNumber();
        
    const uint64_t prev_log = versions_->PrevLogNumber();
        std::vector
    <std::string> filenames;
        s 
    = env_->GetChildren(dbname_, &filenames);
        
    if (!s.ok()) {
          
    return s;
        }
        uint64_t number;
        FileType type;
        std::vector
    <uint64_t> logs;
        
    for (size_t i = 0; i < filenames.size(); i++) {
          
    if (ParseFileName(filenames[i], &number, &type)
              
    && type == kLogFile
              
    && ((number >= min_log) || (number == prev_log))) {
            logs.push_back(number);
          }
        }

        
    // Recover in the order in which the logs were generated
        std::sort(logs.begin(), logs.end());
        
    for (size_t i = 0; i < logs.size(); i++) {
          
    //從*.log中恢復數據
          s = RecoverLogFile(logs[i], edit, &max_sequence);

          
    // The previous incarnation may not have written any MANIFEST
          
    // records after allocating this log number.  So we manually
          
    // update the file number allocation counter in VersionSet.
          versions_->MarkFileNumberUsed(logs[i]);
        }

        
    if (s.ok()) {
          
    if (versions_->LastSequence() < max_sequence) {
            versions_
    ->SetLastSequence(max_sequence);
          }
        }
      }

      
    return s;
    }

    繼續看RecoverLogFile的實現:
    <db/dbimpl.cc>
    Status DBImpl::RecoverLogFile(uint64_t log_number,
                                  VersionEdit
    * edit,
                                  SequenceNumber
    * max_sequence) {
      
    //LogReporter:出現壞數據的時候報告
       struct LogReporter : public log::Reader::Reporter {
        Env
    * env;
        Logger
    * info_log;
        
    const char* fname;
        Status
    * status;  // NULL if options_.paranoid_checks==false
        virtual void Corruption(size_t bytes, const Status& s) {
          Log(info_log, 
    "%s%s: dropping %d bytes; %s",
              (
    this->status == NULL ? "(ignoring error) " : ""),
              fname, static_cast
    <int>(bytes), s.ToString().c_str());
          
    if (this->status != NULL && this->status->ok()) *this->status = s;
        }
      };

      mutex_.AssertHeld();

      
    //打開Log file用于順序讀取
      std::string fname = LogFileName(dbname_, log_number);
      SequentialFile
    * file;
      Status status 
    = env_->NewSequentialFile(fname, &file);
      
    if (!status.ok()) {
        MaybeIgnoreError(
    &status);
        
    return status;
      }

      LogReporter reporter;
      reporter.env 
    = env_;
      reporter.info_log 
    = options_.info_log;
      reporter.fname 
    = fname.c_str();
      reporter.status 
    = (options_.paranoid_checks ? &status : NULL);
      
    // log::Reader讀取數據
      log::Reader reader(file, &reporter, true/*checksum*/,
                         
    0/*initial_offset*/);
      Log(options_.info_log, 
    "Recovering log #%llu",
          (unsigned 
    long long) log_number);

      std::
    string scratch;
      Slice record;
      WriteBatch batch;
      MemTable
    * mem = NULL;
      
    //遍歷log file,讀取記錄
      while (reader.ReadRecord(&record, &scratch) &&
             status.ok()) {
        
    if (record.size() < 12) {
          reporter.Corruption(
              record.size(), Status::Corruption(
    "log record too small"));
          
    continue;
        }
        WriteBatchInternal::SetContents(
    &batch, record);

        
    if (mem == NULL) {
          
    //新建MemTable用于保存數據
          mem = new MemTable(internal_comparator_);
          mem
    ->Ref();
        }
        
    //插入memtable
        status = WriteBatchInternal::InsertInto(&batch, mem);
        MaybeIgnoreError(
    &status);
        
    if (!status.ok()) {
          
    break;
        }
        
    const SequenceNumber last_seq =
            WriteBatchInternal::Sequence(
    &batch) +
            WriteBatchInternal::Count(
    &batch) - 1;
        
    if (last_seq > *max_sequence) {
          
    *max_sequence = last_seq;
        }

        
    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
          
    //寫入SST file:level 0
          status = WriteLevel0Table(mem, edit, NULL);
          
    if (!status.ok()) {
            
    break;
          }
          
    //釋放并刪除memtable
          mem->Unref();
          mem 
    = NULL;
        }
      }

      
    if (status.ok() && mem != NULL) {
        status 
    = WriteLevel0Table(mem, edit, NULL);
        
    // Reflect errors immediately so that conditions like full
        
    // file-systems cause the DB::Open() to fail.
      }

      
    if (mem != NULL) mem->Unref();
      delete file;
      
    return status;
    }

    至此完成SST file的寫入。

    接下來看看manifest文件的重建
    mainfest的重建有兩步,第一步是調用VersionSet::Recover函數恢復到上次的manifest,然后使用VersionSet::LogAndApply把新增的SST文件記錄也寫入manifest文件中。
    <db/version_set.cc>

    Status VersionSet::Recover() {
      
    struct LogReporter : public log::Reader::Reporter {
        Status
    * status;
        
    virtual void Corruption(size_t bytes, const Status& s) {
          
    if (this->status->ok()) *this->status = s;
        }
      };

      
    // 讀取CURRENT文件,獲取最新的MANIFEST文件
      std::string current;
      Status s 
    = ReadFileToString(env_, CurrentFileName(dbname_), &current);
      
    if (!s.ok()) {
        
    return s;
      }
      
    if (current.empty() || current[current.size()-1!= '\n') {
        
    return Status::Corruption("CURRENT file does not end with newline");
      }
      current.resize(current.size() 
    - 1);

      std::
    string dscname = dbname_ + "/" + current;
      SequentialFile
    * file;
      
    //打開當前MANIFEST文件
      s = env_->NewSequentialFile(dscname, &file);
      
    if (!s.ok()) {
        
    return s;
      }

      
    bool have_log_number = false;
      
    bool have_prev_log_number = false;
      
    bool have_next_file = false;
      
    bool have_last_sequence = false;
      uint64_t next_file 
    = 0;
      uint64_t last_sequence 
    = 0;
      uint64_t log_number 
    = 0;
      uint64_t prev_log_number 
    = 0;
      Builder builder(
    this, current_);

      {
        LogReporter reporter;
        reporter.status 
    = &s;
        
    //使用log::Reader讀取log記錄:VersionEdit
        log::Reader reader(file, &reporter, true/*checksum*/0/*initial_offset*/);
        Slice record;
        std::
    string scratch;
        
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
          VersionEdit edit;
          s 
    = edit.DecodeFrom(record);
          
    if (s.ok()) {
            
    if (edit.has_comparator_ &&
                edit.comparator_ 
    != icmp_.user_comparator()->Name()) {
              s 
    = Status::InvalidArgument(
                  edit.comparator_ 
    + "does not match existing comparator ",
                  icmp_.user_comparator()
    ->Name());
            }
          }

          
    if (s.ok()) {
            
    //應用Edit到VersionSet
            builder.Apply(&edit);
          }

          
    if (edit.has_log_number_) {
            log_number 
    = edit.log_number_;
            have_log_number 
    = true;
          }

          
    if (edit.has_prev_log_number_) {
            prev_log_number 
    = edit.prev_log_number_;
            have_prev_log_number 
    = true;
          }

          
    if (edit.has_next_file_number_) {
            next_file 
    = edit.next_file_number_;
            have_next_file 
    = true;
          }

          
    if (edit.has_last_sequence_) {
            last_sequence 
    = edit.last_sequence_;
            have_last_sequence 
    = true;
          }
        }
      }
      delete file;
      file 
    = NULL;

      
    if (s.ok()) {
        
    if (!have_next_file) {
          s 
    = Status::Corruption("no meta-nextfile entry in descriptor");
        } 
    else if (!have_log_number) {
          s 
    = Status::Corruption("no meta-lognumber entry in descriptor");
        } 
    else if (!have_last_sequence) {
          s 
    = Status::Corruption("no last-sequence-number entry in descriptor");
        }

        
    if (!have_prev_log_number) {
          prev_log_number 
    = 0;
        }

        MarkFileNumberUsed(prev_log_number);
        MarkFileNumberUsed(log_number);
      }

      
    if (s.ok()) { //生成新的version,并設為current version
        Version* v = new Version(this);
        builder.SaveTo(v);
        
    // Install recovered version
        Finalize(v);
        AppendVersion(v);
        manifest_file_number_ 
    = next_file;
        next_file_number_ 
    = next_file + 1;
        last_sequence_ 
    = last_sequence;
        log_number_ 
    = log_number;
        prev_log_number_ 
    = prev_log_number;
      }

      
    return s;
    }

    Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
      
    if (edit->has_log_number_) {
        assert(edit
    ->log_number_ >= log_number_);
        assert(edit
    ->log_number_ < next_file_number_);
      } 
    else {
        edit
    ->SetLogNumber(log_number_);
      }

      
    if (!edit->has_prev_log_number_) {
        edit
    ->SetPrevLogNumber(prev_log_number_);
      }

      edit
    ->SetNextFile(next_file_number_);
      edit
    ->SetLastSequence(last_sequence_);

      
    //使用VersionEdit創建新的Version
      Version* v = new Version(this);
      {
        Builder builder(
    this, current_);
        builder.Apply(edit);
        builder.SaveTo(v);
      }
      Finalize(v);

      
    // Initialize new descriptor log file if necessary by creating
      
    // a temporary file that contains a snapshot of the current version.
      std::string new_manifest_file;
      Status s;
      
    //創建新的manifest文件
      if (descriptor_log_ == NULL) {
        
    // No reason to unlock *mu here since we only hit this path in the
        
    // first call to LogAndApply (when opening the database).
        assert(descriptor_file_ == NULL);
        new_manifest_file 
    = DescriptorFileName(dbname_, manifest_file_number_);
        edit
    ->SetNextFile(next_file_number_);
        s 
    = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
        
    if (s.ok()) {
          descriptor_log_ 
    = new log::Writer(descriptor_file_);
          s 
    = WriteSnapshot(descriptor_log_);
        }
      }

      
    // Unlock during expensive MANIFEST log write
      {
        mu
    ->Unlock();

        
    // 寫入manifest log
        if (s.ok()) {
          std::
    string record;
          edit
    ->EncodeTo(&record);
          s 
    = descriptor_log_->AddRecord(record);
          
    if (s.ok()) {
            s 
    = descriptor_file_->Sync();
          }
        }

        
    // If we just created a new descriptor file, install it by writing a
        
    // new CURRENT file that points to it.
        if (s.ok() && !new_manifest_file.empty()) {
          s 
    = SetCurrentFile(env_, dbname_, manifest_file_number_);
        }

        mu
    ->Lock();
      }

      
    // 設置新的version
      if (s.ok()) {
        AppendVersion(v);
        log_number_ 
    = edit->log_number_;
        prev_log_number_ 
    = edit->prev_log_number_;
      } 
    else {
        delete v;
        
    if (!new_manifest_file.empty()) {
          delete descriptor_log_;
          delete descriptor_file_;
          descriptor_log_ 
    = NULL;
          descriptor_file_ 
    = NULL;
          env_
    ->DeleteFile(new_manifest_file);
        }
      }

      
    return s;
    }






    主站蜘蛛池模板: 亚洲日本天堂在线| 久久国产色AV免费看| 亚洲精品亚洲人成人网| 97国产在线公开免费观看| 亚洲欧美日韩一区二区三区在线| 日韩一区二区在线免费观看 | 亚洲午夜激情视频| 99久热只有精品视频免费看| 亚洲AV无码一区二区三区性色| 精品国产亚洲男女在线线电影| 16女性下面扒开无遮挡免费| 国产偷国产偷亚洲高清在线| 亚洲AV无码码潮喷在线观看| 好大好深好猛好爽视频免费| 一个人看的免费视频www在线高清动漫| 久久久久亚洲AV无码麻豆| 四虎影视在线永久免费观看| 84pao国产成视频免费播放| 特级av毛片免费观看| 亚洲国产成人精品无码区在线秒播| 亚洲国产精品自在拍在线播放| 狼群影院在线观看免费观看直播 | 猫咪社区免费资源在线观看| 精品一区二区三区高清免费观看 | 亚洲福利在线观看| 免费一级毛片在级播放| xxxx日本免费| 国产在线精品免费aaa片| 亚洲成a人无码亚洲成www牛牛| 97久久精品亚洲中文字幕无码| 久久久久亚洲AV无码专区桃色| 好吊妞视频免费视频| h视频在线观看免费完整版| a毛片在线看片免费| 污污的视频在线免费观看| 亚洲人精品亚洲人成在线| 亚洲黄色一级毛片| 久久久久亚洲AV成人无码 | 免费夜色污私人影院网站| 亚洲欧美国产欧美色欲| 亚洲国产亚洲片在线观看播放|