<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    莊周夢蝶

    生活、程序、未來
       :: 首頁 ::  ::  :: 聚合  :: 管理

        Kafka這個linkedin開源的MQ,我在過去的blog簡單介紹過。最近3周來,我的工作就是做它的一個Java移植版本,kafka是用scala寫的,基于維護和定制的角度,這個拷貝的版本還是用Java。說拷貝,也不盡然,原理相同,但實現完全換過,從數據結構到通訊框架、通訊協議、程序組織,乃至一些重要功能點上都做了改進和更新。我將這個Java版本取名為metamorphosis,也就是卡夫卡的代表作《變形記》的英文名。

        在原版本上,目前做了如下改進:
    1、協議替換為文本協議,整個協議類似memcached,文本協議的優點自不必說。通訊框架也是采用內部使用的通訊框架,減少工作量。

    2、存儲結構上也采用自定義結構,更簡潔緊湊。

    3、kafka原來只支持consumer和broker之間的服務查找和負載均衡,meta加入了producer和broker之間的服務查找和負載均衡。

    4、Consumer API沒有采用kafka的stream方式,而是同時實現同步獲取和異步訂閱兩種方式,更接近JMS和Notify。

    5、改進了服務器端文件recover的性能,采用并發多線程recover的方式(可選)。

    6、添加了實時統計功能和協議,類似memcached的stats協議,響應透明號召。

    7、客戶端的連接復用。
       
        以后要做的事情,可能包括:
    1、實現類似Mysql的master/slave方案,可能還要分為同步和異步兩種模式。

    2、分區擴展時候的數據自動遷移功能,做到無痛水平擴展。

    3、高可用方案的另一個實現。

    4、嵌入Http server做web管理。
      
        工作在本周初步告一段落,接下來是要做集成測試和壓測等,我在兩臺8核16G的機器上分別部署服務器和客戶端(訂閱者發布者同在一臺),做的一個簡單壓測數據如下:并發100個線程發送5000萬消息并同時消費,1K大小的消息TPS可以達到3.8萬,4K大小的消息TPS可以達到1.8萬,服務器load都維持在一個較低的水平。從這個數據來看,超過我一開始的預期。后續可能做下kakfa的測試對比下。


    posted @ 2011-05-07 10:46 dennis 閱讀(5528) | 評論 (6)編輯 收藏

        無論是消息系統,還是配置管理中心,甚至存儲系統,你都要面臨這樣一個選擇,push模型 or pull模型?是服務端主動給客戶端推送數據,還是客戶端去服務器拉數據,一張圖表對比如下:
     
    push模型 pull模型
    描述 服務端主動發送數據給客戶端 客戶端主動從服務端拉取數據,通常客戶端會定時拉取
    實時性 較好,收到數據后可立即發送給客戶端 一般,取決于pull的間隔時間
    服務端狀態 需要保存push狀態,哪些客戶端已經發送成功,哪些發送失敗 服務端無狀態
     客戶端狀態  無需額外保存狀態 需保存當前拉取的信息的狀態,以便在故障或者重啟的時候恢復
    狀態保存 集中式,集中在服務端 分布式,分散在各個客戶端
    負載均衡 服務端統一處理和控制 客戶端之間做分配,需要協調機制,如使用zookeeper
    其他

    服務端需要做流量控制,無法最大化客戶端的處理能力。

    其次,在客戶端故障情況下,無效的push對服務端有一定負載。

    客戶端的請求可能很多無效或者沒有數據可供傳輸,浪費帶寬和服務器處理能力
    缺點方案 服務器端的狀態存儲是個難點,可以將這些狀態轉移到DB或者key-value存儲,來減輕server壓力。

    針對實時性的問題,可以將push加入進來,push小數據的通知信息,讓客戶端再來主動pull。

    針對無效請求的問題,可以設置逐漸延長間隔時間的策略,以及合理設計協議盡量縮小請求數據包來節省帶寬。



    在面對大量甚至海量客戶端的時候,使用push模型,保存大量的狀態信息是個沉重的負擔,加上復制N份數據分發的壓力,也會使得實時性這唯一的優點也被放小。使用pull模型,通過將客戶端狀態保存在客戶端,大大減輕了服務器端壓力,通過客戶端自身做流量控制也更容易,更能發揮客戶端的處理能力,但是需要面對如何在這些客戶端之間做協調的難題。

    posted @ 2011-04-30 01:06 dennis 閱讀(4540) | 評論 (1)編輯 收藏



        HandlerSocket是一個mysql插件,可以將mysql作為NoSQL來使用,具體可以看我過去寫的這篇Bloghs4j是HandlerSocket的一個java客戶端,自認為它比日本人寫的那個客戶端更實用和易用一些。寫完好久,經過不少朋友使用和測試,現在正式發一個0.1版本,并已同步到maven中心倉庫。

        項目主頁:http://code.google.com/p/hs4j/
        項目描述:hs4j is a practical java client for HandlerSocket,it is nio based and turned to get better performance.
        使用文檔:http://code.google.com/p/hs4j/w/list
        下載地址:http://code.google.com/p/hs4j/downloads/list
        源碼倉庫:https://github.com/killme2008/hs4j

         如果你使用maven2,可以直接引用:
    <dependency>
      
    <groupId>com.googlecode.hs4j</groupId>
      
    <artifactId>hs4j</artifactId>
      
    <version>0.1</version>
    </dependency>

         有疑問和bug請聯系我。

    posted @ 2011-03-29 06:55 dennis 閱讀(3619) | 評論 (3)編輯 收藏

         Xmemcached是一個開源的java memcached client,具有高性能、更易用、功能完善等優點,距離上次發布1.3.1已經超過兩個月,現在正式發布1.3.2這個新版本,主要的改進如下:


    1、Bug修復,從1.3.1版本以來發現的bug并修復,包括:

    issue 112:: 新引入的failure模式在啟動的時候,如果memcached故障,運行不符合預期的bug.

    issue 113: 新增加一個delete方法,可以設置操作超時

    public boolean delete(final String key, long opTimeout)
                
    throws TimeoutException, InterruptedException, MemcachedException;

    2、性能調優,存儲操作(set/add/replace/prepend/append/cas)的性能提升5%。

    3、修復pom.xml,使得xmemcached可以在其他機器上編譯。

    4、使用github作為源碼倉庫,版本管理使用git替換svn,源碼轉移到

          https://github.com/killme2008/xmemcached

    新版本下載地址:

        http://code.google.com/p/xmemcached/downloads/list

    使用maven可以直接引用: 

    <dependency>
          <groupId>com.googlecode.xmemcached</groupId>
          
    <artifactId>xmemcached</artifactId>
          
    <version>1.3.2</version>
     
    </dependency>

    項目文檔:

    http://code.google.com/p/xmemcached/w/list

    posted @ 2011-03-27 14:06 dennis 閱讀(3009) | 評論 (1)編輯 收藏


        在國內,Clojure語言的用戶估計是小眾中的小眾,沒有多少人聽說,也沒有多少人使用,資料也大多數是英文的,討論也只能上國外論壇。因此,我想建立一個CN-Clojure的google group,供大家交流和學習clojure語言。群組地址(需要翻墻):http://groups.google.com/group/cn-clojure

       現在沒人,就我一個。我也會在群組里放些學習資料,歡迎任何對clojure感興趣的朋友加入。

    posted @ 2011-01-28 19:39 dennis 閱讀(3876) | 評論 (5)編輯 收藏

         昨天晚上用clojure搞了個scheme解釋器,基本上是sicp里的解釋器的clojure翻譯版本,可能唯一值的一提的是對transient集合的使用,實現副作用的set!。總共代碼包含注釋才366行,支持的feature包括

    Feature Supported Comment
    define yes
    lambda yes
    variable lookup yes
    primitive procedure evaluation yes
    compound procedure evaluation yes no tail recursion yet
    if yes
    cond yes
    let yes

    let* yes
    no named let* yet
    letrec no
    begin yes

    set! yes

    quote yes
    quasiquote no
    unquote no
    delay no
    define-syntax no

           支持的primitive procedure包括常見的四則運算、car/cdr、list以及display、newline等。代碼放在了github上:https://github.com/killme2008/cscheme,有興趣的可以玩玩吧。

    posted @ 2011-01-24 10:42 dennis 閱讀(3795) | 評論 (0)編輯 收藏


        最近因為空閑時間有一些,所以去看了不少開源項目,大部分東西如果看過不記錄下來,其實還是相當于沒看,所以想想還是有必要摘要記錄一下。

        首先是去了解了zookeeper這個項目,基于paxos算法的分布式服務組件,同事對此有非常深入的研究和介紹,具體可以看我們的團隊Blog。令我感慨的是這么一個非常難以理解的算法,卻用一個簡單的樹狀目錄模型表達出來,并且在這個模型的基礎上衍生出種種應用:集群感知、分布式鎖、分布式隊列、分布式并發原語等等,具體可以看文檔給出的recipes。在實現這些應用的時候,突出強調的是避免網絡風暴,例如分布式鎖的實現,競爭創建子節點,節點序列號最小的獲取鎖,其他節點等待,但是等待在什么條件上是有講究的,如果所有節點都等待最小節點的刪除事件,那么當最小節點釋放鎖的時候,就需要廣播消息給所有其他等待的節點;換一個思路,如果每個等待節點只是等待比它序列號小的節點上,那么就可以避免這種廣播風暴,變成一個順序喚醒的過程。因此盡管有了zookeeper幫助實現分布式這些服務,但是要實現好仍然有一定難度,具體可以參考官方例子。我本來萌生了基于zookeeper實現一套封裝好的類似j.u.c的服務框架,后來在郵件列表發現已經有人搞了這么一個基礎類庫放在github上:https://github.com/openUtility/menagerie 。不過我沒有繼續深入了,有興趣的朋友可以瞧瞧。

        然后又去看了我們淘寶開源的TimeTunnel。TimeTunnel你可以理解成一個消息中間件,它整個設計跟我們的產品相當接近,但是兩者的目的完全不同,tt強調的是高吞吐量,而notify強調的則是可靠性。TT的通訊層直接采用Facebook的thrift,并且利用zookeeper做集群管理和路由。TT的代碼質量很好,有興趣可以拉出來看一下,并且對zookeeper的應用也是一個典型的案例。TT在高可用性上的方案也很有特色,所有的服務器節點形成一個環,兩兩相互主輔備份,一個節點掛了,后續節點仍然可以提供服務直到主節點回來,有點類似一致性哈希的概念。節點的主從關系和順序也是通過zookeeper保證。消息順序的實現是通過稱為router的路由到固定節點做傳輸,router默認是策略不是固定而是RR。TT的數據存儲優先放在內存,并設置了一個內存狀況監視的組件,當發現內存放不下的時候,swap到磁盤文件緩存,實現類似內存換頁的功能。正常情況數據都應該在內存,當然如果可靠級別要求高的話可以先存磁盤再傳輸。TT目前仍然還是比較適合傳輸日志這樣的文本增量數據,并且提供了TailFile這樣的python腳本幫你做這個事情,這個腳本可以通過checkpoint做斷點續傳。在學習這個項目的時候,發現文檔有很大問題,要么錯誤,要么遺漏,并且代碼也不是最新的,我估計開源出來外面的人用的還不太多,希望慢慢能搞的更好一些。

        跟TT類似,另一個追求高吞吐量的MQ是linkedin開源的kafka。Kafka就跟這個名字一樣,設計非常獨特。首先,kafka的開發者們認為不需要在內存里緩存什么數據,操作系統的文件緩存已經足夠完善和強大,只要你不搞隨機寫,順序讀寫的性能是非常高效的。kafka的數據只會順序append,數據的刪除策略是累積到一定程度或者超過一定時間再刪除。Kafka另一個獨特的地方是將消費者信息保存在客戶端而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每個客戶端都自己知道自己下一次應該從什么地方什么位置讀取消息,消息的投遞過程也是采用客戶端主動pull的模型,這樣大大減輕了服務器的負擔。Kafka還強調減少數據的序列化和拷貝開銷,它會將一些消息組織成Message Set做批量存儲和發送,并且客戶端在pull數據的時候,盡量以zero-copy的方式傳輸,利用sendfile(對應java里的FileChannel.transferTo/transferFrom)這樣的高級IO函數來減少拷貝開銷。可見,kafka是一個精心設計,特定于某些應用的MQ系統,這種偏向特定領域的MQ系統我估計會越來越多,垂直化的產品策略值的考慮。

         在此期間,我還重新去看了activemq和hornetq的存儲實現,從實現上大家都大同小異,append log + data file的模式。Activemq采用異步隊列寫來提高吞吐量,而Hornetq干脆就直接利用JNI調用原生aio來實現高性能。在搜索Java的aio實現的時候,碰巧發現Mina的沙箱里有個aioj的實現,源碼在:https://svn.apache.org/repos/asf/mina/sandbox/mheath/aioj/ 。我測試了完全可用,也嘗試改造我們的磁盤存儲組件,可惜提升不多,估計不從整個設計上調整服務器,不大可能從aio上獲益。

         最近也重新看起了clojure的一些開源項目,clojure的開源資源在github上也非常豐富,有待挖掘,下次有機會再嘗試介紹一二。
      
       
       

    posted @ 2011-01-20 23:23 dennis 閱讀(7644) | 評論 (11)編輯 收藏

        寫著玩的,不使用任何網絡框架從頭構建的echo server,總共77行。
     1 ;;Author:dennis (killme2008@gmail.com)
     2 (ns webee.network
     3    (:import (java.nio.channels Selector SocketChannel ServerSocketChannel SelectionKey)
     4             (java.net InetSocketAddress)
     5             (java.nio ByteBuffer)
     6             (java.io IOException)))
     7 
     8 (declare reactor process-keys accept-channel read-channel)
     9 
    10 (defn bind [^InetSocketAddress addr fcol]
    11   (let [selector (Selector/open)
    12         ssc      (ServerSocketChannel/open)
    13         ag  (agent selector)]
    14     (do
    15       (.configureBlocking ssc false)
    16       (.. ssc (socket) (bind addr 1000))
    17       (.register ssc selector SelectionKey/OP_ACCEPT)
    18       (send-off ag reactor fcol)
    19       ag)))
    20 
    21 (defn- reactor [^Selector selector fcol]
    22   (let [sel (. selector select 1000)]
    23     (if (> sel 0)
    24       (let [sks (. selector selectedKeys)]
    25         (do 
    26           (dorun (map (partial process-keys selector fcol) sks))
    27           (.clear sks))))
    28     (recur selector fcol)))
    29   
    30 (defn- process-keys [^Selector selector ^SelectionKey fcol sk]
    31   (try
    32     (cond 
    33       (.isAcceptable sk) (accept-channel sk  selector fcol)
    34       (.isReadable sk) (read-channel sk selector fcol)    
    35     )
    36     (catch Throwable e (.printStackTrace e))))
    37 
    38 (defn- accept-channel [^SelectionKey sk ^Selector selector fcol]
    39    (let [^ServerSocketChannel ssc (. sk channel)
    40          ^SocketChannel sc (. ssc accept)
    41          created-fn (:created fcol)]
    42      (do 
    43        (.configureBlocking sc false
    44        (.register sc selector SelectionKey/OP_READ)
    45        (if created-fn
    46          (created-fn sc)))))
    47 
    48 (defn- close-channel [^SelectionKey sk ^SocketChannel sc fcol]
    49   (let [closed-fn (:closed fcol)]
    50     (do 
    51        (.close sc)
    52        (.cancel sk)
    53        (if closed-fn 
    54          (closed-fn sc)))))
    55      
    56 (defn-  read-channel [^SelectionKey sk ^Selector selector fcol]
    57    (let [^SocketChannel sc (. sk channel)
    58          ^ByteBuffer buf (ByteBuffer/allocate 4096)
    59          read-fn (:read fcol)]
    60      (try
    61        (let [n (.read sc buf)]
    62          (if (< n 0)
    63              (close-channel sk sc fcol)
    64              (do (.flip buf)
    65                  (if read-fn
    66                    (read-fn sc buf)))))
    67        (catch IOException e
    68          (close-channel sk sc fcol)))))
    69 
    70 ;;Bind a tcp server to localhost at port 8080,you can telnet it.
    71 (def server
    72   (bind 
    73     (new InetSocketAddress 8080)
    74     {:read #(.write %1 %2)
    75      :created #(println "Accepted from" (.. % (socket) (getRemoteSocketAddress)))
    76      :closed  #(println "Disconnected from" (.. % (socket) (getRemoteSocketAddress)))
    77      }))


    posted @ 2011-01-15 22:56 dennis 閱讀(1895) | 評論 (0)編輯 收藏

          Xmemcached是一個開源的memcached的Java客戶端,最近引入了一些關鍵特性,因此版本號直接從1.2.6.2升級到1.3.0。主要的更改如下:

    1、引入了failure模式,所謂failure模式是指在當一個memcached由于各種原因不可用的情況下,發往這個節點的請求將直接拋出異常,而非使用下一個可用的節點。具體可以看memached的這個文檔。默認不啟用,啟用failure模式很簡單:

    MemcachedClientBuilder builder=……
    //啟用failure模式。
    builder.setFailureMode(true);

    也可以采用spring配置。

    2、在啟用failure模式的情況下,允許為每個memcached設置一個備份節點,當主節點掛掉的情況下,會將請求轉交給備份節點,主節點恢復后又自動切換到主節點。請注意,要設置備份節點的前提是啟用failure模式。假設我們已經有兩個memcached節點:host1:port和host2:port,為host1:port設置一個備份節點host3:port可以實現為:
    MemcachedClientBuilder builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("host1:port,host3:port host2:port"))
    ……

    主備節點之間用逗號隔開,不同分組之間用空格隔開,完全兼容1.2。并且當備份節點連接意外斷開的情況下,xmemcached也會自動修復備份節點的連接并加入映射。

    關于failure模式和standby節點更多內容可以參考這篇blog.

    3、修正BUG和新功能,包括issue 104,issue 105,issue 107等。

    項目主頁 http://code.google.com/p/xmemcached/

    下載地址 http://code.google.com/p/xmemcached/downloads/list

    用戶指南 http://code.google.com/p/xmemcached/wiki/TableOfContents

         如果你使用maven構建,可以直接引用:

    <dependency>
    <groupId>com.googlecode.xmemcached</groupId>
    <artifactId>xmemcached</artifactId>
    <version>1.3.1</version>
    </dependency>


        更新:發布1.3.1了,如果你還在使用1.3.0,建議升級。1.3.0因為改變了memcached地址服務器順序,可能導致原有的緩存失效。


    posted @ 2011-01-04 20:10 dennis 閱讀(2926) | 評論 (0)編輯 收藏


        首先,還是利用下這個小工具,查看下我10年讀過的書,看過的電影




        讀書:讀的太少,也可以看到,技術方面的更少,如果要說推薦,我只會推薦《Programming Clojure》作為學習clojure的入門,并且推薦《構建高性能web站點》作為了解一個網站構建的方放面面的入門書。

        電影:今年進電影院的次數也寥寥無幾,主要還是重看了萊昂納多的作品,《盜夢空間》很驚艷,《鋼鐵俠2》很失望。

         去年的愿望:讀完《算法導論》——2/3,繼續深入Erlang,探索Erlang在工作中的實際應用——幾乎沒有,加強對其他系統的了解以及大型網站構建方面的學習——小小一些了解,希望能全家一起去旅游一次,希望能將老爸老媽接過來玩一段時間——沒有做到。
      
         工作:狀態并不好,還是嘗試努力去做了一些事情,包括參與一些分享,更多參與他人的代碼復查和設計審查等。抱怨、牢騷少了一些,相對淡定了。

         2011年:還是不談大的愿望,從以往的經驗來說,很難靠譜。也許有一個相對明晰的目標:提高自制力和計劃性。


    posted @ 2011-01-01 09:08 dennis 閱讀(2099) | 評論 (3)編輯 收藏

    僅列出標題
    共56頁: First 上一頁 3 4 5 6 7 8 9 10 11 下一頁 Last 
    主站蜘蛛池模板: 亚洲大成色www永久网站| 亚洲成a人片在线不卡一二三区| 7m凹凸精品分类大全免费| 亚洲第一页在线播放| 女人18一级毛片免费观看| 一级全免费视频播放| 亚洲视频国产精品| 波多野结衣免费视频观看| a视频在线观看免费| 亚洲综合久久精品无码色欲 | 2021精品国产品免费观看| 亚洲欧美成人一区二区三区| 亚洲熟妇中文字幕五十中出| 国国内清清草原免费视频99| 黄色网址免费在线观看| 2020年亚洲天天爽天天噜| 久久青青草原亚洲av无码| 欧美日韩国产免费一区二区三区| 青青操视频在线免费观看| 亚洲色少妇熟女11p| 亚洲成色WWW久久网站| 全免费a级毛片免费看无码| 两个人看的www免费| 亚洲AV永久无码精品一福利| 亚洲人成电影福利在线播放| 大胆亚洲人体视频| 和日本免费不卡在线v| 四虎影视无码永久免费| 在线观看亚洲免费视频| 亚洲videosbestsex日本| 亚洲欧洲日产国码无码久久99 | 亚洲视频免费观看| 亚洲伊人久久综合中文成人网| 最近最好的中文字幕2019免费| 久久久久国产精品免费免费不卡| 欧洲精品码一区二区三区免费看| 亚洲一区二区三区丝袜| 337p日本欧洲亚洲大胆色噜噜| 亚洲老妈激情一区二区三区| 免费少妇a级毛片| 日本免费观看网站|