<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    posts - 495,comments - 227,trackbacks - 0
    http://www.cnblogs.com/phinecos/archive/2012/02/29/2372682.html

         上一篇中介紹了連接Zookeeper集群的方法,這一篇將圍繞一個有趣的話題---來展開,這就是Replication(索引復制),關于Solr Replication的詳細介紹,可以參考http://wiki.apache.org/solr/SolrReplication

             在開始這個話題之前,先從我最近在應用中引入solrmaster/slave架構時,遇到的一個讓我困擾的實際問題。

    應用場景簡單描述如下:

    1)首先master節點下載索引分片,然后創建配置文件,加入master節點的replication配置片段,再對索引分片進行合并(關于mergeIndex,可以參考http://wiki.apache.org/solr/MergingSolrIndexes),然后利用上述配置文件和索引數據去創建一個solr核。

    2slave節點創建配置文件,加入slave節點的replication配置片段,創建一個空的solr核,等待從master節點進行索引數據同步

    出現的問題:slave節點沒有從master節點同步到數據。

    問題分析:

    1)首先檢查master節點,獲取最新的可復制索引的版本號,

    http://master_host:port/solr/replication?command=indexversion

    發現返回的索引版本號是0,這說明mater節點根本沒有觸發replication動作,

    2)為了確認上述判斷,在slave節點上進一步查看replication的詳細信息

    http://slave_host:port/solr/replication?command=details

    發現確實如此,盡管master節點的索引版本號和slave節點的索引版本號不一致,但索引卻沒有同步過來,再分別查看master節點和slave節點的日志,發現索引復制動作確實沒有開始。

    綜上所述,確實是master節點沒有觸發索引復制動作,那究竟是為何呢?先將原因擺出來,后面會通過源碼的分析來加以說明。

    原因:solr合并索引時,不管你是通過mergeindexeshttp命令,還是調用底層luceneIndexWriter,記得最后一定要提交一個commit,否則,不僅索引不僅不會對查詢可見,更是對于master/slave架構的solr集群來說,master節點的replication動作不會觸發,因為indexversion沒有感知到變化。

             好了,下面開始對SolrReplication的分析

             Solr容器在加載solr核的時候,會對已經注冊的各個實現SolrCoreAware接口的Handler進行回調,調用其inform方法。

             對于ReplicationHandler來說,就是在這里對自己是屬于master節點還是slave節點進行判斷,若是slave節點,則創建一個SnapPuller對象,定時負責從master節點主動拉索引數據下來;若是master節點,則只設置相應的參數。

    復制代碼
      public void inform(SolrCore core) {
        this.core = core;
        registerFileStreamResponseWriter();
        registerCloseHook();
        NamedList slave = (NamedList) initArgs.get("slave");
        boolean enableSlave = isEnabled( slave );
        if (enableSlave) {
          tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
          isSlave = true;
        }
        NamedList master = (NamedList) initArgs.get("master");
        boolean enableMaster = isEnabled( master );
        
        if (!enableSlave && !enableMaster) {
          enableMaster = true;
          master = new NamedList<Object>();
        }
        
        if (enableMaster) {
          includeConfFiles = (String) master.get(CONF_FILES);
          if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
            List<String> files = Arrays.asList(includeConfFiles.split(","));
            for (String file : files) {
              if (file.trim().length() == 0) continue;
              String[] strs = file.split(":");
              // if there is an alias add it or it is null
              confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
            }
            LOG.info("Replication enabled for following config files: " + includeConfFiles);
          }
          List backup = master.getAll("backupAfter");
          boolean backupOnCommit = backup.contains("commit");
          boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
          List replicateAfter = master.getAll(REPLICATE_AFTER);
          replicateOnCommit = replicateAfter.contains("commit");
          replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");

          if (!replicateOnCommit && ! replicateOnOptimize) {
            replicateOnCommit = true;
          }
          
          // if we only want to replicate on optimize, we need the deletion policy to
          
    // save the last optimized commit point.
          if (replicateOnOptimize) {
            IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
            IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
            if (policy instanceof SolrDeletionPolicy) {
              SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
              if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
                solrPolicy.setMaxOptimizedCommitsToKeep(1);
              }
            } else {
              LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
            }
          }

          if (replicateOnOptimize || backupOnOptimize) {
            core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
          }
          if (replicateOnCommit || backupOnCommit) {
            replicateOnCommit = true;
            core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
          }
          if (replicateAfter.contains("startup")) {
            replicateOnStart = true;
            RefCounted<SolrIndexSearcher> s = core.getNewestSearcher(false);
            try {
              DirectoryReader reader = s==null ? null : s.get().getIndexReader();
              if (reader!=null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
                try {
                  if(replicateOnOptimize){
                    Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
                    for (IndexCommit ic : commits) {
                      if(ic.getSegmentCount() == 1){
                        if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
                      }
                    }
                  } else{
                    indexCommitPoint = reader.getIndexCommit();
                  }
                } finally {
                  // We don't need to save commit points for replication, the SolrDeletionPolicy
                  
    // always saves the last commit point (and the last optimized commit point, if needed)
                  /***
                  if(indexCommitPoint != null){
                    core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
                  }
                  **
    */
                }
              }

              // reboot the writer on the new index
              core.getUpdateHandler().newIndexWriter();

            } catch (IOException e) {
              LOG.warn("Unable to get IndexCommit on startup", e);
            } finally {
              if (s!=null) s.decref();
            }
          }
          String reserve = (String) master.get(RESERVE);
          if (reserve != null && !reserve.trim().equals("")) {
            reserveCommitDuration = SnapPuller.readInterval(reserve);
          }
          LOG.info("Commits will be reserved for  " + reserveCommitDuration);
          isMaster = true;
        }

    復制代碼

      ReplicationHandler可以響應多種命令:

    1)       indexversion

    這里需要了解的第一個概念是索引提交點(IndexCommit),這是底層lucene的東西,可以自行查閱資料。首先獲取最新的索引提交點,然后從其中獲取索引版本號和索引所屬代。

          IndexCommit commitPoint = indexCommitPoint;  // make a copy so it won't change
          if (commitPoint != null && replicationEnabled.get()) {
            core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
            rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());

      rsp.add(GENERATION, commitPoint.getGeneration());  

        2backup。這個命令用來對索引做快照。首先獲取最新的索引提交點,然后創建做一個SnapShooter,具體的快照動作由這個對象完成,

    復制代碼
       private void doSnapShoot(SolrParams params, SolrQueryResponse rsp, SolrQueryRequest req) { 

        try {
          int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);
          IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
          IndexCommit indexCommit = delPolicy.getLatestCommit();
          
          if(indexCommit == null) {
            indexCommit = req.getSearcher().getReader().getIndexCommit();
          }
          
          // small race here before the commit point is saved
          new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);
          
        } catch (Exception e) {
          LOG.warn("Exception during creating a snapshot", e);
          rsp.add("exception", e);
        }
      }

     

    快照對象會啟動一個線程去異步地做一個索引備份。

    void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {

        replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());

     

        new Thread() {

          @Override

          public void run() {

            createSnapshot(indexCommit, numberToKeep, replicationHandler);

          }

        }.start();

     }

     

     void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {

        NamedList details = new NamedList();

        details.add("startTime", new Date().toString());

        File snapShotDir = null;

        String directoryName = null;

        Lock lock = null;

        try {

          if(numberToKeep<Integer.MAX_VALUE) {

            deleteOldBackups(numberToKeep);

          }

          SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);

          directoryName = "snapshot." + fmt.format(new Date());

          lock = lockFactory.makeLock(directoryName + ".lock");

          if (lock.isLocked()) return;

          snapShotDir = new File(snapDir, directoryName);

          if (!snapShotDir.mkdir()) {

            LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());

            return;

          }

          Collection<String> files = indexCommit.getFileNames();

          FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);

          fileCopier.copyFiles(files, snapShotDir);

     

          details.add("fileCount", files.size());

          details.add("status", "success");

          details.add("snapshotCompletedAt", new Date().toString());

        } catch (Exception e) {

          SnapPuller.delTree(snapShotDir);

          LOG.error("Exception while creating snapshot", e);

          details.add("snapShootException", e.getMessage());

        } finally {

          replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());  

          replicationHandler.snapShootDetails = details;

          if (lock != null) {

            try {

              lock.release();

            } catch (IOException e) {

              LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");

            }

          }

        }

      }

    3fetchindex。響應來自slave節點的取索引文件的請求,會啟動一個線程來實現索引文件的獲取。

          String masterUrl = solrParams.get(MASTER_URL);

          if (!isSlave && masterUrl == null) {

            rsp.add(STATUS,ERR_STATUS);

            rsp.add("message","No slave configured or no 'masterUrl' Specified");

            return;

          }

          final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);

          new Thread() {

            @Override

            public void run() {

              doFetch(paramsCopy);

            }

          }.start();

          rsp.add(STATUS, OK_STATUS);

    具體的獲取動作是通過SnapPuller對象來實現的,首先嘗試獲取pull對象鎖,如果請求鎖失敗,則說明還有取索引數據動作未結束,如果請求鎖成功,就調用SnapPuller對象的fetchLatestIndex方法來取最新的索引數據。

     void doFetch(SolrParams solrParams) {

        String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);

        if (!snapPullLock.tryLock())

          return;

        try {

          tempSnapPuller = snapPuller;

          if (masterUrl != null) {

            NamedList<Object> nl = solrParams.toNamedList();

            nl.remove(SnapPuller.POLL_INTERVAL);

            tempSnapPuller = new SnapPuller(nl, this, core);

          }

          tempSnapPuller.fetchLatestIndex(core);

        } catch (Exception e) {

          LOG.error("SnapPull failed ", e);

        } finally {

          tempSnapPuller = snapPuller;

          snapPullLock.unlock();

        }

     }

    最后真正的取索引數據過程,首先,若mastet節點的indexversion0,則說明master節點根本沒有提供可供復制的索引數據,若master節點和slave節點的indexversion相同,則說明slave節點目前與master節點索引數據狀態保持一致,無需同步。若兩者的indexversion不同,則開始索引復制過程,首先從master節點上下載指定索引版本號的索引文件列表,然后創建一個索引文件同步服務線程來完成同并工作。

    這里需要區分的是,如果master節點的年代比slave節點要老,那就說明兩者已經不相容,此時slave節點需要新建一個索引目錄,再從master節點做一次全量索引復制。還需要注意的一點是,索引同步也是可以同步配置文件的,若配置文件發生變化,則需要對solr核進行一次reload操作。最對了,還有,和文章開頭一樣, slave節點同步完數據后,別忘了做一次commit操作,以便刷新自己的索引提交點到最新的狀態。最后,關閉并等待同步服務線程結束。此外,具體的取索引文件是通過FileFetcher對象來完成。

     boolean fetchLatestIndex(SolrCore core) throws IOException {

        replicationStartTime = System.currentTimeMillis();

        try {

          //get the current 'replicateable' index version in the master

          NamedList response = null;

          try {

            response = getLatestVersion();

          } catch (Exception e) {

            LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());

            return false;

          }

          long latestVersion = (Long) response.get(CMD_INDEX_VERSION);

          long latestGeneration = (Long) response.get(GENERATION);

          if (latestVersion == 0L) {

            //there is nothing to be replicated

            return false;

          }

          IndexCommit commit;

          RefCounted<SolrIndexSearcher> searcherRefCounted = null;

          try {

            searcherRefCounted = core.getNewestSearcher(false);

            commit = searcherRefCounted.get().getReader().getIndexCommit();

          } finally {

            if (searcherRefCounted != null)

              searcherRefCounted.decref();

          }

          if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {

            //master and slave are alsready in sync just return

            LOG.info("Slave in sync with master.");

            return false;

          }

          LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);

          LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());

          LOG.info("Starting replication process");

          // get the list of files first

          fetchFileList(latestVersion);

          // this can happen if the commit point is deleted before we fetch the file list.

          if(filesToDownload.isEmpty()) return false;

          LOG.info("Number of files in latest index in master: " + filesToDownload.size());

     

          // Create the sync service

          fsyncService = Executors.newSingleThreadExecutor();

          // use a synchronized list because the list is read by other threads (to show details)

          filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());

          // if the generateion of master is older than that of the slave , it means they are not compatible to be copied

          // then a new index direcory to be created and all the files need to be copied

          boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;

          File tmpIndexDir = createTempindexDir(core);

          if (isIndexStale())

            isFullCopyNeeded = true;

          successfulInstall = false;

          boolean deleteTmpIdxDir = true;

          File indexDir = null ;

          try {

            indexDir = new File(core.getIndexDir());

            downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);

            LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");

            Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);

            if (!modifiedConfFiles.isEmpty()) {

              downloadConfFiles(confFilesToDownload, latestVersion);

              if (isFullCopyNeeded) {

                successfulInstall = modifyIndexProps(tmpIndexDir.getName());

                deleteTmpIdxDir = false;

              } else {

                successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);

              }

              if (successfulInstall) {

                LOG.info("Configuration files are modified, core will be reloaded");

                logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.

                reloadCore();

              }

            } else {

              terminateAndWaitFsyncService();

              if (isFullCopyNeeded) {

                successfulInstall = modifyIndexProps(tmpIndexDir.getName());

                deleteTmpIdxDir = false;

              } else {

                successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);

              }

              if (successfulInstall) {

                logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);

                doCommit();

              }

            }

            replicationStartTime = 0;

            return successfulInstall;

          } catch (ReplicationHandlerException e) {

            LOG.error("User aborted Replication");

          } catch (SolrException e) {

            throw e;

          } catch (Exception e) {

            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);

          } finally {

            if (deleteTmpIdxDir) delTree(tmpIndexDir);

            else delTree(indexDir);

          }

          return successfulInstall;

        } finally {

          if (!successfulInstall) {

            logReplicationTimeAndConfFiles(null, successfulInstall);

          }

          filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;

          replicationStartTime = 0;

          fileFetcher = null;

          if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();

          fsyncService = null;

          stop = false;

          fsyncException = null;

        }

     }

    posted on 2012-07-04 18:42 SIMONE 閱讀(1990) 評論(0)  編輯  收藏 所屬分類: solr
    主站蜘蛛池模板: 一级一看免费完整版毛片| 久久中文字幕免费视频| 亚洲熟妇少妇任你躁在线观看无码 | 中国国语毛片免费观看视频| 亚洲av伊人久久综合密臀性色| 91免费福利精品国产| 久久亚洲中文字幕无码| 亚洲精品自在在线观看| 很黄很色很刺激的视频免费| 鲁啊鲁在线视频免费播放| 久久精品国产96精品亚洲 | 国产午夜亚洲精品不卡免下载| 国产亚洲日韩一区二区三区| 无码国产精品一区二区免费| 极品美女一级毛片免费| 亚洲无人区视频大全| 亚洲一区二区三区国产精品| 美丽姑娘免费观看在线观看中文版| 亚洲国产欧美日韩精品一区二区三区| 亚洲色偷偷偷鲁综合| 最新69国产成人精品免费视频动漫| 香蕉免费在线视频| 亚洲视频在线观看2018| 亚洲av永久无码制服河南实里| 四虎成人免费影院网址| 无码专区AAAAAA免费视频| 黄色毛片免费观看| 亚洲看片无码在线视频| 亚洲AV综合色区无码一区 | 亚洲黄色网址大全| 亚洲综合区小说区激情区| 在线观看无码AV网站永久免费| 日韩精品无码免费专区午夜| 精品久久久久亚洲| 亚洲一级片在线观看| 亚洲AV无码一区东京热久久| 亚洲国产一区视频| 免费看www视频| 亚洲人成网站免费播放| 99精品热线在线观看免费视频 | 日韩精品免费电影|