概要

這個類在 Oracle 的官方文檔里是查不到的,但是確實在 OpenJDK 的源代碼里出現了,Arrays 中的 sort 函數用到了這個用于排序的類。它將歸并排序(merge sort) 與插入排序(insertion sort) 結合,并進行了一些優化。對于已經部分排序的數組,時間復雜度遠低于 O(n log(n)),最好可達 O(n),對于隨機排序的數組,時間復雜度為 O(nlog(n)),平均時間復雜度 O(nlog(n))。強烈建議在看此文前觀看 Youtube 上的 可視化Timsort,看完后馬上就會對算法的執行過程有一個感性的了解。然后,可以閱讀 Wikipeida 詞條:Timsort。 這個排序算法在 Java SE 7, Android, GNU Octave 中都得到了應用。另外,文 后也推薦了兩篇非常好的文章,如果想搞明白 TimSort 最好閱讀一下。

此類是對 Python 中,由 Tim Peters 實現的排序算法的改寫。實現來自:listobject.c.

原始論文來自:

"Optimistic Sorting and Information Theoretic Complexity" Peter  McIlroy SODA (Fourth Annual ACM-SIAM Symposium on Discrete  Algorithms), pp 467-474, Austin, Texas, 25-27 January 1993. 

實現

  • sort
static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c) {     if (c == null) {         Arrays.sort(a, lo, hi);         return;     }      rangeCheck(a.length, lo, hi);     int nRemaining  = hi - lo;     if (nRemaining < 2)         return;  // Arrays of size 0 and 1 are always sorted      // If array is small, do a "mini-TimSort" with no merges     if (nRemaining < MIN_MERGE) {         int initRunLen = countRunAndMakeAscending(a, lo, hi, c);         binarySort(a, lo, hi, lo + initRunLen, c);         return;     }      /**      * March over the array once, left to right, finding natural runs,      * extending short natural runs to minRun elements, and merging runs      * to maintain stack invariant.      */     TimSort<T> ts = new TimSort<>(a, c);     int minRun = minRunLength(nRemaining);     do {         // Identify next run         int runLen = countRunAndMakeAscending(a, lo, hi, c);          // If run is short, extend to min(minRun, nRemaining)         if (runLen < minRun) {             int force = nRemaining <= minRun ? nRemaining : minRun;             binarySort(a, lo, lo + force, lo + runLen, c);             runLen = force;         }          // Push run onto pending-run stack, and maybe merge         ts.pushRun(lo, runLen);         ts.mergeCollapse();          // Advance to find next run         lo += runLen;         nRemaining -= runLen;     } while (nRemaining != 0);      // Merge all remaining runs to complete sort     assert lo == hi;     ts.mergeForceCollapse();     assert ts.stackSize == 1; } 

下面分段解釋:

if (c == null) {     Arrays.sort(a, lo, hi);     return; } 

如果沒有提供 Comparaotr 的話,會調用 Arrays.sort 中的函數,背后其實又會調用 ComparableTimSort,它是對沒有提供Comparator ,但是實現了 Comparable 的元素進行排序,算法和這里的是一樣的,就是元素比較方法不一樣。

后面是算法的主體:

    if (nRemaining < 2)         return;  // Arrays of size 0 and 1 are always sorted      // If array is small, do a "mini-TimSort" with no merges     if (nRemaining < MIN_MERGE) {         int initRunLen = countRunAndMakeAscending(a, lo, hi, c);         binarySort(a, lo, hi, lo + initRunLen, c);         return;     }  
  1. 如果元素個數小于2,直接返回,因為這兩個元素已經排序了
  2. 如果元素個數小于一個閾值(默認為),調用 binarySort,這是一個不包含合并操作的 mini-TimSort。
  3. 在關鍵的 do-while 循環中,不斷地進行排序,合并,排序,合并,一直到所有數據都處理完。
    TimSort<T> ts = new TimSort<>(a, c);     int minRun = minRunLength(nRemaining);     do {          ...      } while (nRemaining != 0); 
  • minRunLength

這個函數會找出 run 的最小長度,少于這個長度就需要對其進行擴展。

static int minRunLength(int n) {         assert n >= 0;         int r = 0;      // Becomes 1 if any 1 bits are shifted off         while (n >= MIN_MERGE) {             r |= (n & 1);             n >>= 1;         }         return n + r;     } 

先看看 n 與 minRunLength(n) 對應關系

0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 10 10 11 11 12 12 13 13 14 14 15 15 16 16 17 17 18 18 19 19 20 20 21 21 22 22 23 23 24 24 25 25 26 26 27 27 28 28 29 29 30 30 31 31 32 16 33 17 34 17 35 18 36 18 37 19 38 19 39 20 40 20 41 21 42 21 43 22 44 22 45 23 46 23 47 24 48 24 49 25 50 25 51 26 52 26 53 27 54 27 55 28 56 28 57 29 58 29 59 30 60 30 61 31 62 31 63 32 64 16 65 17 66 17 67 17 68 17 69 18 70 18 71 18 72 18 73 19 74 19 75 19 76 19 77 20 78 20 79 20 80 20 81 21 82 21 83 21 84 21 85 22 86 22 87 22 88 22 89 23 90 23 91 23 92 23 93 24 94 24 95 24 96 24 97 25 98 25 99 25  ... 

看這個估計可以猜出來函數的功能了,下面解釋一下。

這個函數根據 n 計算出對應的 natural run 的最小長度。MIN_MERGE 默認為 32,如果n小于此值,那么返回 n 本身。否則會將 n 不斷地右移,直到少于 MIN_MERGE,同時記錄一個 r 值,r 代表最后一次移位n時,n最低位是0還是1。 最后返回 n + r,這也意味著只保留最高的 5 位,再加上第六位。

  • do-while

我們再看看 do-while 中發生了什么。

   TimSort<T> ts = new TimSort<>(a, c);     int minRun = minRunLength(nRemaining);     do {         // Identify next run         int runLen = countRunAndMakeAscending(a, lo, hi, c);          // If run is short, extend to min(minRun, nRemaining)         if (runLen < minRun) {             int force = nRemaining <= minRun ? nRemaining : minRun;             binarySort(a, lo, lo + force, lo + runLen, c);             runLen = force;         }          // Push run onto pending-run stack, and maybe merge         ts.pushRun(lo, runLen);         ts.mergeCollapse();          // Advance to find next run         lo += runLen;         nRemaining -= runLen;     } while (nRemaining != 0); 

countRunAndMakeAscending 會找到一個 run ,這個 run 必須是已經排序的,并且函數會保證它為升序,也就是說,如果找到的是一個降序的,會對其進行翻轉。

簡單看一眼這個函數:

  • countRunAndMakeAscending
private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi,                                                 Comparator<? super T> c) {     assert lo < hi;     int runHi = lo + 1;     if (runHi == hi)         return 1;      // Find end of run, and reverse range if descending     if (c.compare(a[runHi++], a[lo]) < 0) { // Descending         while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0)             runHi++;         reverseRange(a, lo, runHi);     } else {                              // Ascending         while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0)             runHi++;     }      return runHi - lo; } 

注意其中的 reverseRange 就是我們說的翻轉。

現在,有必要看一下 binarySort 了。

private static <T> void binarySort(T[] a, int lo, int hi, int start,                                    Comparator<? super T> c) {     assert lo <= start && start <= hi;     if (start == lo)         start++;     for ( ; start < hi; start++) {         T pivot = a[start];          // Set left (and right) to the index where a[start] (pivot) belongs         int left = lo;         int right = start;         assert left <= right;         /*          * Invariants:          *   pivot >= all in [lo, left).          *   pivot <  all in [right, start).          */         while (left < right) {             int mid = (left + right) >>> 1;             if (c.compare(pivot, a[mid]) < 0)                 right = mid;             else                 left = mid + 1;         }         assert left == right;          /*          * The invariants still hold: pivot >= all in [lo, left) and          * pivot < all in [left, start), so pivot belongs at left.  Note          * that if there are elements equal to pivot, left points to the          * first slot after them -- that's why this sort is stable.          * Slide elements over to make room for pivot.          */         int n = start - left;  // The number of elements to move         // Switch is just an optimization for arraycopy in default case         switch (n) {             case 2:  a[left + 2] = a[left + 1];             case 1:  a[left + 1] = a[left];                      break;             default: System.arraycopy(a, left, a, left + 1, n);         }         a[left] = pivot;     } } 

我們都聽說過 binarySearch ,但是這個 binarySort 又是什么呢? binarySort 對數組 a[lo:hi] 進行排序,并且a[lo:start] 是已經排好序的。算法的思路是對 a[start:hi] 中的元素,每次使用 binarySearch 為它在 a[lo:start] 中找到相應位置,并插入。

回到 do-while 循環中,看看 binarySearch 的作用:

  // If run is short, extend to min(minRun, nRemaining)     if (runLen < minRun) {         int force = nRemaining <= minRun ? nRemaining : minRun;         binarySort(a, lo, lo + force, lo + runLen, c);         runLen = force;     } 

所以,我們明白了,binarySort 對 run 進行了擴展,并且擴展后,run 仍然是有序的。

隨后:

   // Push run onto pending-run stack, and maybe merge     ts.pushRun(lo, runLen);     ts.mergeCollapse();      // Advance to find next run     lo += runLen;     nRemaining -= runLen; 

當前的 run 位于 a[lo:runLen] ,將其入棧,然后將棧中的 run 合并。

  • pushRun
private void pushRun(int runBase, int runLen) {     this.runBase[stackSize] = runBase;     this.runLen[stackSize] = runLen;     stackSize++; } 

入棧過程簡單明了,不解釋。

再看另一個關鍵函數,合并操作。如果你看過文章開頭提到的對 Timsort 進行可視化的視頻,一定會對合并操作印象深刻。它會把已經排序的 run 合并成一個大 run,此大 run 也會排好序。

/**  * Examines the stack of runs waiting to be merged and merges adjacent runs  * until the stack invariants are reestablished:  *  *     1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]  *     2. runLen[i - 2] > runLen[i - 1]  *  * This method is called each time a new run is pushed onto the stack,  * so the invariants are guaranteed to hold for i < stackSize upon  * entry to the method.  */ private void mergeCollapse() {     while (stackSize > 1) {         int n = stackSize - 2;         if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {             if (runLen[n - 1] < runLen[n + 1])                 n--;             mergeAt(n);         } else if (runLen[n] <= runLen[n + 1]) {             mergeAt(n);         } else {             break; // Invariant is established         }     } } 

合并的過程會一直循環下去,一直到注釋里提到的循環不變式得到滿足。

  • mergeAt

mergeAt 會把棧頂的兩個 run 合并起來:

   /**      * Merges the two runs at stack indices i and i+1.  Run i must be      * the penultimate or antepenultimate run on the stack.  In other words,      * i must be equal to stackSize-2 or stackSize-3.      *      * @param i stack index of the first of the two runs to merge  */ private void mergeAt(int i) {     assert stackSize >= 2;     assert i >= 0;     assert i == stackSize - 2 || i == stackSize - 3;      int base1 = runBase[i];     int len1 = runLen[i];     int base2 = runBase[i + 1];     int len2 = runLen[i + 1];     assert len1 > 0 && len2 > 0;     assert base1 + len1 == base2;      /*      * Record the length of the combined runs; if i is the 3rd-last      * run now, also slide over the last run (which isn't involved      * in this merge).  The current run (i+1) goes away in any case.      */     runLen[i] = len1 + len2;     if (i == stackSize - 3) {         runBase[i + 1] = runBase[i + 2];         runLen[i + 1] = runLen[i + 2];     }     stackSize--;      /*      * Find where the first element of run2 goes in run1. Prior elements      * in run1 can be ignored (because they're already in place).      */     int k = gallopRight(a[base2], a, base1, len1, 0, c);     assert k >= 0;     base1 += k;     len1 -= k;     if (len1 == 0)         return;      /*      * Find where the last element of run1 goes in run2. Subsequent elements      * in run2 can be ignored (because they're already in place).      */     len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c);     assert len2 >= 0;     if (len2 == 0)         return;      // Merge remaining runs, using tmp array with min(len1, len2) elements     if (len1 <= len2)         mergeLo(base1, len1, base2, len2);     else         mergeHi(base1, len1, base2, len2); } 

由于要合并的兩個 run 是已經排序的,所以合并的時候,有會特別的技巧。假設兩個 run 是 run1,run2 ,先用 gallopRight在 run1 里使用 binarySearch 查找 run2 首元素 的位置 k, 那么 run1 中 k 前面的元素就是合并后最小的那些元素。然后,在 run2 中查找 run1 尾元素 的位置 len2 ,那么 run2 中 len2 后面的那些元素就是合并后最大的那些元素。最后,根據len1 與 len2 大小,調用 mergeLo 或者 mergeHi 將剩余元素合并。

gallop 和 merge 就不展開了。

另外,強烈推薦閱讀文后的兩篇文章,第一篇可以看到 JDK7 中更換排序算法后可能引發的問題,另外,也會介紹源代碼,并給出具體的例子。第二篇會告訴你如何對一個 MergeSort 進行優化,介紹了 TimSort 背后的思想。

推薦閱讀

1