??xml version="1.0" encoding="utf-8" standalone="yes"?>亚洲宅男精品一区在线观看,久久亚洲国产视频,亚洲国产精品婷婷久久http://www.tkk7.com/DLevin/In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatrazh-cnThu, 27 Mar 2025 10:42:20 GMTThu, 27 Mar 2025 10:42:20 GMT60使用NamedParameterJdbcTemplate遇到无法使用的坑http://www.tkk7.com/DLevin/archive/2015/11/11/428149.htmlDLevinDLevinWed, 11 Nov 2015 10:46:00 GMThttp://www.tkk7.com/DLevin/archive/2015/11/11/428149.htmlhttp://www.tkk7.com/DLevin/comments/428149.htmlhttp://www.tkk7.com/DLevin/archive/2015/11/11/428149.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/428149.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/428149.html最q一直在捣鼓HBase的项目,之前写了一些代码从数据库加载数据到HBaseQ所有的代码都跑得好好地Q然而今天尝试着换了一个数据库Q就跑不通了。通过数据工具Q可以发现连接没有问题,而且有部分逻辑很顺利通过了,然而有一些就是卡MQ通过jstack打印出来的信息可以找到这L堆栈Q?/span>
"runner{object-loader#292}-objecthandler" #323 prio=5 os_prio=0 tid=0x00002aaadc5ec800 nid=0x7f62 in Object.wait() [0x0000000056ce4000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1104)
        - locked <0x00000007736013e8> (a org.apache.commons.pool.impl.GenericObjectPool$Latch)
        at org.apache.commons.dbcp.PoolingDataSource.getConnection(PoolingDataSource.java:106)
        at org.apache.commons.dbcp.BasicDataSource.getConnection(BasicDataSource.java:1044)
        at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:111)
        at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:77)
所以开始我怀疑是q接的问题,从网上也扑ֈ了一个类型的现象Q有人怀疑是DBPC的一个bugD死锁Qhttp://stackoverflow.com/questions/5714511/deadlock-issue-in-dbcp-deployed-on-tomcatQ所以我升了DBCP版本1.4Q然而和qh一Ll果Q升UDBCP版本q没有解决问题。简单的看DBCP的代码,都开始怀疑是不是因ؓ没有Spring JdbcTemplate没有正确的把Connectionq回回去引v泄漏了,然而也有点感觉不太可能Q因D代码在其他数据库都跑得好好圎ͼ但是我们的数据库版本都是一致的Q然而其他配|上也被假设一致了Q被忽略的一个重要的点)?br />
后来开始调配置Q减连接数Q减线E数Q经q各U组合,发现当把DBȝbatch降到1的时候就可以work了,非常诡异的一个问题。从数据工具中查刎ͼ如果用batchQ得到的SQL?
SELECT <column>, <column> FROM <table> where iid in (@p0, @p1)
如果是batch?的话Q?br />
SELECT <column>, <column> FROM <table> where iid in (@p0)
q段SQL语句是这么生的Q?br />
DataSource dataSource = ....
this.jdbc = new NamedParameterJdbcTemplate(dataSource);
...
MapSqlParameterSource parameters = new MapSqlParameterSource();
parameters.addValue("params", paramsMap.keySet());
jdbc.query("SELECT <columns> FROM <table> where <column> in (:params)";, parameters, new ResultSetExtractor<Void>() {
....
})

如果是一个batch的话Q在jstack堆栈中可以看到它一直在{数据库的返回结果:
"runner{object-loader#16}-objecthandler" #47 prio=5 os_prio=0 tid=0x0000000006ddd800 nid=0x2694 runnable [0x0000000045434000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:170)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at com.sybase.jdbc3.timedio.RawDbio.reallyRead(Unknown Source)
        at com.sybase.jdbc3.timedio.Dbio.doRead(Unknown Source)
        at com.sybase.jdbc3.timedio.InStreamMgr.a(Unknown Source)
        at com.sybase.jdbc3.timedio.InStreamMgr.doRead(Unknown Source)
        at com.sybase.jdbc3.tds.TdsProtocolContext.getChunk(Unknown Source)
q也解释了第一个堆栈一直停在borrowObject(getConnection)的阶D,因ؓ之前所有的Connection都在数据库堵住没有返回,所以这个线E再拿Connection的时候超q了我设|的最大Connection敎ͼ所以就{着拿不到Connection?br />
在后来查了一下不同数据库的JDBC Driver信息(sp_version):
jConnect (TM) for JDBC(TM)/7.07 ESD #4 (Build 26793)/P/EBF20302/JDK 1.6.0/jdbcmain/OPT/Thu Jul  5 22:08:44 PDT 2012
jConnect (TM) for JDBC(TM)/1000/Wed Mar 11 05:01:24 2015 PDT

也就是说q种用法是因为旧的JDBC Driver对NamedParameterJdbcTemplate不完善引LQ这个坑׃我一整天的时间。。。?img src ="http://www.tkk7.com/DLevin/aggbug/428149.html" width = "1" height = "1" />

DLevin 2015-11-11 18:46 发表评论
]]>
SSTable详解http://www.tkk7.com/DLevin/archive/2015/09/25/427481.htmlDLevinDLevinThu, 24 Sep 2015 17:35:00 GMThttp://www.tkk7.com/DLevin/archive/2015/09/25/427481.htmlhttp://www.tkk7.com/DLevin/comments/427481.htmlhttp://www.tkk7.com/DLevin/archive/2015/09/25/427481.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/427481.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/427481.html 前记几年前在读Google的BigTable论文的时候,当时q没有理解论文里面表辄思想Q因而囫囵吞枣,q没有注意到SSTable的概c再后来开始关注HBase的设计和源码后,开始对BigTable传递的思想慢慢的清晰v来,但是因ؓ事情太多Q没有安排出旉重读BigTable的论文。在目里,我因己在学HBaseQ开始主推HBaseQ而另一个同事则因ؓ对Cassandra比较感冒Q因而他主要xCassandra的设计,不过我们两个人偶都会讨Z下技术、设计的各种观点和心得,然后他偶然的说了一句:Cassandra和HBase都采用SSTable格式存储Q然后我本能的问了一句:什么是SSTableQ他q没有回{,可能也不是那么几句能说清楚的Q或者他自己也没有尝试的去问q自p个问题。然而这个问题本w却一直困扰着我,因而趁着现在有一些时间深入学习HBase和Cassandra相关设计的时候先把这个问题弄清楚了?br />

SSTable的定?/h2>要解释这个术语的真正含义Q最好的Ҏ是从它的出处找{案Q所以重新翻开BigTable的论文。在q篇论文中,最初对SSTable是这么描q的Q第三页末和W四初Q:
SSTable

The Google SSTable file format is used internally to store Bigtable data. An SSTable provides a persistent, ordered immutable map from keys to values, where both keys and values are arbitrary byte strings. Operations are provided to look up the value associated with a specified key, and to iterate over all key/value pairs in a specified key range. Internally, each SSTable contains a sequence of blocks (typically each block is 64KB in size, but this is configurable). A block index (stored at the end of the SSTable) is used to locate blocks; the index is loaded into memory when the SSTable is opened. A lookup can be performed with a single disk seek: we first find the appropriate block by performing a binary search in the in-memory index, and then reading the appropriate block from disk. Optionally, an SSTable can be completely mapped into memory, which allows us to perform lookups and scans without touching disk.

单的非直译:
SSTable是Bigtable内部用于数据的文件格式,它的格式为文件本w就是一个排序的、不可变的、持久的Key/Value对MapQ其中Key和value都可以是L的byte字符丌Ӏ用Key来查找ValueQ或通过l定Key范围遍历所有的Key/Value寏V每个SSTable包含一pd的BlockQ一般Block大小?4KBQ但是它是可配置的)Q在SSTable的末是Block索引Q用于定位BlockQ这些烦引在SSTable打开时被加蝲到内存中Q在查找旉先从内存中的索引二分查找扑ֈBlockQ然后一ơ磁盘寻道即可读取到相应的Block。还有一U方案是这个SSTable加蝲到内存中Q从而在查找和扫描中不需要读取磁盘?/span>

q个貌似是HFileW一个版本的格式么,贴张图感受一下:

在HBase使用q程中,对这个版本的HFile遇到以下一些问题(参?a >q里Q:
1. 解析时内存用量比较高?br />2. Bloom Filter和Block索引会变的很大,而媄响启动性能。具体的QBloom Filter可以增长?00MB每个HFileQ而Block索引可以增长?00MBQ如果一个HRegionServer中有20个HRegionQ则他们分别能增长到2GB?GB的大。HRegion需要在打开Ӟ需要加载所有的Block索引到内存中Q因而媄响启动性能Q而在W一ơRequestӞ需要将整个Bloom Filter加蝲到内存中Q再开始查找,因而Bloom Filter太大会媄响第一ơ请求的延迟?br />而HFile在版?中对q些问题做了一些优化,具体会在HFile解析时详l说明?br />

SSTable作ؓ存储使用

l箋BigTable的论文往下走Q在5.3 Tablet Serving节中这样写道(W?)Q?br />
Tablet Serving

Updates are committed to a commit log that stores redo records. Of these updates, the recently committed ones are stored in memory in a sorted buffer called a memtable; the older updates are stored in a sequence of SSTables. To recover a tablet, a tablet server reads its metadata from the METADATA table. This metadata contains the list of SSTables that comprise a tablet and a set of a redo points, which are pointers into any commit logs that may contain data for the tablet. The server reads the indices of the SSTables into memory and reconstructs the memtable by applying all of the updates that have committed since the redo points.

When a write operation arrives at a tablet server, the server checks that it is well-formed, and that the sender is authorized to perform the mutation. Authorization is performed by reading the list of permitted writers from a Chubby file (which is almost always a hit in the Chubby client cache). A valid mutation is written to the commit log. Group commit is used to improve the throughput of lots of small mutations [13, 16]. After the write has been committed, its contents are inserted into the memtable.

When a read operation arrives at a tablet server, it is similarly checked for well-formedness and proper authorization. A valid read operation is executed on a merged view of the sequence of SSTables and the memtable. Since the SSTables and the memtable are lexicographically sorted data structures, the merged view can be formed efficiently.

Incoming read and write operations can continue while tablets are split and merged.

W一D和W三D늮单描qͼ非翻译:
在新数据写入Ӟq个操作首先提交到日志中作ؓredoU录Q最q的数据存储在内存的排序~存memtable中;旧的数据存储在一pd的SSTable 中。在recover中,tablet server从METADATA表中dmetadataQmetadata包含了组成Tablet的所有SSTableQ纪录了q些SSTable的元 数据信息Q如SSTable的位|、StartKey、EndKey{)以及一pd日志中的redo炏VTablet ServerdSSTable的烦引到内存Qƈreplayq些redo点之后的更新来重构memtable?br />在读Ӟ完成格式、授权等查后Q读会同时读取SSTable、memtableQHBase中还包含了BlockCache中的数据Qƈ合ƈ他们的结果,׃SSTable和memtable都是字典序排列,因而合q操作可以很高效完成?br />

SSTable在Compactionq程中的使用

在BigTable论文5.4 Compaction节中是q样说的Q?br />
Compaction

As write operations execute, the size of the memtable increases. When the memtable size reaches a threshold, the memtable is frozen, a new memtable is created, and the frozen memtable is converted to an SSTable and written to GFS. This minor compaction process has two goals: it shrinks the memory usage of the tablet server, and it reduces the amount of data that has to be read from the commit log during recovery if this server dies. Incoming read and write operations can continue while compactions occur.

Every minor compaction creates a new SSTable. If this behavior continued unchecked, read operations might need to merge updates from an arbitrary number of SSTables. Instead, we bound the number of such files by periodically executing a merging compaction in the background. A merging compaction reads the contents of a few SSTables and the memtable, and writes out a new SSTable. The input SSTables and memtable can be discarded as soon as the compaction has finished.

A merging compaction that rewrites all SSTables into exactly one SSTable is called a major compaction. SSTables produced by non-major compactions can contain special deletion entries that suppress deleted data in older SSTables that are still live. A major compaction, on the other hand, produces an SSTable that contains no deletion information or deleted data. Bigtable cycles through all of its tablets and regularly applies major compactions to them. These major compactions allow Bigtable to reclaim resources used by deleted data, and also allow it to ensure that deleted data disappears from the system in a timely fashion, which is important for services that store sensitive data.

随着memtable大小增加C个阀|q个memtable会被M而创Z个新的memtable以供使用Q而旧的memtable会{换成一个SSTable而写道GFS中,q个q程叫做minor compaction。这个minor compaction可以减少内存使用量,q可以减日志大,因ؓ持久化后的数据可以从日志中删除?/span>在minor compactionq程中,可以l箋处理dh?br />每次minor compaction会生成新的SSTable文gQ如果SSTable文g数量增加Q则会媄响读的性能Q因而每ơ读都需要读取所有SSTable文gQ然后合q结果,因而对SSTable文g个数需要有上限Qƈ且时不时的需要在后台做merging compactionQ这个merging compactiond一些SSTable文g和memtable的内容,q将他们合ƈ写入一个新的SSTable中。当q个q程完成后,q些源SSTable和memtable可以被删除了?br />如果一个merging compaction是合q所有SSTableC个SSTableQ则q个q程U做major compaction。一ơmajor compaction会将mark成删除的信息、数据删除,而其他两ơcompaction则会保留q些信息、数据(mark的Ş式)。Bigtable会时不时的扫描所有的TabletQƈ对它们做major compaction。这个major compaction可以需要删除的数据真正的删除从而节省空_q保持系l一致性?/span>

SSTable的locality和In Memory

在Bigtable中,它的本地性是由Locality group来定义的Q即多个column family可以l合C个locality group中,在同一个Tablet中,使用单独的SSTable存储q些在同一个locality group的column family。HBase把这个模型简化了Q即每个column family在每个HRegion都用单独的HFile存储QHFile没有locality group的概念,或者一个column family是一个locality group?/span>

在Bigtable中,q可以支持在locality groupU别讄是否所有这个locality group的数据加载到内存中,在HBase中通过column family定义时设|。这个内存加载采用g时加载,主要应用于一些小的column familyQƈ且经常被用到的,从而提升读的性能Q因而这样就不需要再从磁盘中d了?/span>

SSTable压羃

Bigtable的压~是Zlocality groupU别Q?br />
Compression

Clients can control whether or not the SSTables for a locality group are compressed, and if so, which compression format is used. The user-specified compression format is applied to each SSTable block (whose size is controllable via a locality group specific tuning parameter). Although we lose some space by compressing each block separately, we benefit in that small portions of an SSTable can be read without decompressing the entire file. Many clients use a two-pass custom compression scheme. The first pass uses Bentley and McIlroy’s scheme [6], which compresses long common strings across a large window. The second pass uses a fast compression algorithm that looks for repetitions in a small 16 KB window of the data. Both compression passes are very fast—they encode at 100–200 MB/s, and decode at 400–1000 MB/s on modern machines.

Bigtable的压~以SSTable中的一个Block为单位,虽然每个Block为压~单位损׃些空_但是采用q种方式Q我们可以以Block为单位读取、解压、分析,而不是每ơ以一?#8220;?#8221;的SSTable为单位读取、解压、分析?/span>

SSTable的读~存

Z提升ȝ性能QBigtable采用两层~存机制Q?br />
Caching for read performance

To improve read performance, tablet servers use two levels of caching. The Scan Cache is a higher-level cache that caches the key-value pairs returned by the SSTable interface to the tablet server code. The Block Cache is a lower-level cache that caches SSTables blocks that were read from GFS. The Scan Cache is most useful for applications that tend to read the same data repeatedly. The Block Cache is useful for applications that tend to read data that is close to the data they recently read (e.g., sequential reads, or random reads of different columns in the same locality group within a hot row).

两层~存分别是:
1. High LevelQ缓存从SSTabled的Key/Value寏V提升那些們֐重复的读取相同的数据的操作(引用局部性原理)?br />2. Low LevelQBlockCacheQ缓存SSTable中的Block。提升那些們֐于读取相q数据的操作?br />

Bloom Filter

前文有提到Bigtable采用合ƈ读,即需要读取每个SSTable中的相关数据Qƈ合ƈ成一个结果返回,然而每ơ读都需要读取所有SSTableQ自然会耗费性能Q因而引入了Bloom FilterQ它可以很快速的扑ֈ一个RowKey不在某个SSTable中的事实Q注Q反q来则不成立Q?br />
Bloom Filter

As described in Section 5.3, a read operation has to read from all SSTables that make up the state of a tablet. If these SSTables are not in memory, we may end up doing many disk accesses. We reduce the number of accesses by allowing clients to specify that Bloom fil- ters [7] should be created for SSTables in a particu- lar locality group. A Bloom filter allows us to ask whether an SSTable might contain any data for a spec- ified row/column pair. For certain applications, a small amount of tablet server memory used for storing Bloom filters drastically reduces the number of disk seeks re- quired for read operations. Our use of Bloom filters also implies that most lookups for non-existent rows or columns do not need to touch disk.

SSTable设计成Immutable的好?/h2>在SSTable定义中就有提到SSTable是一个Immutable的order mapQ这个Immutable的设计可以让pȝ单很多:
Exploiting Immutability

Besides the SSTable caches, various other parts of the Bigtable system have been simplified by the fact that all of the SSTables that we generate are immutable. For example, we do not need any synchronization of accesses to the file system when reading from SSTables. As a result, concurrency control over rows can be implemented very efficiently. The only mutable data structure that is accessed by both reads and writes is the memtable. To reduce contention during reads of the memtable, we make each memtable row copy-on-write and allow reads and writes to proceed in parallel.

Since SSTables are immutable, the problem of permanently removing deleted data is transformed to garbage collecting obsolete SSTables. Each tablet’s SSTables are registered in the METADATA table. The master removes obsolete SSTables as a mark-and-sweep garbage collection [25] over the set of SSTables, where the METADATA table contains the set of roots.

Finally, the immutability of SSTables enables us to split tablets quickly. Instead of generating a new set of SSTables for each child tablet, we let the child tablets share the SSTables of the parent tablet.

关于Immutable的优Ҏ以下几点Q?/span>
1. 在读SSTable是不需要同步。读写同步只需要在memtable中处理,Z减少memtable的读写竞争,Bigtablememtable的row设计成copy-on-writeQ从而读写可以同时进行?/span>
2. 怹的移除数据{变ؓSSTable的Garbage Collect。每个Tablet中的SSTable在METADATA表中有注册,master使用mark-and-sweep法SSTable在GCq程中移除?/span>
3. 可以让Tablet Splitq程变的高效Q我们不需要ؓ每个子Tablet创徏新的SSTableQ而是可以׃n?/span>Tablet的SSTable?/span>

DLevin 2015-09-25 01:35 发表评论
]]>[转]高性能IO模型析http://www.tkk7.com/DLevin/archive/2015/09/04/427118.htmlDLevinDLevinFri, 04 Sep 2015 07:16:00 GMThttp://www.tkk7.com/DLevin/archive/2015/09/04/427118.htmlhttp://www.tkk7.com/DLevin/comments/427118.htmlhttp://www.tkk7.com/DLevin/archive/2015/09/04/427118.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/427118.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/427118.html高性能IO模型析

转自Qhttp://www.cnblogs.com/fanzhidongyzby/p/4098546.html

服务器端~程l常需要构造高性能的IO模型Q常见的IO模型有四U:

Q?Q?/span>同步dIOQBlocking IOQ:即传l的IO模型?/span>

Q?Q?/span>同步非阻?/span>IOQNon-blocking IOQ:默认创徏的socket都是d的,非阻塞IO要求socket被设|ؓNONBLOCK。注意这里所说的NIOqJava的NIOQNew IOQ库?/span>

Q?Q?/span>IO多\复用QIO MultiplexingQ:即经典的Reactor设计模式Q有时也UCؓ异步dIOQJava中的Selector和Linux中的epoll都是q种模型?/span>

Q?Q?/span>异步IOQAsynchronous IOQ:即经典的Proactor设计模式Q也UCؓ异步非阻塞IO?/span>

同步和异?/span>的概忉|q的是用LE与内核的交互方式:同步是指用户U程发vIOh后需要等待或者轮询内核IO操作完成后才能l执行;而异步是指用LE发起IOh后仍l箋执行Q当内核IO操作完成后会通知用户U程Q或者调用用LE注册的回调函数?/span>

d和非d的概忉|q的是用LE调用内核IO操作的方式:d是指IO操作需要彻底完成后才返回到用户I间Q而非d是指IO操作被调用后立即q回l用户一个状态|无需{到IO操作d完成?/span>

另外Q?/span>Richard Stevens 在《Unix |络~程》卷1中提到的Z信号驱动的IOQSignal Driven IOQ模型,׃该模型ƈ不常用,本文不作涉及。接下来Q我们详l分析四U常见的IO模型的实现原理。ؓ了方便描qͼ我们l一使用IO的读操作作ؓCZ?/span>

一?/span>同步dIO

同步dIO模型是最单的IO模型Q用LE在内核q行IO操作时被d?/span>

?/span>1 同步dIO

如图1所C,用户U程通过pȝ调用read发vIOL作,qL间{到内核空间。内核等到数据包到达后,然后接收的数据拯到用L_完成read操作?/span>

用户U程使用同步dIO模型的伪代码描述为:

{
    read(socket, buffer);
    process(buffer);
}

即用户需要等待readsocket中的数据d到buffer后,才l处理接收的数据。整个IOh的过E中Q用LE是被阻塞的Q这D用户在发起IOhӞ不能做Q何事情,对CPU的资源利用率不够?/span>

二?/span>同步非阻塞IO

同步非阻塞IO是在同步dIO的基上,socket讄为NONBLOCK。这样做用户U程可以在发起IOh后可以立卌回?/span>

 

? 同步非阻塞IO

如图2所C,׃socket是非d的方式,因此用户U程发vIOh时立卌回。但q未dCQ何数据,用户U程需要不断地发vIOhQ直到数据到辑֐Q才真正d到数据,l箋执行?/span>

用户U程使用同步非阻塞IO模型的伪代码描述为:

{
    
while(read(socket, buffer) != SUCCESS) { }
    process(buffer);
}

? 用户需要不断地调用readQ尝试读取socket中的数据Q直到读取成功后Q才l箋处理接收的数据。整个IOh的过E中Q虽然用LE每ơ发起IO? 求后可以立即q回Q但是ؓ了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很直接用这U模型,而是在其他IO模型中用非? 塞IOq一Ҏ?/span>

三?/span>IO多\复用

IO多\复用模型是徏立在内核提供的多路分d数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题?/span>

? 多\分离函数select

如图3所C,用户首先需要进行IO操作的socketd到select中,然后d{待selectpȝ调用q回。当数据到达Ӟsocket被激z,select函数q回。用LE正式发起readhQ读取数据ƈl箋执行?/span>

? 程上来看,使用select函数q行IOh和同步阻塞模型没有太大的区别Q甚臌多了d监视socketQ以及调用select函数的额外操作,? 率更差。但是,使用select以后最大的优势是用户可以在一个线E内同时处理多个socket的IOh。用户可以注册多个socketQ然后不断地? 用selectd被激zȝsocketQ即可达到在同一个线E内同时处理多个IOh的目?/span>。而在同步d模型中,必须通过多线E的方式才能辑ֈq个目的?/span>

用户U程使用select函数的伪代码描述为:

{
    select(socket);
    
while(1) {
        sockets 
= select();
        
for(socket in sockets) {
            
if(can_read(socket)) {
                read(socket, buffer);
                process(buffer);
            }
        }
    }
}

其中while循环前将socketd到select监视中,然后在while内一直调用select获取被激zȝsocketQ一旦socket可读Q便调用read函数socket中的数据d出来?/span>

? 而,使用select函数的优点ƈ不仅限于此。虽然上q方式允许单U程内处理多个IOhQ但是每个IOh的过E还是阻塞的Q在select函数上阻 塞)Q^均时间甚x同步dIO模型q要ѝ如果用LE只注册自己感兴的socket或者IOhQ然后去做自q事情Q等到数据到来时再进行处 理,则可以提高CPU的利用率?/span>

IO多\复用模型使用了Reactor设计模式实现了这一机制?/span>

? Reactor设计模式

? ?所C,EventHandler抽象c表CIO事g处理器,它拥有IO文g句柄HandleQ通过get_handle获取Q,以及对Handle? 操作handle_eventQ读/写等Q。承于EventHandler的子cd以对事g处理器的行ؓq行定制。Reactorcȝ于管? EventHandlerQ注册、删除等Q,q用handle_events实现事g循环Q不断调用同步事件多路分dQ一般是内核Q的多\分离函数 selectQ只要某个文件句柄被Ȁz(可读/写等Q,selectp回(dQ,handle_events׃调用与文件句柄关联的事g处理器的 handle_eventq行相关操作?/span>

?/span>5 IO多\复用

? ?所C,通过Reactor的方式,可以用LE轮询IO操作状态的工作l一交给handle_events事g循环q行处理。用LE注册事件处? 器之后可以l执行做其他的工作(异步Q,而ReactorU程负责调用内核的select函数查socket状态。当有socket被激zLQ则通知 相应的用LE(或执行用LE的回调函数Q,执行handle_eventq行数据d、处理的工作。由于select函数是阻塞的Q因此多路IO复用 模型也被UCؓ异步dIO模型。注意,q里的所说的d是指select函数执行时线E被dQ而不是指socket。一般在使用IO多\复用模型 Ӟsocket都是讄为NONBLOCK的,不过qƈ不会产生影响Q因为用户发起IOhӞ数据已经到达了,用户U程一定不会被d?/span>

用户U程使用IO多\复用模型的伪代码描述为:

void UserEventHandler::handle_event() {
    
if(can_read(socket)) {
        read(socket, buffer);
        process(buffer);
    }
}

{
    Reactor.register(
new UserEventHandler(socket));
}

用户需要重写EventHandler的handle_event函数q行d数据、处理数据的工作Q用LE只需要将自己的EventHandler注册到Reactor卛_。Reactor中handle_events事g循环的伪代码大致如下?/span>

Reactor::handle_events() {
    
while(1) {
       sockets 
= select();
       
for(socket in sockets) {
            get_event_handler(socket).handle_event();
       }
    }
}

事g循环不断地调用select获取被激zȝsocketQ然后根据获取socket对应的EventHandlerQ执行器handle_event函数卛_?/span>

IO多\复用是最怋用的IO模型Q但是其异步E度q不?#8220;d”Q因为它使用了会dU程的selectpȝ调用。因此IO多\复用只能UCؓ异步dIOQ而非真正的异步IO?/span>

四?/span>异步IO

“? ?#8221;的异步IO需要操作系l更强的支持。在IO多\复用模型中,事g循环文件句柄的状态事仉知l用LE,qLE自行读取数据、处理数据。而在? 步IO模型中,当用LE收到通知Ӟ数据已经被内核读取完毕,q放在了用户U程指定的缓冲区内,内核在IO完成后通知用户U程直接使用卛_?/span>

异步IO模型使用了Proactor设计模式实现了这一机制?/span>

? Proactor设计模式

? ?QProactor模式和Reactor模式在结构上比较怼Q不q在用户QClientQ用方式上差别较大。Reactor模式中,用户U程通过 向Reactor对象注册感兴的事g监听Q然后事件触发时调用事g处理函数。而Proactor模式中,用户U程? AsynchronousOperationQ读/写等Q、Proactor以及操作完成时的CompletionHandler注册? AsynchronousOperationProcessor。AsynchronousOperationProcessor使用Facade模式? 供了一l异步操作APIQ读/写等Q供用户使用Q当用户U程调用异步API后,便l执行自qd? AsynchronousOperationProcessor 会开启独立的内核U程执行异步操作Q实现真正的异步。当异步IO操作完成 ӞAsynchronousOperationProcessor用LE与AsynchronousOperation一h册的Proactor 和CompletionHandler取出Q然后将CompletionHandler与IO操作的结果数据一赯{发给 ProactorQProactor负责回调每一个异步操作的事g完成处理函数handle_event。虽然Proactor模式中每个异步操作都可以 l定一个Proactor对象Q但是一般在操作pȝ中,Proactor被实CؓSingleton模式Q以便于集中化分发操作完成事件?/span>

?/span>7 异步IO

? ?所C,异步IO模型中,用户U程直接使用内核提供的异步IO API发vreadhQ且发v后立卌回,l箋执行用户U程代码。不q此时用LE已 l将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作pȝ开启独立的内核U程d理IO? 作。当readh的数据到达时Q由内核负责dsocket中的数据Qƈ写入用户指定的缓冲区中。最后内核将read的数据和用户U程注册? CompletionHandler分发l内部ProactorQProactorIO完成的信息通知l用LE(一般通过调用用户U程注册的完成事? 处理函数Q,完成异步IO?/span>

用户U程使用异步IO模型的伪代码描述为:


void UserCompletionHandler::handle_event(buffer) {
    process(buffer);
}

{
    aio_read(socket, 
new UserCompletionHandler);
}

用户需要重写CompletionHandler的handle_event函数q行处理数据的工作,参数buffer表示Proactor已经准备好的数据Q用LE直接调用内核提供的异步IO APIQƈ重写的CompletionHandler注册卛_?/span>

? 比于IO多\复用模型Q异步IOq不十分常用Q不高性能q发服务E序使用IO多\复用模型+多线EQ务处理的架构基本可以满需求。况且目前操作系l对 异步IO的支持ƈ非特别完善,更多的是采用IO多\复用模型模拟异步IO的方式(IO事g触发时不直接通知用户U程Q而是数据读写完毕后攑ֈ用户指定? ~冲ZQ。Java7之后已经支持了异步IOQ感兴趣的读者可以尝试用?/span>

本文从基本概c工作流E和代码C? 例三个层ơ简要描qC常见的四U高性能IO模型的结构和原理Q理清了同步、异步、阻塞、非dq些Ҏh的概c通过寚w性能IO模型的理解,可以在服 务端E序的开发中选择更符合实际业务特点的IO模型Q提高服务质量。希望本文对你有所帮助?/span>


怼的:
http://www.cnblogs.com/nufangrensheng/p/3588690.html
http://www.ibm.com/developerworks/cn/linux/l-async/



DLevin 2015-09-04 15:16 发表评论
]]>Netty3架构解析http://www.tkk7.com/DLevin/archive/2015/09/04/427031.htmlDLevinDLevinFri, 04 Sep 2015 01:40:00 GMThttp://www.tkk7.com/DLevin/archive/2015/09/04/427031.htmlhttp://www.tkk7.com/DLevin/comments/427031.htmlhttp://www.tkk7.com/DLevin/archive/2015/09/04/427031.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/427031.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/427031.html前记

很早以前有读Netty源码的打了Q然而第一ơ尝试的时候从Netty4开始,一直抓不到核心的框架流E,后来因ؓ其他事情忙着放下了。这ơ趁着休假重新捡vq个骨_因ؓNetty3现在q在被很多项目用,因而这ơ决定先从Netty3入手Q瞬间发现Netty3的代码比Netty4中规中矩的多Q很多概念在代码本n中都有清晰的表达Q所以半天就把整个框架的骨架搞清楚了。再?a >Netty4对Netty3的改qȝQ回去读Netty4的源码,反而觉得轻松了Q一U豁然开朗的感觉?br />
记得d读Jetty源码的时候,因ؓ代码太庞大,q且自己的HTTP Server的了解太,因而只能自底向上的一个一个模块的叠加Q直到最后把所以的模块q接在一赯看清它的真正核心骨架。现在读源码Q开始习惯先把骨架理清,然后延C同的器官、血肉而看清整个h体?br />
本文从Reactor模式在Netty3中的应用Q引出Netty3的整体架构以及控制流E;然而除了Reactor模式QNetty3q在ChannelPipeline中用了Intercepting Filter模式Q这个模式也在Servlet的Filter中成功用,因而本文还会从Intercepting Filter模式出发详细介绍ChannelPipeline的设计理c本文假设读者已l对Netty有一定的了解Q因而不会包含过多入门介l,以及帮Netty做宣传的文字?br />

Netty3中的Reactor模式

Reactor模式在Netty中应用非常成功,因而它也是在Netty中受大肆宣传的模式,关于Reactor模式可以详细参考本人的另一文?a href="http://www.tkk7.com/DLevin/archive/2015/09/02/427045.html">《Reactor模式详解?/a>Q对Reactor模式的实现是Netty3的基本骨Ӟ因而本节会详l介lReactor模式如何应用Netty3中?br />
如果诅RReactor模式详解》,我们知道Reactor模式由Handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler构成Q在Java的实现版本中QChannel对应HandleQSelector对应Synchronous Event DemultiplexerQƈ且Netty3q用了两层ReactorQMain Reactor用于处理Client的连接请求,Sub Reactor用于处理和Clientq接后的dhQ关于这个概念还可以参考Doug Lea的这PPTQ?a >Scalable IO In JavaQ。所以我们先要解决Netty3中用什么类实现所有的上述模块q把他们联系在一LQ以NIO实现方式ZQ?br />
模式是一U抽象,但是在实CQ经怼因ؓ语言Ҏ、框架和性能需要而做一些改变,因而Netty3对Reactor模式的实现有一套自q设计Q?br />1. ChannelEventQ?/strong>Reactor是基于事件编E的Q因而在Netty3中用ChannelEvent抽象的表达Netty3内部可以产生的各U事Ӟ所有这些事件对象在Channels帮助cM产生Qƈ且由它将事g推入到ChannelPipeline中,ChannelPipeline构徏ChannelHandler道QChannelEvent经q个道实现所有的业务逻辑处理。ChannelEvent对应的事件有QChannelStateEvent表示Channel状态的变化事gQ而如果当前Channel存在Parent ChannelQ则该事件还会传递到Parent Channel的ChannelPipeline中,如OPEN、BOUND、CONNECTED、INTEREST_OPS{,该事件可以在各种不同实现的Channel、ChannelSink中生;MessageEvent表示从Socket中读取数据完成、需要向Socket写数据或ChannelHandler对当前Message解析(如Decoder、Encoder)后触发的事gQ它由NioWorker、需要对Message做进一步处理的ChannelHandler产生QWriteCompletionEvent表示写完成而触发的事gQ它由NioWorker产生QExceptionEvent表示在处理过E中出现的ExceptionQ它可以发生在各个构件中Q如Channel、ChannelSink、NioWorker、ChannelHandler中;IdleStateEvent由IdleStateHandler触发Q这也是一个ChannelEvent可以无缝扩展的例子。注Q在Netty4后,已经没有ChannelEventc,所有不同事仉用对应方法表达,q也意味qChannelEvent不可扩展QNetty4采用在ChannelInboundHandler中加入userEventTriggered()Ҏ来实现这U扩展,具体可以参?a >q里?br />2. ChannelHandlerQ?/strong>在Netty3中,ChannelHandler用于表示Reactor模式中的EventHandler。ChannelHandler只是一个标记接口,它有两个子接口:ChannelDownstreamHandler和ChannelUpstreamHandlerQ其中ChannelDownstreamHandler表示从用户应用程序流向Netty3内部直到向Socket写数据的道Q在Netty4中改名ؓChannelOutboundHandlerQChannelUpstreamHandler表示数据从Socketq入Netty3内部向用户应用程序做数据处理的管道,在Netty4中改名ؓChannelInboundHandler?br />3. ChannelPipelineQ?/strong>用于理ChannelHandler的管道,每个Channel一个ChannelPipeline实例Q可以运行过E中动态的向这个管道中d、删除ChannelHandlerQ由于实现的限制Q在最末端的ChannelHandler向后d或删除ChannelHandler不一定在当前执行程中v效,参?a >q里Q。ChannelPipeline内部l护一个ChannelHandler的双向链表,它以Upstream(Inbound)方向为正向,Downstream(Outbound)方向为方向。ChannelPipeline采用Intercepting Filter模式实现Q具体可以参?a href="http://www.tkk7.com/DLevin/archive/2015/09/03/427086.html">q里Q这个模式的实现在后一节中q是详细介绍?br />4. NioSelectorQ?/strong>Netty3使用NioSelector来存放SelectorQSynchronous Event DemultiplexerQ,每个C生的NIO Channel都向q个Selector注册自己以让q个Selector监听q个NIO Channel中发生的事gQ当事g发生Ӟ调用帮助cChannels中的Ҏ生成ChannelEvent实例Q将该事件发送到q个Netty Channel对应的ChannelPipeline中,而交l各UChannelHandler处理。其中在向Selector注册NIO ChannelӞNetty Channel实例以Attachment的Ş式传入,该Netty Channel在其内部的NIO Channel事g发生Ӟ会以Attachment的Ş式存在于SelectionKey中,因而每个事件可以直接从q个Attachment中获取相关链的Netty ChannelQƈ从Netty Channel中获取与之相兌的ChannelPipelineQ这个实现和Doug Lea?a >Scalable IO In Java一模一栗另外Netty3q采用了Scalable IO In Java中相同的Main Reactor和Sub Reactor设计Q其中NioSelector的两个实玎ͼBoss即ؓMain ReactorQNioWorker为Sub Reactor。Boss用来处理新连接加入的事gQNioWorker用来处理各个q接对Socket的读写事Ӟ其中Boss通过NioWorkerPool获取NioWorker实例QNetty3模式使用RoundRobin方式攑֛NioWorker实例。更形象一点的Q可以通过Scalable IO In Java的这张图表达Q?br />
若与Ractor模式对应QNioSelector中包含了Synchronous Event DemultiplexerQ而ChannelPipeline中管理着所有EventHandlerQ因而NioSelector和ChannelPipeline共同构成了Initiation Dispatcher?br />5. ChannelSinkQ?/strong>在ChannelHandler处理完成所有逻辑需要向客户端写响应数据Ӟ一般会调用Netty Channel中的writeҎQ然而在q个writeҎ实现中,它不是直接向其内部的Socket写数据,而是交给Channels帮助c,内部创徏DownstreamMessageEventQ反向从ChannelPipeline的管道中过去,直到W一个ChannelHandler处理完毕Q最后交lChannelSink处理Q以避免d写而媄响程序的吞吐量。ChannelSink这个MessageEvent提交lNetty Channel中的writeBufferQueueQ最后NioWorker会等到这个NIO Channel已经可以处理写事件时无阻塞的向这个NIO Channel写数据。这是上图的send是从SubReactor直接出发的原因?br />6. ChannelQ?/strong>Netty有自qChannel抽象Q它是一个资源的容器Q包含了所有一个连接涉及到的所有资源的饮用Q如装NIO Channel、ChannelPipeline、Boss、NioWorkerPool{。另外它q提供了向内部NIO Channel写响应数据的接口write、连?l定到某个地址的connect/bind接口{,个h感觉虽然对Channel本n来说Q因为它装了NIO ChannelQ因而这些接口定义在q里是合理的Q但是如果考虑到Netty的架构,它的Channel只是一个资源容器,有这个Channel实例可以得到和它相关的基本所有资源,因而这Uwrite、connect、bind动作不应该再由它负责Q而是应该由其他类来负责,比如在Netty4中就在ChannelHandlerContextd了writeҎQ虽然netty4q没有删除Channel中的write接口?br />

Netty3中的Intercepting Filter模式

如果说Reactor模式是Netty3的骨Ӟ那么Intercepting Filter模式则是Netty的中枢。Reactor模式主要应用在Netty3的内部实玎ͼ它是Netty3h良好性能的基Q而Intercepting Filter模式则是ChannelHandlerl合实现一个应用程序逻辑的基Q只有很好的理解了这个模式才能用好NettyQ甚臌得心应手?br />
关于Intercepting Filter模式的详l介l可以参?a href="http://www.tkk7.com/DLevin/archive/2015/09/03/427086.html">q里Q本节主要介lNetty3中对Intercepting Filter模式的实玎ͼ其实是DefaultChannelPipeline对Intercepting Filter模式的实现。在上文有提到Netty3的ChannelPipeline是ChannelHandler的容器,用于存储与管理ChannelHandlerQ同时它在Netty3中也起到桥梁的作用,卛_是连接Netty3内部到所有ChannelHandler的桥梁。作为ChannelPipeline的实现者DefaultChannelPipelineQ它使用一个ChannelHandler的双向链表来存储Q以DefaultChannelPipelineContext作ؓ节点Q?br />
public interface ChannelHandlerContext {
    Channel getChannel();

    ChannelPipeline getPipeline();

    String getName();

    ChannelHandler getHandler();

    
boolean canHandleUpstream();
    
boolean canHandleDownstream();
    
void sendUpstream(ChannelEvent e);
    
void sendDownstream(ChannelEvent e);
    Object getAttachment();

    
void setAttachment(Object attachment);
}

private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
   
volatile DefaultChannelHandlerContext next;
   
volatile DefaultChannelHandlerContext prev;
   
private final String name;
   
private final ChannelHandler handler;
   
private final boolean canHandleUpstream;
   
private final boolean canHandleDownstream;
   
private volatile Object attachment;
.....
}
在DefaultChannelPipeline中,它存储了和当前ChannelPipeline相关联的Channel、ChannelSink以及ChannelHandler链表的head、tailQ所有ChannelEvent通过sendUpstream、sendDownstream为入口流l整个链表:
public class DefaultChannelPipeline implements ChannelPipeline {
    
private volatile Channel channel;
    
private volatile ChannelSink sink;
    
private volatile DefaultChannelHandlerContext head;
    
private volatile DefaultChannelHandlerContext tail;
......
    
public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head 
= getActualUpstreamContext(this.head);
        
if (head == null) {
            
return;
        }
        sendUpstream(head, e);
    }

    
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        
try {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } 
catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

    
public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail 
= getActualDownstreamContext(this.tail);
        
if (tail == null) {
            
try {
                getSink().eventSunk(
this, e);
                
return;
            } 
catch (Throwable t) {
                notifyHandlerException(e, t);
                
return;
            }
        }
        sendDownstream(tail, e);
    }

    
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        
if (e instanceof UpstreamMessageEvent) {
            
throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }
        
try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } 
catch (Throwable t) {
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }
对Upstream事gQ向后找到所有实CChannelUpstreamHandler接口的ChannelHandlerl成链(getActualUpstreamContext()Q?/span>Q而对Downstream事gQ向前找到所有实CChannelDownstreamHandler接口的ChannelHandlerl成链(getActualDownstreamContext()Q:
    private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
        
if (ctx == null) {
            
return null;
        }
        DefaultChannelHandlerContext realCtx 
= ctx;
        
while (!realCtx.canHandleUpstream()) {
            realCtx 
= realCtx.next;
            
if (realCtx == null) {
                
return null;
            }
        }
        
return realCtx;
    }
    
private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
        
if (ctx == null) {
            
return null;
        }
        DefaultChannelHandlerContext realCtx 
= ctx;
        
while (!realCtx.canHandleDownstream()) {
            realCtx 
= realCtx.prev;
            
if (realCtx == null) {
                
return null;
            }
        }
        
return realCtx;
    }
在实际实现ChannelUpstreamHandler或ChannelDownstreamHandlerӞ调用 ChannelHandlerContext中的sendUpstream或sendDownstreamҎ控制流E交l下一? ChannelUpstreamHandler或下一个ChannelDownstreamHandlerQ或调用Channel中的writeҎ发? 响应消息?br />
public class MyChannelUpstreamHandler implements ChannelUpstreamHandler {
    
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        
// handle current logic, use Channel to write response if needed.
        
// ctx.getChannel().write(message);
        ctx.sendUpstream(e);
    }
}

public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
    
public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent e) 
throws Exception {
        
// handle current logic
        ctx.sendDownstream(e);
    }
}
当ChannelHandler向ChannelPipelineContext发送事件时Q其内部从当前ChannelPipelineContext节点出发扑ֈ下一个ChannelUpstreamHandler或ChannelDownstreamHandler实例Qƈ向其发送ChannelEventQ对于Downstream链,如果到达铑ְQ则ChannelEvent发送给ChannelSinkQ?br />
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext prev 
= getActualDownstreamContext(this.prev);
   
if (prev == null) {
       
try {
            getSink().eventSunk(DefaultChannelPipeline.
this, e);
        } 
catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    } 
else {
        DefaultChannelPipeline.
this.sendDownstream(prev, e);
    }
}

public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext next 
= getActualUpstreamContext(this.next);
   
if (next != null) {
        DefaultChannelPipeline.
this.sendUpstream(next, e);
    }
}
正是因ؓq个实现Q如果在一个末ChannelUpstreamHandler中先U除自己Q在向末添加一个新的ChannelUpstreamHandlerQ它是无效的Q因为它的next已经在调用前固定设|ؓnull了?br />
ChannelPipeline作ؓChannelHandler的容器,它还提供了各U增、删、改ChannelHandler链表中的ҎQ而且如果某个ChannelHandlerq实CLifeCycleAwareChannelHandlerQ则该ChannelHandler在被dqChannelPipeline或从中删除时都会得到同志Q?br />
public interface LifeCycleAwareChannelHandler extends ChannelHandler {
    
void beforeAdd(ChannelHandlerContext ctx) throws Exception;
    
void afterAdd(ChannelHandlerContext ctx) throws Exception;
    
void beforeRemove(ChannelHandlerContext ctx) throws Exception;
    
void afterRemove(ChannelHandlerContext ctx) throws Exception;
}

public interface ChannelPipeline {
    
void addFirst(String name, ChannelHandler handler);
    
void addLast(String name, ChannelHandler handler);
    
void addBefore(String baseName, String name, ChannelHandler handler);
    
void addAfter(String baseName, String name, ChannelHandler handler);
    
void remove(ChannelHandler handler);
    ChannelHandler remove(String name);

    
<extends ChannelHandler> T remove(Class<T> handlerType);
    ChannelHandler removeFirst();

    ChannelHandler removeLast();

    
void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    
<extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
    ChannelHandler getFirst();

    ChannelHandler getLast();

    ChannelHandler get(String name);

    
<extends ChannelHandler> T get(Class<T> handlerType);
    ChannelHandlerContext getContext(ChannelHandler handler);

    ChannelHandlerContext getContext(String name);

    ChannelHandlerContext getContext(Class
<? extends ChannelHandler> handlerType);
    
void sendUpstream(ChannelEvent e);
    
void sendDownstream(ChannelEvent e);
    ChannelFuture execute(Runnable task);

    Channel getChannel();

    ChannelSink getSink();

    
void attach(Channel channel, ChannelSink sink);
    
boolean isAttached();
    List
<String> getNames();
    Map
<String, ChannelHandler> toMap();
}

在DefaultChannelPipeline的ChannelHandler链条的处理流EؓQ?br />

参考:

《Netty主页?/a>
《Netty源码解读Q四QNetty与Reactor模式?/a>
《Netty代码分析?/a>
Scalable IO In Java
Intercepting Filter Pattern


DLevin 2015-09-04 09:40 发表评论
]]>
Intercepting Filter模式详解http://www.tkk7.com/DLevin/archive/2015/09/03/427086.htmlDLevinDLevinThu, 03 Sep 2015 14:14:00 GMThttp://www.tkk7.com/DLevin/archive/2015/09/03/427086.htmlhttp://www.tkk7.com/DLevin/comments/427086.htmlhttp://www.tkk7.com/DLevin/archive/2015/09/03/427086.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/427086.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/427086.html问题描述在服务器~程中,通常需要处理多U不同的hQ在正式处理h之前Q需要对h做一些预处理Q如Q?br />
  1. U录每个Client的每ơ访问信息?/li>
  2. 对Clientq行认证和授权检查(Authentication and AuthorizationQ?/li>
  3. 查当前Session是否合法?/li>
  4. 查Client的IP地址是否可信赖或不可信赖QIP地址白名单、黑名单Q?/li>
  5. h数据是否先要解压或解码?/li>
  6. 是否支持Clienth的类型、Browser版本{?/li>
  7. d性能监控信息?/li>
  8. d调试信息?/li>
  9. 保证所有异帔R被正捕获到Q对未预料到的异常做通用处理Q防止给Client看到内部堆栈信息?br />

在响应返回给客户端之前,有时候也需要做一些预处理再返回:

  1. 对响应消息编码或压羃?/li>
  2. 为所有响应添加公共头、尾{消息?/li>
  3. q一步Enrich响应消息Q如d公共字段、Session信息、Cookie信息Q甚臛_全改变响应消息等?/li>
如何实现q样的需求,同时保持可扩展性、可重用性、可配置、移植性?

问题解决

要实现这U需求,最直观的方法就是在每个h处理q程中添加所有这些逻辑Qؓ了减代码重复,可以所有这些检查提取成ҎQ这样在每个处理Ҏ中调用即可:
public Response service1(Request request) {
    validate(request);
    request 
= transform(request);
    Response response 
= process1(request);
    
return transform(response);
}
此时Q如果出现service2ҎQ依焉要拷贝service1中的实现Q然后将process1换成process2卛_。这个时候我们发现很多重复代码,l箋对它重构Q比如提取公共逻辑到基cL模版ҎQ这U用承的方式会引起子cd父类的耦合Q如果要让某些模块变的可配置需要有太多的判断逻辑Q代码变的臃肿;因而可以更q一步,所有处理逻辑抽象Z个Processor接口Q然后用Decorate模式Q即引用优于l承Q:
public interface Processor {
    Response process(Request request);
}
public class CoreProcessor implements Processor {
    
public Response process(Request request) {
        
// do process/calculation
    }
}
public class DecoratedProcessor implements Processor {
    
private final Processor innerProcessor;
    
public DecoratedProcessor(Processor processor) {
        
this.innerProcessor = processor;
    }

    
public Response process(Request request) {
        request 
= preProcess(request);
        Response response 
= innerProcessor.process(request);
        response 
= postProcess(response);
        
return response;
    }

    
protected Request preProcess(Request request) {
        
return request;
    }
    
protected Response postProcess(Response response) {
        
return response;
    }
}

public void Transformer extends DecoratedProcessor {
    
public Transformer(Processor processor) {
        
super(processor);
    }

    
protected Request preProcess(Request request) {
        
return transformRequest(request);
    }
    
protected Response postProcess(Response response) {
        
return transformResponse(response);
    }
}
此时Q如果需要在真正的处理逻辑之前加入其他的预处理逻辑Q只需要承DecoratedProcessorQ实现preProcess或postProcessҎQ分别在h处理之前和请求处理之后横向切入一些逻辑Q也是所谓的AOP~程Q面向切面的~程Q然后只需要根据需求构个链条:
Processor processor = new MissingExceptionCatcher(new Debugger(new Transformer(new CoreProcessor());
Response response 
= processor.process(request);
......
q已l是相对比较好的设计了,每个Processor只需要关注自q实现逻辑卛_Q代码变的简z;q且每个Processor各自独立Q可重用性好Q测试方便;整条链上能实现的功能只是取决于链的构造,因而只需要有一U方法配|链的构造即可,可配|性也变得灉|Q然而很多时候引用是一U静态的依赖Q而无法满_态的需求。要构造这条链Q每个前|Processor需要知道其后的ProcessorQ这在某些情况下q不是在起初q道的。此Ӟ我们需要引入Intercepting Filter模式来实现动态的改变条链?br />

Intercepting Filter模式

在前文已l构Z一条由引用而成的Processor链,然而这是一条静态链Qƈ且需要一开始就能构造出q条链,Z解决q个限制Q我们可以引入一个ProcessorChain来维护这条链Qƈ且这条链可以动态的构徏?br />
有多U方式可以实现ƈ控制q个链:
  1. 在存储上Q可以用数l来存储所有的ProcessorQProcessor在数l中的位|表C个Processor在链条中的位|;也可以用链表来存储所有的ProcessorQ此时Processor在这个链表中的位|即是在链中的位|?/li>
  2. 在抽象上Q可以所有的逻辑都封装在Processor中,也可以将核心逻辑使用Processor抽象Q而外围逻辑使用Filter抽象?/li>
  3. 在流E控制上Q一般通过在Processor实现Ҏ中直接用ProcessorChain实例(通过参数掺入)来控制流E,利用Ҏ调用的进栈出栈的Ҏ实现preProcess()和postProcess()处理?/li>
在实际中使用q个模式的有QServlet的Filter机制、Netty的ChannelPipeline中、Structs2中的Interceptor中都实现了这个模式?br />

Intercepting Filter模式在Servlet的Filter中的实现QJetty版本Q?/h2>其中Servlet的Filter在Jetty的实C使用数组存储FilterQFilter末尾可以使用Servlet实例处理真正的业务逻辑Q在程控制上,使用FilterChain的doFilterҎ来实现。如FilterChain在Jetty中的实现Q?br />
public void doFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException
   
// pass to next filter
    if (_filter < LazyList.size(_chain)) {
        FilterHolder holder
= (FilterHolder)LazyList.get(_chain, _filter++);
        Filter filter= holder.getFilter();
        filter.doFilter(request, response, this);                   
       
return;
    }

   
// Call servlet
    HttpServletRequest srequest = (HttpServletRequest)request;
   
if (_servletHolder != null) {
        _servletHolder.handle(_baseRequest,request, response);

    }
}
q里Q_chain实际上是一个Filter的ArrayListQ由FilterChain调用doFilter()启动调用W一个Filter的doFilter()ҎQ在实际的Filter实现中,需要手动的调用FilterChain.doFilter()Ҏ来启动下一个Filter的调用,利用Ҏ调用的进栈出栈的Ҏ实现Request的pre-process和Response的post-process处理。如果不调用FilterChain.doFilter()ҎQ则表示不需要调用之后的FilterQ流E从当前Filterq回Q在它之前的Filter的FilterChain.doFilter()调用之后的逻辑反向处理直到W一个Filter处理完成而返回?br />
public class MyFilter implements Filter {
    
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        
// pre-process ServletRequest
        chain.doFilter(request, response);
        
// post-process Servlet Response
    }
}
整个Filter铄处理程如下Q?br />

Intercepting Filter模式在Netty3中的实现

Netty3在DefaultChannelPipeline中实CIntercepting Filter模式Q其中ChannelHandler是它的Filter。在Netty3的DefaultChannelPipeline中,使用一个以ChannelHandlerContext点的双向链表来存储ChannelHandlerQ所有的横切面逻辑和实际业务逻辑都用ChannelHandler表达Q在控制程上用ChannelHandlerContext的sendDownstream()和sendUpstream()Ҏ来控制流E。不同于Servlet的FilterQChannelHandler有两个子接口QChannelUpstreamHandler和ChannelDownstreamHandler分别用来hq入时的处理程和响应出L的处理流E。对于Client的请求,从DefaultChannelPipeline的sendUpstream()Ҏ入口Q?br />
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext tail 
= getActualDownstreamContext(this.tail);
   
if (tail == null) {
       
try {
            getSink().eventSunk(
this, e);
           
return;
        } 
catch (Throwable t) {
            notifyHandlerException(e, t);
           
return;
        }
    }
    sendDownstream(tail, e);
}
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
   
if (e instanceof UpstreamMessageEvent) {
       
throw new IllegalArgumentException("cannot send an upstream event to downstream");
    }
   
try {
        ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e)
     } 
catch (Throwable t) {
        e.getFuture().setFailure(t);
        notifyHandlerException(e, t);
    }
}
如果有响应消息,该消息从DefaultChannelPipeline的sendDownstream()Ҏ为入口:
public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext head 
= getActualUpstreamContext(this.head);
   
if (head == null) {
        return;
    }
    sendUpstream(head, e);
}
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
   
try {
        ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
    } 
catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
在实际实现ChannelUpstreamHandler或ChannelDownstreamHandlerӞ调用ChannelHandlerContext中的sendUpstream或sendDownstreamҎ控制流E交l下一个ChannelUpstreamHandler或下一个ChannelDownstreamHandlerQ或调用Channel中的writeҎ发送响应消息?br />
public class MyChannelUpstreamHandler implements ChannelUpstreamHandler {
    
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        
// handle current logic, use Channel to write response if needed.
        
// ctx.getChannel().write(message);
        ctx.sendUpstream(e);
    }
}

public class MyChannelDownstreamHandler implements ChannelDownstreamHandler {
    
public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent e) 
throws Exception {
        
// handle current logic
        ctx.sendDownstream(e);
    }
}
当ChannelHandler向ChannelPipelineContext发送事件时Q其内部从当前ChannelPipelineContext 节点出发扑ֈ下一个ChannelUpstreamHandler或ChannelDownstreamHandler实例Qƈ向其发? ChannelEventQ对于Downstream链,如果到达铑ְQ则ChannelEvent发送给ChannelSinkQ?br />
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext prev 
= getActualDownstreamContext(this.prev);
   
if (prev == null) {
       
try {
            getSink().eventSunk(DefaultChannelPipeline.
this, e);
        } 
catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    } 
else {
        DefaultChannelPipeline.
this.sendDownstream(prev, e);
    }
}

public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext next 
= getActualUpstreamContext(this.next);
   
if (next != null) {
        DefaultChannelPipeline.
this.sendUpstream(next, e);
    }
}
正是因ؓq个实现Q如果在一个末ChannelUpstreamHandler中先U除自己Q在向末添加一个新的ChannelUpstreamHandlerQ它是无效的Q因为它的next已经在调用前固定设|ؓnull了?br />
在DefaultChannelPipeline的ChannelHandler链条的处理流EؓQ?br />
在这个实CQ不像Servlet的Filter实现利用Ҏ调用栈的q出栈来完成pre-process和post-processQ而是在进ȝ铑֒出来的链各自调用handleUpstream()和handleDownstream()ҎQ这样会引v调用栈其实是两条铄dQ因而需要注意这条链的总长度。这样做的好处是q条ChannelHandler的链不依赖于Ҏ调用栈,而是在DefaultChannelPipeline内部本n的链Q因而在handleUpstream()或handleDownstream()可以随时执行流E{发给其他U程或线E池Q只需要保留ChannelPipelineContext引用Q在处理完成后用q个ChannelPipelineContext重新向这条链的后一个节点发送ChannelEventQ然而由于Servlet的Filter依赖于方法的调用栈,因而方法返回意味着所有执行完成,q种限制在异步编E中会引起问题,因而Servlet?.0后引入了Async的支持?br />

Intercepting Filter模式的缺?/h2>单提一下这个模式的~点Q?br />1. 相对传统的编E模型,q个模式有一定的学习曲线Q需要很好的理解该模式后才能灉|的应用它来编E?br />2. 需要划分不同的逻辑C同的Filter中,q有些时候ƈ不是那么Ҏ?br />3. 各个Filter之间׃n数据变得困难。在Netty3中可以自定义自己的ChannelEvent来实现自定义消息的传输,或者用ChannelPipelineContext的Attachment字段来实现消息传输,而Servlet中的Filter则没有提供类似的机制Q如果不是可以配|的数据在Config中传递,其他时候的数据׃n需要其他机刉合完成?br />

参?/h2>Core J2EE Pattern - Intercepting Filter

DLevin 2015-09-03 22:14 发表评论
]]>Reactor模式详解http://www.tkk7.com/DLevin/archive/2015/09/02/427045.htmlDLevinDLevinWed, 02 Sep 2015 07:14:00 GMThttp://www.tkk7.com/DLevin/archive/2015/09/02/427045.htmlhttp://www.tkk7.com/DLevin/comments/427045.htmlhttp://www.tkk7.com/DLevin/archive/2015/09/02/427045.html#Feedback5http://www.tkk7.com/DLevin/comments/commentRss/427045.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/427045.html 前记

W一ơ听到Reactor模式是三q前的某个晚上,一个室友突然跑q来问我什么是Reactor模式Q我上网查了一下,很多人都是给出NIO中的 Selector的例子,而且是NIO里Selector多\复用模型Q只是给它v了一个比较fancy的名字而已Q虽然它引入了EventLoop? 念,q对我来说是新的概念Q但是代码实现却是一LQ因而我q没有很在意q个模式。然而最q开始读Netty源码Q而Reactor模式是很多介lNetty的文章中被大肆宣传的模式Q因而我再次问自己,什么是Reactor模式Q本文就是对q个问题关于我的一些理解和试着来解{?br />

什么是Reactor模式

要回{这个问题,首先当然是求助Google或WikipediaQ其中Wikipedia上说Q?#8220;The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.”。从q个描述中,我们知道Reactor模式首先?strong>事g驱动的,有一个或多个q发输入源,有一个Service HandlerQ有多个Request Handlers
Q这个Service Handler会同步的输入的hQEventQ多路复用的分发l相应的Request Handler。如果用图来表达Q?br />
从结构上Q这有点cM生者消费者模式,x一个或多个生者将事g攑օ一个Queue中,而一个或多个消费者主动的从这个Queue中Poll事g来处理;而Reactor模式则ƈ没有Queue来做~冲Q每当一个Event输入到Service Handler之后Q该Service Handler会主动的Ҏ不同的Eventcd其分发l对应的Request Handler来处理?br />
更学术的Q这文章(Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous EventsQ上_“The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. Each service in an application may consistent of several methods and is represented by a separate event handler that is responsible for dispatching service-specific requests. Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer. Also known as Dispatcher, Notifier”。这D|q和Wikipedia上的描述cMQ有多个输入源,有多个不同的EventHandlerQRequestHandlerQ来处理不同的请求,Initiation Dispatcher用于理EventHanderQEventHandler首先要注册到Initiation Dispatcher中,然后Initiation DispatcherҎ输入的Event分发l注册的EventHandlerQ然而Initiation Dispatcherq不监听Event的到来,q个工作交给Synchronous Event Demultiplexer来处理?br />

Reactor模式l构

在解决了什么是Reactor模式后,我们来看看Reactor模式是由什么模块构成。图是一U比较简zŞ象的表现方式Q因而先上一张图来表辑֐个模块的名称和他们之间的关系Q?br />
HandleQ?/strong>x作系l中的句柄,是对资源在操作系l层面上的一U抽象,它可以是打开的文件、一个连?Socket)、Timer{。由于Reactor模式一般用在|络~程中,因而这里一般指Socket HandleQ即一个网l连接(ConnectionQ在Java NIO中的ChannelQ。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事gQ对ServerSocketChannnel可以是CONNECT事gQ对SocketChannel可以是READ、WRITE、CLOSE事g{?br />Synchronous Event DemultiplexerQ?/strong>d{待一pd的Handle中的事g到来Q如果阻塞等待返回,卌C在q回的Handle中可以不d的执行返回的事gcd。这个模块一般用操作系l的select来实现。在Java NIO中用Selector来封装,当Selector.select()q回Ӟ可以调用Selector的selectedKeys()Ҏ获取Set<SelectionKey>Q一个SelectionKey表达一个有事g发生的Channel以及该Channel上的事gcd。上囄“Synchronous Event Demultiplexer ---notifies--> Handle”的流E如果是对的Q那内部实现应该是select()Ҏ在事件到来后会先讄Handle的状态,然后q回。不了解内部实现机制Q因而保留原图?br />Initiation DispatcherQ?/strong>用于理Event HandlerQ即EventHandler的容器,用以注册、移除EventHandler{;另外Q它q作为Reactor模式的入口调用Synchronous Event Demultiplexer的selectҎ以阻塞等待事件返回,当阻塞等待返回时Q根据事件发生的Handle其分发l对应的Event Handler处理Q即回调EventHandler中的handle_event()Ҏ?br />Event HandlerQ?/strong>定义事g处理ҎQhandle_event()Q以供InitiationDispatcher回调使用?br />Concrete Event HandlerQ?/strong>事gEventHandler接口Q实现特定事件处理逻辑?br />

Reactor模式模块之间的交?/h2> 单描qC下Reactor各个模块之间的交互流E,先从序列囑ּ始:

1. 初始化InitiationDispatcherQƈ初始化一个Handle到EventHandler的Map?br />2. 注册EventHandler到InitiationDispatcher中,每个EventHandler包含对相应Handle的引用,从而徏立Handle到EventHandler的映(MapQ?br />3. 调用InitiationDispatcher的handle_events()Ҏ以启动Event Loop。在Event Loop中,调用select()ҎQSynchronous Event DemultiplexerQ阻塞等待Event发生?br />4. 当某个或某些Handle的Event发生后,select()Ҏq回QInitiationDispatcherҎq回的Handle扑ֈ注册的EventHandlerQƈ回调该EventHandler的handle_events()Ҏ?br />5. 在EventHandler的handle_events()Ҏ中还可以向InitiationDispatcher中注册新的EventhandlerQ比如对AcceptorEventHandler来,当有新的clientq接Ӟ它会产生新的EventHandler以处理新的连接,q注册到InitiationDispatcher中?br />

Reactor模式实现

?a >Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events中,一直以Logging Server来分析Reactor模式Q这个Logging Server的实现完全遵循这里对Reactor描述Q因而放在这里以做参考。Logging Server中的Reactor模式实现分两个部分:Clientq接到Logging Server和Client向Logging Server写Log。因而对它的描述分成q两个步骤?br />Clientq接到Logging Server

1. Logging Server注册LoggingAcceptor到InitiationDispatcher?br />2. Logging Server调用InitiationDispatcher的handle_events()Ҏ启动?br />3. InitiationDispatcher内部调用select()ҎQSynchronous Event DemultiplexerQ,d{待Clientq接?br />4. Clientq接到Logging Server?br />5. InitiationDisptcher中的select()Ҏq回Qƈ通知LoggingAcceptor有新的连接到来?
6. LoggingAcceptor调用acceptҎacceptq个新连接?br />7. LoggingAcceptor创徏新的LoggingHandler?br />8. 新的LoggingHandler注册到InitiationDispatcher?同时也注册到Synchonous Event Demultiplexer?Q等待Client发v写logh?br />Client向Logging Server写Log

1. Client发送log到Logging server?br />2. InitiationDispatcher监测到相应的Handle中有事g发生Q返回阻塞等待,Ҏq回的Handle扑ֈLoggingHandlerQƈ回调LoggingHandler中的handle_event()Ҏ?br />3. LoggingHandler中的handle_event()Ҏ中读取Handle中的log信息?br />4. 接收到的log写入到日志文件、数据库{设备中?br />3.4步骤循环直到当前日志处理完成?br />5. q回到InitiationDispatcher{待下一ơ日志写h?br />
?a >Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events有对Reactor模式的C++的实现版本,多年不用C++Q因而略q?nbsp;

Java NIO对Reactor的实?/h2>在Java的NIO中,对Reactor模式有无~的支持Q即使用Selectorcd装了操作pȝ提供的Synchronous Event Demultiplexer功能。这个Doug Lea已经?a >Scalable IO In Java中有非常深入的解释了Q因而不再赘qͼ另外q篇文章对Doug Lea?a >Scalable IO In Java有一些简单解释,臛_它的代码格式比Doug Lea的PPT要整z一些?br />
需要指出的是,不同q里使用InitiationDispatcher来管理EventHandlerQ在Doug Lea的版本中使用SelectionKey中的Attachment来存储对应的EventHandlerQ因而不需要注册EventHandlerq个步骤Q或者设|Attachment是q里的注册。而且在这文章中QDoug Lea从单U程的Reactor、Acceptor、Handler实现q个模式出发Q演化ؓHandler中的处理逻辑多线E化Q实现类似Proactor模式Q此时所有的IO操作q是单线E的Q因而再演化Z个Main Reactor来处理CONNECT事g(Acceptor)Q而多个Sub Reactor来处理READ、WRITE{事?Handler)Q这些Sub Reactor可以分别再自qU程中执行,从而IO操作也多U程化。这个最后一个模型正是Netty中用的模型。ƈ且在Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events?.5 Determine the Number of Initiation Dispatchers in an Application中也有相应的描述?br />

EventHandler接口定义

对EventHandler的定义有两种设计思\Qsingle-method设计和multi-method设计Q?br />A single-method interfaceQ?/strong>它将Event装成一个Event ObjectQEventHandler只定义一个handle_event(Event event)Ҏ。这U设计的好处是有利于扩展Q可以后来方便的d新的EventcdQ然而在子类的实CQ需要判断不同的Eventcd而再ơ扩展成 不同的处理方法,从这个角度上来说Q它又不利于扩展。另外在Netty3的用过E中Q由于它不停的创建ChannelEventc,因而会引vGC的不E_?br />A multi-method interfaceQ?/strong>q种设计是将不同的Eventcd? EventHandler中定义相应的Ҏ。这U设计就是Netty4中用的{略Q其中一个目的是避免ChannelEvent创徏引v的GC不稳定, 另外一个好处是它可以避免在EventHandler实现时判断不同的Eventcd而有不同的实玎ͼ然而这U设计会l扩展新的Eventcd时带来非? 大的ȝQ因为它需要该接口?br />
关于Netty4对Netty3的改q可以参?a >q里Q?br />
ChannelHandler with no event objectIn 3.x, every I/O operation created a ChannelEvent object. For each read / write, it additionally created a new ChannelBuffer. It simplified the internals of Netty quite a lot because it delegates resource management and buffer pooling to the JVM. However, it often was the root cause of GC pressure and uncertainty which are sometimes observed in a Netty-based application under high load.

4.0 removes event object creation almost completely by replacing the event objects with strongly typed method invocations. 3.x had catch-all event handler methods such as handleUpstream() and handleDownstream(), but this is not the case anymore. Every event type has its own handler method now:

Z么用Reactor模式

归功与Netty和Java NIO对Reactor的宣传,本文慕名而学习的Reactor模式Q因而已l默认Reactorh非常优秀的性能Q然而慕名归慕名Q到q里Q我q是要不得不问自己Reactor模式的好处在哪里Q即Z么要使用q个Reactor模式Q在Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events中是q么说的Q?br />
Reactor Pattern优点

Separation of concerns: The Reactor pattern decouples application-independent demultiplexing and dispatching mechanisms from application-specific hook method functionality. The application-independent mechanisms become reusable components that know how to demultiplex events and dispatch the appropriate hook methods defined by Event Handlers. In contrast, the application-specific functionality in a hook method knows how to perform a particular type of service.

Improve modularity, reusability, and configurability of event-driven applications: The pattern decouples application functionality into separate classes. For instance, there are two separate classes in the logging server: one for establishing connections and another for receiving and processing logging records. This decoupling enables the reuse of the connection establishment class for different types of connection-oriented services (such as file transfer, remote login, and video-on-demand). Therefore, modifying or extending the functionality of the logging server only affects the implementation of the logging handler class.

Improves application portability: The Initiation Dispatcher’s interface can be reused independently of the OS system calls that perform event demultiplexing. These system calls detect and report the occurrence of one or more events that may occur simultaneously on multiple sources of events. Common sources of events may in- clude I/O handles, timers, and synchronization objects. On UNIX platforms, the event demultiplexing system calls are called select and poll [1]. In the Win32 API [16], the WaitForMultipleObjects system call performs event demultiplexing.

Provides coarse-grained concurrency control: The Reactor pattern serializes the invocation of event handlers at the level of event demultiplexing and dispatching within a process or thread. Serialization at the Initiation Dispatcher level often eliminates the need for more complicated synchronization or locking within an application process.

q些貌似是很多模式的共性:解耦、提升复用性、模块化、可UL性、事仉动、细力度的ƈ发控制等Q因而ƈ不能很好的说明什么,特别是它鼓吹的对性能的提升,q里q没有体现出来。当然在q篇文章的开头有描述q另一U直观的实现QThread-Per-ConnectionQ即传统的实玎ͼ提到了这个传l实现的以下问题Q?br />
Thread Per Connection~点

Efficiency: Threading may lead to poor performance due to context switching, synchronization, and data movement [2];

Programming simplicity: Threading may require complex concurrency control schemes;

Portability: Threading is not available on all OS platforms.
对于性能Q它其实是W一点关于Efficiency的描qͼ即线E的切换、同步、数据的Ud会引h能问题。也是说从性能的角度上Q它最大的提升是减少了性能的用,即不需要每个Client对应一个线E。我的理解,其他业务逻辑处理很多时候也会用到相同的U程QIOd操作相对CPU的操作还是要慢很多,即Reactor机制中每ơ读写已l能保证非阻塞读写,q里可以减少一些线E的使用Q但是这减少的线E用对性能有那么大的媄响吗Q答案貌似是肯定的,q篇论文(SEDA: Staged Event-Driven Architecture - An Architecture for Well-Conditioned, Scalable Internet Service)寚w着U程的增长带来性能降低做了一个统计:

在这个统计中Q每个线E从盘中读8KB数据Q每个线E读同一个文Ӟ因而数据本w是~存在操作系l内部的Q即减少IO的媄响;所有线E是事先分配的,不会有线E启动的影响Q所有Q务在试内部产生Q因而不会有|络的媄响。该l计数据q行环境QLinux 2.2.14Q?GB内存Q?-way 500MHz Pentium III。从图中可以看出Q随着U程的增长,吞吐量在U程Cؓ8个左右的时候开始线性下降,q且?4个以后而迅速下降,其相应事件也在线E达?56个后指数上升。即1+1<2Q因为线E切换、同步、数据移动会有性能损失Q线E数增加C定数量时Q这U性能影响效果会更加明显?br />
对于q点Q还可以参?a >C10K ProblemQ用以描q同时有10K个Client发vq接的问题,?010q的时候已l出?0M Problem了?br />
当然也有Q?a >Threads are expensive are no longer valid.在不久的来可能又会发生不同的变化,或者这个变化正在、已l发生着Q没有做q比较仔l的试Q因而不敢随便断a什么,然而本点,即ɾU程变的影响q没有以前那么大Q用Reactor模式Q甚xSEDA模式来减线E的使用Q再加上其他解耦、模块化、提升复用性等优点Q还是值得使用的?br />

Reactor模式的缺?/h2>Reactor模式的缺点貌g是显而易见的Q?br />1. 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛Qƈ且不易于调试?br />2. Reactor模式需要底层的Synchronous Event Demultiplexer支持Q比如Java中的Selector支持Q操作系l的selectpȝ调用支持Q如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效?br />3. Reactor模式在IOd数据时还是在同一个线E中实现的,即使用多个Reactor机制的情况下Q那些共享一个Reactor的Channel如果出现一个长旉的数据读写,会媄响这个Reactor中其他Channel的相应时_比如在大文g传输ӞIO操作׃影响其他Client的相应时_因而对q种操作Q用传l的Thread-Per-Connection或许是一个更好的选择Q或则此时用Proactor模式?br />

参?/h2> Reactor Pattern WikiPedia
Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events
Scalable IO In Java
C10K Problem WikiPedia


DLevin 2015-09-02 15:14 发表评论
]]>深入HBase架构解析Q二Q?/title><link>http://www.tkk7.com/DLevin/archive/2015/08/22/426950.html</link><dc:creator>DLevin</dc:creator><author>DLevin</author><pubDate>Sat, 22 Aug 2015 11:40:00 GMT</pubDate><guid>http://www.tkk7.com/DLevin/archive/2015/08/22/426950.html</guid><wfw:comment>http://www.tkk7.com/DLevin/comments/426950.html</wfw:comment><comments>http://www.tkk7.com/DLevin/archive/2015/08/22/426950.html#Feedback</comments><slash:comments>0</slash:comments><wfw:commentRss>http://www.tkk7.com/DLevin/comments/commentRss/426950.html</wfw:commentRss><trackback:ping>http://www.tkk7.com/DLevin/services/trackbacks/426950.html</trackback:ping><description><![CDATA[<h2> 前言</h2>q是<a href="http://www.tkk7.com/DLevin/archive/2015/08/22/426877.html">《深入HBase架构解析Q一Q?/a>的箋Q不多废话,l箋。。。?br /><h2>HBaseȝ实现</h2>通过前文的描qͼ我们知道在HBase写时Q相同Cell(RowKey/ColumnFamily/Column相同)q不保证在一P甚至删除一个Cell也只是写入一个新的CellQ它含有Delete标记Q而不一定将一个Cell真正删除了,因而这引起了一个问题,如何实现ȝ问题Q要解决q个问题Q我们先来分析一下相同的Cell可能存在的位|:首先Ҏ写入的CellQ它会存在于MemStore中;然后对之前已lFlush到HDFS中的CellQ它会存在于某个或某些StoreFile(HFile)中;最后,对刚dq的CellQ它可能存在于BlockCache中。既然相同的Cell可能存储在三个地方,在读取的时候只需要扫瞄这三个地方Q然后将l果合ƈ卛_(Merge Read)Q在HBase中扫瞄的序依次是:BlockCache、MemStore、StoreFile(HFile)。其中StoreFile的扫瞄先会用Bloom Filterqo那些不可能符合条件的HFileQ然后用Block Index快速定位CellQƈ其加蝲到BlockCache中,然后从BlockCache中读取。我们知道一个HStore可能存在多个StoreFile(HFile)Q此旉要扫瞄多个HFileQ如果HFileq多又是会引h能问题?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig16.png" height="278" width="769" /><br /><h2>Compaction</h2>MemStore每次Flush会创建新的HFileQ而过多的HFile会引赯的性能问题Q那么如何解册个问题呢QHBase采用Compaction机制来解册个问题,有点cMJava中的GC机制Qv初Java不停的申请内存而不释放Q增加性能Q然而天下没有免费的午餐Q最l我们还是要在某个条件下L集垃圾,很多时候需要Stop-The-WorldQ这UStop-The-World有些时候也会引起很大的问题Q比如参考本人写?a href="http://www.tkk7.com/DLevin/archive/2015/08/01/426418.html">q篇文章</a>Q因而设计是一U权衡,没有完美的。还是类似Java中的GCQ在HBase中Compaction分ؓ两种QMinor Compaction和Major Compaction?br /><ol><li>Minor Compaction是指选取一些小的、相ȝStoreFile他们合q成一个更大的StoreFileQ在q个q程中不会处理已lDeleted或Expired的Cell。一ơMinor Compaction的结果是更少q且更大的StoreFile。(q个是对的吗QBigTable中是q样描述Minor Compaction?span style="font-size: 10.000000pt; font-family: 'Times'">QAs write operations execute, the size of the memtable in- creases. When the memtable size reaches a threshold, the memtable is frozen, a new memtable is created, and the frozen memtable is converted to an SSTable and written to GFS. This </span><span style="font-size: 10.000000pt; font-family: 'Times'; font-style: italic">minor compaction </span><span style="font-size: 10.000000pt; font-family: 'Times'">process has two goals: it shrinks the memory usage of the tablet server, and it reduces the amount of data that has to be read from the commit log during recovery if this server dies. Incom- ing read and write operations can continue while com- pactions occur. </span>也就是说它将memtable的数据flush的一个HFile/SSTableUCؓ一ơMinor CompactionQ?/li><li>Major Compaction是指所有的StoreFile合ƈ成一个StoreFileQ在q个q程中,标记为Deleted的Cell会被删除Q而那些已lExpired的Cell会被丢弃Q那些已l超q最多版本数的Cell会被丢弃。一ơMajor Compaction的结果是一个HStore只有一个StoreFile存在。Major Compaction可以手动或自动触发,然而由于它会引起很多的IO操作而引h能问题Q因而它一般会被安排在周末、凌晨等集群比较闲的旉?br /></li></ol>更Ş象一点,如下面两张图分别表示Minor Compaction和Major Compaction?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig18.png" height="329" width="723" /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig19.png" height="339" width="653" /><br /><h2>HRegion Split</h2>最初,一个Table只有一个HRegionQ随着数据写入增加Q如果一个HRegion到达一定的大小Q就需要Split成两个HRegionQ这个大由hbase.hregion.max.filesize指定Q默认ؓ10GB。当splitӞ两个新的HRegion会在同一个HRegionServer中创建,它们各自包含父HRegion一半的数据Q当Split完成后,父HRegion会下U,而新的两个子HRegion会向HMaster注册上线Q处于负载均衡的考虑Q这两个新的HRegion可能会被HMaster分配到其他的HRegionServer中。关于Split的详l信息,可以参考这文章:<a >《Apache HBase Region Splitting and Merging?/a>?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig21.png" height="361" width="675" /><br /><h2>HRegion负蝲均衡</h2>在HRegion Split后,两个新的HRegion最初会和之前的父HRegion在相同的HRegionServer上,Z负蝲均衡的考虑QHMaster可能会将其中的一个甚至两个重新分配的其他的HRegionServer中,此时会引h些HRegionServer处理的数据在其他节点上,直到下一ơMajor Compaction数据从q端的节点移动到本地节点?br /><br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig22.png" height="358" width="714" /><br /><h2>HRegionServer Recovery</h2>当一台HRegionServer宕机Ӟ׃它不再发送HeartbeatlZooKeeper而被监测刎ͼ此时ZooKeeper会通知HMasterQHMaster会检到哪台HRegionServer宕机Q它宕机的HRegionServer中的HRegion重新分配l其他的HRegionServerQ同时HMaster会把宕机的HRegionServer相关的WAL拆分分配l相应的HRegionServer(拆分出的WAL文g写入对应的目的HRegionServer的WAL目录中,qƈ写入对应的DataNode中)Q从而这些HRegionServer可以Replay分到的WAL来重建MemStore?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig25.png" height="368" width="708" /><br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig26.png" height="378" width="724" /><br /><h2>HBase架构单ȝ</h2>在NoSQL中,存在著名的CAP理论Q即Consistency、Availability、Partition Tolerance不可全得Q目前市Z基本上的NoSQL都采用Partition Tolerance以实现数据得水^扩展Q来处理Relational DataBase遇到的无法处理数据量太大的问题,或引L性能问题。因而只有剩下C和A可以选择。HBase在两者之间选择了ConsistencyQ然后用多个HMaster以及支持HRegionServer的failure监控、ZooKeeper引入作ؓ协调者等各种手段来解决Availability问题Q然而当|络的Split-Brain(Network Partition)发生Ӟ它还是无法完全解决Availability的问题。从q个角度上,Cassandra选择了AQ即它在|络Split-Brain时还是能正常写,而用其他技术来解决Consistency的问题,如读的时候触发Consistency判断和处理。这是设计上的限制?br /><br />从实C的优点:<br /><ol><li>HBase采用Z致性模型,在一个写q回后,保证所有的读都d相同的数据?/li><li>通过HRegion动态Split和Merge实现自动扩展Qƈ使用HDFS提供的多个数据备份功能,实现高可用性?/li><li>采用HRegionServer和DataNodeq行在相同的服务器上实现数据的本地化Q提升读写性能Qƈ减少|络压力?/li><li>内徏HRegionServer的宕动恢复。采用WAL来Replayq未持久化到HDFS的数据?/li><li>可以无缝的和Hadoop/MapReduce集成?br /></li></ol>实现上的~点Q?br /><ol><li>WAL的Replayq程可能会很慢?/li><li>N恢复比较复杂Q也会比较慢?/li><li>Major Compaction会引起IO Storm?/li><li>。。。?br /></li></ol><h2>参考:</h2> https://www.mapr.com/blog/in-depth-look-hbase-architecture#.VdNSN6Yp3qx<br /> http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable<br /> http://hbase.apache.org/book.html <br /> http://www.searchtb.com/2011/01/understanding-hbase.html <br /> http://research.google.com/archive/bigtable-osdi06.pdf<img src ="http://www.tkk7.com/DLevin/aggbug/426950.html" width = "1" height = "1" /><br><br><div align=right><a style="text-decoration:none;" href="http://www.tkk7.com/DLevin/" target="_blank">DLevin</a> 2015-08-22 19:40 <a href="http://www.tkk7.com/DLevin/archive/2015/08/22/426950.html#Feedback" target="_blank" style="text-decoration:none;">发表评论</a></div>]]></description></item><item><title>深入HBase架构解析Q一Q?/title><link>http://www.tkk7.com/DLevin/archive/2015/08/22/426877.html</link><dc:creator>DLevin</dc:creator><author>DLevin</author><pubDate>Sat, 22 Aug 2015 09:44:00 GMT</pubDate><guid>http://www.tkk7.com/DLevin/archive/2015/08/22/426877.html</guid><wfw:comment>http://www.tkk7.com/DLevin/comments/426877.html</wfw:comment><comments>http://www.tkk7.com/DLevin/archive/2015/08/22/426877.html#Feedback</comments><slash:comments>0</slash:comments><wfw:commentRss>http://www.tkk7.com/DLevin/comments/commentRss/426877.html</wfw:commentRss><trackback:ping>http://www.tkk7.com/DLevin/services/trackbacks/426877.html</trackback:ping><description><![CDATA[<h2>前记</h2> 公司内部使用的是MapR版本的Hadoop生态系l,因而从MapR的官|看Cq篇文文章:<a >An In-Depth Look at the HBase Architecture</a>Q原本想译全文Q然而如果翻译就需要各U咬文嚼字,太麻烦,因而本文大部分使用了自q语言Qƈ且加入了其他资源的参考理解以及本p源码时对其的理解Q属于半译、半原创吧?br /> <h2>HBase架构l成</h2> HBase采用Master/Slave架构搭徏集群Q它隶属于Hadoop生态系l,׃下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群Q而在底层Q它数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode{,Ml构如下Q?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArch1.jpg" height="389" width="603" /><br /> 其中<strong>HMaster节点</strong>用于Q?br /> <ol> <li>理HRegionServerQ实现其负蝲均衡?/li> <li>理和分配HRegionQ比如在HRegion split时分配新的HRegionQ在HRegionServer退出时q移其内的HRegion到其他HRegionServer上?/li> <li>实现DDL操作QData Definition LanguageQnamespace和table的增删改Qcolumn familiy的增删改{)?/li> <li>理namespace和table的元数据Q实际存储在HDFS上)?/li> <li>权限控制QACLQ?/li> </ol> <strong>HRegionServer节点</strong>用于Q?br /> <ol> <li>存放和管理本地HRegion?/li> <li>dHDFSQ管理Table中的数据?/li> <li>Client直接通过HRegionServerd数据Q从HMaster中获取元数据Q找到RowKey所在的HRegion/HRegionServer后)?/li> </ol> <strong>ZooKeeper集群是协调系l?/strong>Q用于:<br /> <ol> <li>存放整个 HBase集群的元数据以及集群的状态信息?/li> <li>实现HMasterM节点的failover?/li> </ol> HBase Client通过RPC方式和HMaster、HRegionServer通信Q一个HRegionServer可以存放1000个HRegionQ底层Table数据存储于HDFS中,而HRegion所处理的数据尽量和数据所在的DataNode在一P实现数据的本地化Q数据本地化q不是总能实现Q比如在HRegionUd(如因Split)Ӟ需要等下一ơCompact才能l箋回到本地化?br /> <br /> 本着半翻译的原则Q再贴一个《An In-Depth Look At The HBase Architecture》的架构图:<br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig1.png" height="343" width="632" /><br /> q个架构图比较清晰的表达了HMaster和NameNode都支持多个热备䆾Q用ZooKeeper来做协调QZooKeeperq不是云般神U,它一般由三台机器l成一个集,内部使用PAXOS法支持三台Server中的一台宕机,也有使用五台机器的,此时则可以支持同时两台宕机,既少于半数的宕机Q然而随着机器的增加,它的性能也会下降QRegionServer和DataNode一般会攑֜相同的Server上实现数据的本地化?br /> <h2>HRegion</h2> HBase使用RowKey表水^切割成多个HRegionQ从HMaster的角度,每个HRegion都纪录了它的StartKey和EndKeyQ第一个HRegion的StartKey为空Q最后一个HRegion的EndKey为空Q,׃RowKey是排序的Q因而Client可以通过HMaster快速的定位每个RowKey在哪个HRegion中。HRegion由HMaster分配到相应的HRegionServer中,然后由HRegionServer负责HRegion的启动和理Q和Client的通信Q负责数据的?使用HDFS)。每个HRegionServer可以同时理1000个左右的HRegionQ这个数字怎么来的Q没有从代码中看到限ӞN是出于经验?过1000个会引v性能问题Q?strong>来回{这个问?/strong>Q感觉这?000的数字是从BigTable的论文中来的Q? Implementation节)QEach tablet server manages a set of tablets(typically we have somewhere between ten to a thousand tablets per tablet server)Q?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig2.png" height="337" width="724" /><br /> <h2>HMaster</h2> HMaster没有单点故障问题Q可以启动多个HMasterQ通过ZooKeeper的Master Election机制保证同时只有一个HMasterZActive状态,其他的HMaster则处于热备䆾状态。一般情况下会启动两个HMasterQ非Active的HMaster会定期的和Active HMaster通信以获取其最新状态,从而保证它是实时更新的Q因而如果启动了多个HMaster反而增加了Active HMaster的负担。前文已l介l过了HMaster的主要用于HRegion的分配和理QDDL(Data Definition LanguageQ既Table的新建、删除、修改等)的实现等Q既它主要有两方面的职责Q?br /> <ol> <li>协调HRegionServer <ol> <li>启动时HRegion的分配,以及负蝲均衡和修复时HRegion的重新分配?/li> <li>监控集群中所有HRegionServer的状?通过Heartbeat和监听ZooKeeper中的状??br /> </li> </ol> </li> <li>Admin职能 <ol> <li>创徏、删除、修改Table的定义?br /> </li> </ol> </li> </ol> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig3.png" /><br /> <h2> ZooKeeperQ协调?/h2> ZooKeeper为HBase集群提供协调服务Q它理着HMaster和HRegionServer的状?available/alive{?Qƈ且会在它们宕机时通知lHMasterQ从而HMaster可以实现HMaster之间的failoverQ或对宕机的HRegionServer中的HRegion集合的修?它们分配给其他的HRegionServer)。ZooKeeper集群本n使用一致性协?PAXOS协议)保证每个节点状态的一致性?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig4.png" height="318" width="703" /><br /> <h2>How The Components Work Together</h2> ZooKeeper协调集群所有节点的׃n信息Q在HMaster和HRegionServerq接到ZooKeeper后创建Ephemeral节点Qƈ使用Heartbeat机制l持q个节点的存zȝ态,如果某个Ephemeral节点实效Q则HMaster会收到通知Qƈ做相应的处理?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig5.png" height="329" width="722" /><br /> 另外QHMaster通过监听ZooKeeper中的Ephemeral节点(默认Q?hbase/rs/*)来监控HRegionServer的加入和宕机。在W一个HMasterq接到ZooKeeper时会创徏Ephemeral节点(默认Q?hbasae/master)来表CActive的HMasterQ其后加q来的HMaster则监听该Ephemeral节点Q如果当前Active的HMaster宕机Q则该节Ҏ失,因而其他HMaster得到通知Q而将自n转换成Active的HMasterQ在变ؓActive的HMaster之前Q它会创建在/hbase/back-masters/下创qEphemeral节点?br /> <h3> HBase的第一ơ读?/h3> 在HBase 0.96以前QHBase有两个特D的TableQ?ROOT-?META.Q如<a >BigTable</a>中的设计Q,其中-ROOT- Table的位|存储在ZooKeeperQ它存储?META. Table的RegionInfo信息Qƈ且它只能存在一个HRegionQ?META. Table则存储了用户Table的RegionInfo信息Q它可以被切分成多个HRegionQ因而对W一ơ访问用户TableӞ首先从ZooKeeper中读?ROOT- Table所在HRegionServerQ然后从该HRegionServer中根据请求的TableNameQRowKeyd.META. Table所在HRegionServerQ最后从该HRegionServer中读?META. Table的内容而获取此ơ请求需要访问的HRegion所在的位置Q然后访问该HRegionSever获取h的数据,q需要三ơ请求才能找到用户Table所在的位置Q然后第四次h开始获取真正的数据。当然ؓ了提升性能Q客L会缓?ROOT- Table位置以及-ROOT-/.META. Table的内宏V如下图所C:<br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/image0030.jpg" height="228" width="399" /><br /> 可是即客户端有~存Q在初始阶段需要三ơ请求才能直到用户Table真正所在的位置也是性能低下的,而且真的有必要支持那么多的HRegion吗?或许对Googleq样的公司来说是需要的Q但是对一般的集群来说好像q没有这个必要。在BigTable的论文中_每行METADATA存储1KB左右数据Q中{大的Tablet(HRegion)?28MB左右Q?层位|的Schema设计可以支持2^34个Tablet(HRegion)。即使去?ROOT- TableQ也q可以支?^17(131072)个HRegionQ?如果每个HRegionq是128MBQ那是16TBQ这个貌g够大Q但是现在的HRegion的最大大都会设|的比较大,比如我们讄?GBQ此时支持的大小则变成了4PBQ对一般的集群来说已经够了Q因而在HBase 0.96以后L?ROOT- TableQ只剩下q个Ҏ的目录表叫做Meta Table(hbase:meta)Q它存储了集中所有用户HRegion的位|信息,而ZooKeeper的节点中(/hbase/meta-region-server)存储的则直接是这个Meta Table的位|,q且q个Meta Table如以前的-ROOT- Table一h不可split的。这P客户端在W一ơ访问用户Table的流E就变成了:<br /> <ol> <li>从ZooKeeper(/hbase/meta-region-server)中获取hbase:meta的位|(HRegionServer的位|)Q缓存该位置信息?/li> <li>从HRegionServer中查询用户Table对应h的RowKey所在的HRegionServerQ缓存该位置信息?/li> <li>从查询到HRegionServer中读取Row?/li> </ol> 从这个过E中Q我们发现客户会~存q些位置信息Q然而第二步它只是缓存当前RowKey对应的HRegion的位|,因而如果下一个要查的RowKey不在同一个HRegion中,则需要l查询hbase:meta所在的HRegionQ然而随着旉的推U,客户端缓存的位置信息来多Q以至于不需要再ơ查找hbase:meta Table的信息,除非某个HRegion因ؓ宕机或Split被移动,此时需要重新查询ƈ且更新缓存?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig6.png" height="356" width="590" /><br /> <h3> hbase:meta?/h3> hbase:meta表存储了所有用户HRegion的位|信息,它的RowKey是:tableName,regionStartKey,regionId,replicaId{,它只有info列族Q这个列族包含三个列Q他们分别是Qinfo:regioninfo列是RegionInfo的proto格式QregionId,tableName,startKey,endKey,offline,split,replicaIdQinfo:server格式QHRegionServer对应的server:portQinfo:serverstartcode格式是HRegionServer的启动时间戳?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig7.png" height="362" width="736" /><br /> <h2>HRegionServer详解</h2> HRegionServer一般和DataNode在同一台机器上q行Q实现数据的本地性。HRegionServer包含多个HRegionQ由WAL(HLog)、BlockCache、MemStore、HFilel成?br /> <ol> <li><strong>WAL即Write Ahead Log</strong>Q在早期版本中称为HLogQ它是HDFS上的一个文Ӟ如其名字所表示的,所有写操作都会先保证将数据写入q个Log文g后,才会真正更新MemStoreQ最后写入HFile中。采用这U模式,可以保证HRegionServer宕机后,我们依然可以从该Log文g中读取数据,Replay所有的操作Q而不至于数据丢失。这个Log文g会定期Roll出新的文件而删除旧的文?那些已持久化到HFile中的Log可以删除)。WAL文g存储?hbase/WALs/${HRegionServer_Name}的目录中(?.94之前Q存储在/hbase/.logs/目录?Q一般一个HRegionServer只有一个WAL实例Q也是说一个HRegionServer的所有WAL写都是串行的(像log4j的日志写也是串行?Q这当然会引h能问题Q因而在HBase 1.0之后Q通过<a >HBASE-5699</a>实现了多个WALq行?MultiWAL)Q该实现采用HDFS的多个管道写Q以单个HRegion为单位。关于WAL可以参考Wikipedia?a >Write-Ahead Logging</a>。顺便吐槽一句,英文版的l基癄竟然能毫无压力的正常讉K了,q是某个GFW的疏忽还是以后的常态?</li> <li><strong>BlockCache是一个读~存</strong>Q即“引用局部?#8221;原理Q也应用于CPUQ?a >分空间局部性和旉局部?/a>Q空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时d需要的数据在其附近Q时间局部性是指某个数据在被访问过一ơ后Q它有很大的概率在不久的来会被再次的访问)Q将数据预读取到内存中,以提升读的性能。HBase中提供两UBlockCache的实玎ͼ默认on-heap LruBlockCache和BucketCache(通常是off-heap)。通常BucketCache的性能要差于LruBlockCacheQ然而由于GC的媄响,LruBlockCache的gq会变的不稳定,而BucketCache׃是自q理BlockCacheQ而不需要GCQ因而它的gq通常比较E_Q这也是有些时候需要选用BucketCache的原因。这文?a >BlockCache101</a>对on-heap和off-heap的BlockCache做了详细的比较?/li><strong> </strong><li><strong>HRegion是一个Table中的一个Region在一个HRegionServer中的表达</strong>。一个Table可以有一个或多个RegionQ他们可以在一个相同的HRegionServer上,也可以分布在不同的HRegionServer上,一个HRegionServer可以有多个HRegionQ他们分别属于不同的Table。HRegion由多个Store(HStore)构成Q每个HStore对应了一个Table在这个HRegion中的一个Column FamilyQ即每个Column Family是一个集中的存储单元Q因而最好将h相近IOҎ的Column存储在一个Column FamilyQ以实现高效d(数据局部性原理,可以提高~存的命中率)。HStore是HBase中存储的核心Q它实现了读写HDFS功能Q一个HStore׃个MemStore ?个或多个StoreFilel成?br /> <ol> <li><strong>MemStore是一个写~存</strong>(In Memory Sorted Buffer)Q所有数据的写在完成WAL日志写后Q会 写入MemStore中,由MemStoreҎ一定的法数据Flush到地层HDFS文g?HFile)Q通常每个HRegion中的每个 Column Family有一个自qMemStore?/li> <li><strong>HFile(StoreFile) 用于存储HBase的数?Cell/KeyValue)</strong>。在HFile中的数据是按RowKey、Column Family、Column排序Q对相同的Cell(卌三个值都一?Q则按timestamp倒序排列?/li> </ol> </li> </ol> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig8.png" /><br /> 虽然上面q张囑ֱ现的是最新的HRegionServer的架?但是q不是那么的_)Q但是我一直比较喜Ƣ看以下q张图,即它展现的应该?.94以前的架构?br /> <img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/image0060.jpg" height="347" width="553" /><br /> <h3> HRegionServer中数据写程图解</h3> 当客L发v一个PuthӞ首先它从hbase:meta表中查出该Put数据最l需要去的HRegionServer。然后客LPuth发送给相应的HRegionServerQ在HRegionServer中它首先会将该Put操作写入WAL日志文g?Flush到磁盘中)?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig9.png" height="363" width="716" /><br /> 写完WAL日志文g后,HRegionServerҎPut中的TableName和RowKey扑ֈ对应的HRegionQƈҎColumn Family扑ֈ对应的HStoreQƈPut写入到该HStore的MemStore中。此时写成功Qƈq回通知客户端?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig10.png" height="298" width="664" /><br /><h3>MemStore Flush<br /></h3>MemStore是一个In Memory Sorted BufferQ在每个HStore中都有一个MemStoreQ即它是一个HRegion的一个Column Family对应一个实例。它的排列顺序以RowKey、Column Family、Column的顺序以及Timestamp的倒序Q如下所C:<br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig11.png" height="351" width="719" /><br />每一ơPut/Deleteh都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile(底层实现是HFile)Q即一个HStore(Column Family)可以?个或多个StoreFile(HFile)。有以下三种情况可以触发MemStore的Flush动作Q?strong>需要注意的是MemStore的最Flush单元是HRegion而不是单个MemStore</strong>。据说这是Column Family有个数限制的其中一个原因,估计是因为太多的Column Family一起Flush会引h能问题Q具体原因有待考证?br /><ol><li>当一个HRegion中的所有MemStore的大d过了hbase.hregion.memstore.flush.size的大,默认128MB。此时当前的HRegion中所有的MemStore会Flush到HDFS中?/li><li>当全局MemStore的大超q了hbase.regionserver.global.memstore.upperLimit的大,默认40Q的内存使用量。此时当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush序是MemStore大小的倒序Q一个HRegion中所有MemStored作ؓ该HRegion的MemStore的大还是选取最大的MemStore作ؓ参考?有待考证Q,直到M的MemStore使用量低于hbase.regionserver.global.memstore.lowerLimitQ默?8%的内存用量?/li><li>当前HRegionServer中WAL的大超q了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush使用旉序Q最早的MemStore先Flush直到WAL的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs?a >q里</a>说这两个怹的默认大是2GBQ查代码Qhbase.regionserver.max.logs默认值是32Q而hbase.regionserver.hlog.blocksize是HDFS的默认blocksizeQ?2MB。但不管怎么P因ؓq个大小过限制引v的Flush不是一件好事,可能引v长时间的延迟Q因而这文章给的徏议:“<strong style="color: #339966; font-family: STHeiti; font-size: medium; font-style: normal; font-variant: normal; letter-spacing: normal; line-height: normal; orphans: auto; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 1; word-spacing: 0px; -webkit-text-stroke-width: 0px;">Hint</strong><span style="color: #339966; font-family: STHeiti; font-size: medium; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: auto; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 1; word-spacing: 0px; -webkit-text-stroke-width: 0px; display: inline !important; float: none;">: keep hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs just a bit above hbase.regionserver.global.memstore.lowerLimit * HBASE_HEAPSIZE.</span>”。ƈ且需要注意,<a >q里</a>l的描述是有错的(虽然它是官方的文??br /></li></ol>在MemStore Flushq程中,q会在尾部追加一些meta数据Q其中就包括Flush时最大的WAL sequence|以告诉HBaseq个StoreFile写入的最新数据的序列Q那么在Recover时就直到从哪里开始。在HRegion启动Ӟq个sequence会被dQƈ取最大的作ؓ下一ơ更新时的v始sequence?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig12.png" height="248" width="622" /><br /><h2> HFile格式</h2>HBase的数据以KeyValue(Cell)的Ş式顺序的存储在HFile中,在MemStore的Flushq程中生成HFileQ由于MemStore中存储的Cell遵@相同的排列顺序,因而Flushq程是顺序写Q我们直到磁盘的序写性能很高Q因Z需要不停的Ud盘指针?br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig13.png" height="351" width="698" /><br />HFile参考BigTable的SSTable和Hadoop?a >TFile</a>实现Q从HBase开始到现在QHFilel历了三个版本,其中V2?.92引入QV3?.98引入。首先我们来看一下V1的格式:<br /><img src="http://www.tkk7.com/images/blogjava_net/dlevin/image0080.jpg" alt="" height="160" border="0" width="554" /><br />V1的HFile由多个Data Block、Meta Block、FileInfo、Data Index、Meta Index、Trailerl成Q其中Data Block是HBase的最存储单元,在前文中提到的BlockCache是ZData Block的缓存的。一个Data Block׃个魔数和一pd的KeyValue(Cell)l成Q魔数是一个随机的数字Q用于表C是一个Data BlockcdQ以快速监这个Data Block的格式,防止数据的破坏。Data Block的大可以在创徏Column Family时设|?HColumnDescriptor.setBlockSize())Q默认值是64KBQ大LBlock有利于顺序ScanQ小号Block利于随机查询Q因而需要权衡。Meta块是可选的QFileInfo是固定长度的块,它纪录了文g的一些Meta信息Q例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY{。Data Index和Meta IndexU录了每个Data块和Meta块的其实炏V未压羃时大、Key(起始RowKeyQ?{。TrailerU录了FileInfo、Data Index、Meta Index块的起始位置QData Index和Meta Index索引的数量等。其中FileInfo和Trailer是固定长度的?br /><br />HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项Qƈ且有固定的结构。我们来看看里面的具体结构:<br /><img src="http://www.tkk7.com/images/blogjava_net/dlevin/image0090.jpg" alt="" height="93" border="0" width="553" /><br />开始是两个固定长度的数|分别表示Key的长度和Value的长度。紧接着是KeyQ开始是固定长度的数|表示RowKey的长度,紧接着? RowKeyQ然后是固定长度的数|表示Family的长度,然后是FamilyQ接着是QualifierQ然后是两个固定长度的数|表示Time Stamp和Key TypeQPut/DeleteQ。Value部分没有q么复杂的结构,是Ua的二q制数据了?strong>随着HFile版本q移QKeyValue(Cell)的格式ƈ未发生太多变化,只是在V3版本Q尾部添加了一个可选的Tag数组</strong>?br /> <br />HFileV1版本的在实际使用q程中发现它占用内存多,q且Bloom File和Block Index会变的很大,而引起启动时间变ѝ其中每个HFile的Bloom Filter可以增长?00MBQ这在查询时会引h能问题Q因为每ơ查询时需要加载ƈ查询Bloom FilterQ?00MB的Bloom Filer会引起很大的延迟Q另一个,Block Index在一个HRegionServer可能会增长到d6GBQHRegionServer在启动时需要先加蝲所有这些Block IndexQ因而增加了启动旉。ؓ了解册些问题,?.92版本中引入HFileV2版本Q?br /><img src="http://www.tkk7.com/images/blogjava_net/dlevin/hfilev2.png" alt="" height="418" border="0" width="566" /><br />在这个版本中QBlock Index和Bloom FilterdCData Block中间Q而这U设计同时也减少了写的内存用量Q另外,Z提升启动速度Q在q个版本中还引入了gq读的功能,卛_HFile真正被用时才对其进行解析?br /><br />FileV3版本基本和V2版本相比Qƈ没有太大的改变,它在KeyValue(Cell)层面上添加了Tag数组的支持;q在FileInfol构中添加了和Tag相关的两个字Dc关于具体HFile格式演化介绍Q可以参?a >q里</a>?br /><br />对HFileV2格式具体分析Q它是一个多层的cB+树烦引,采用q种设计Q可以实现查找不需要读取整个文Ӟ<br /><img alt="" src="http://www.tkk7.com/images/blogjava_net/dlevin/HBaseArchitecture-Blog-Fig14.png" height="349" width="688" /><br />Data Block中的Cell都是升序排列Q每个block都有它自qLeaf-IndexQ每个Block的最后一个Key被放入Intermediate-Index中,Root-Index指向Intermediate-Index。在HFile的末还有Bloom Filter用于快速定位那么没有在某个Data Block中的RowQTimeRange信息用于l那些用时间查询的参考。在HFile打开Ӟq些索引信息都被加蝲q保存在内存中,以增加以后的d性能?br /><br />q篇先写到q里Q未完待l。。。?br /><br /> <h2>参考:</h2> https://www.mapr.com/blog/in-depth-look-hbase-architecture#.VdNSN6Yp3qx<br /> http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable<br /> http://hbase.apache.org/book.html <br /> http://www.searchtb.com/2011/01/understanding-hbase.html <br /> http://research.google.com/archive/bigtable-osdi06.pdf<img src ="http://www.tkk7.com/DLevin/aggbug/426877.html" width = "1" height = "1" /><br><br><div align=right><a style="text-decoration:none;" href="http://www.tkk7.com/DLevin/" target="_blank">DLevin</a> 2015-08-22 17:44 <a href="http://www.tkk7.com/DLevin/archive/2015/08/22/426877.html#Feedback" target="_blank" style="text-decoration:none;">发表评论</a></div>]]></description></item><item><title>Log4J引v的程序“装歠Z?/title><link>http://www.tkk7.com/DLevin/archive/2015/08/13/426751.html</link><dc:creator>DLevin</dc:creator><author>DLevin</author><pubDate>Thu, 13 Aug 2015 08:28:00 GMT</pubDate><guid>http://www.tkk7.com/DLevin/archive/2015/08/13/426751.html</guid><wfw:comment>http://www.tkk7.com/DLevin/comments/426751.html</wfw:comment><comments>http://www.tkk7.com/DLevin/archive/2015/08/13/426751.html#Feedback</comments><slash:comments>4</slash:comments><wfw:commentRss>http://www.tkk7.com/DLevin/comments/commentRss/426751.html</wfw:commentRss><trackback:ping>http://www.tkk7.com/DLevin/services/trackbacks/426751.html</trackback:ping><description><![CDATA[<h2>问题起因</h2> 依然是在使用GemFire的集中Q我们发现偶会出现一些GemFire的Function执行特别慢,q且过了两分钟Qؓ了保证数据的一致性,我们在写之前需要先拿一个LockQ因Z能每个Key都对应一个LockQ因而我们用了Guava的Stripe LockQ关于Stripe Lock可以参?a href="http://www.tkk7.com/DLevin/archive/2013/12/25/407990.html">q里</a>Q,而且q个Lock本n我们指定?分钟的超时时_因而如果写过两分钟,我们׃收到ExceptionQ。这个问题其实已l困C我们好几q了Q刚前段旉Q我们发现长旉的Stop-The-World GC会引赯个问题,而且q种时候很多时候会引v那个节点从集中退出,q不是所有的q种错误都有GC的问题,我特地查了GC的日志,有些q种写超q两分钟的情况下QGC一直处于非常健L状态,而且查了GemFire的日志和我们自己的日志,也没有发CQ何异常。由于我们每个数据保留两分䆾拯Q也是说每ơ数据写都要写两个节点,两分钟对CPU来说可以做太多的事情Q因而只有IO才能在某些时候生这U问题,在问题发生的时候也没有Moverflow数据Q而且本地操作Q即使对IO来说2分钟也是一个非帔R的时间了Q因而我们只能怀疑这是写另一个节点引LQ对另一个节点,它是在同一个Data Center中,而且基本是在同一个Chasis内部Q因而它们之间小?M的数据量通信也不太可能花?分钟的时_所以剩下的我们只能怀疑网l的问题了,比如数据丢包、网l抖动、网l流量太大一起传输变慢等Q但是我们没有找CQ何相关的问题。所以我们很长一D|间素手无{,只能怪GemFire闭源Q我们不知道q两分钟是不是GemFire自己内部在做一些不Zh知的事情Q因而太忙了而每来得及处理我们的写请求。虽然我一直觉得不在处理什么炒作,两分钟都没有响应Ҏ无法解释的通,更何况GemFire节点之间q没有报告有M异常Q或者像以前发现的一个节点向Locator举报另一个节Ҏ有响应的问题QLocator自己也能很正常的向那个节点发送新的成员信息(ViewQ,因而看h向是q个节点虽然׃两分钟多来写一个数据,但是它还是有响应的,有点“假死”的赶脚?br /> <br /> <h2>问题发现</h2> q个问题q么几年以来时不时的׃发生Q而且因ؓ以前q旉太多了,而且也没有找CQ何出错的地方Q现在烦性不去花太多旉在上面了Q更何况q个它很长时间才发生一ơ,q且今年以来׃直没发生q,直到前几周出Cơ,我有点不信邪的重新去看这个问题,依然没有扑ֈM可疑的地方,GC日志、应用程序日志、GemFire自己的日志、网l、CPU使用情况{所有的都是正常的,除了问题发生的那个时刻,应用E序没有M日志Q另外在问题发生之前出现qLog4J日志文g的RollingQ我们用RollingFileAppenderQƈ且只保留20个日志文ӞQ但是Log4J日志文gRoll的日志出C断结Q在开始要Roll到真正完成Roll中间q有几行GemFire自n的日志,此时我ƈ没有觉得q个是有很大问题的,因ؓ我始l觉得Log4J除了它自己提到^均对性能?0%的媄响以外,它就是一个简单的把日志写到文件的q程Q不会媄响的整个应用E序本nQ因为它太简单了Q直C天这个问题再ơ出玎ͼ依然没有M其他斚w的收P所有的地方都显C正常状态,甚至我们之前发现的网卡问题今天也没有发生Q然而同h出问题的两分钟没有出现应用程序日志,日志文gRoll的日志和上次cMQ开始Roll到结束出现GemFire日志的交叉? <div><fieldset><legend>最q一ơ发生的日志</legend> <div>[info 2015/08/12 01:56:07.736 BST …] ClientHealthMonitor: Registering client with member id …</div> <div>log4j: rolling over count=20971801</div> <div>log4j: maxBackupIndex=20</div> <div>[info 2015/08/12 01:56:12.265 BST …] ClientHealthMonitor: Unregistering client with member id …</div> <div>……</div> <div>[info 2015/08/12 01:56:23.773 BST …] ClientHealthMonitor: Registering client with member id …</div> <div>log4j: Renaming file logs/….log.19 to logs/….log.20</div> </fieldset></div> <div><fieldset><legend>一周前发生的日?/legend> <div>[info 2015/08/04 01:43:45.761 BST …] ClientHealthMonitor: Registering client with member id …</div> <div>log4j: rolling over count=20971665</div> <div>log4j: maxBackupIndex=20</div> <div>……</div> <div>[info 2015/08/04 01:45:25.506 BST …] ClientHealthMonitor: Registering client with member id …</div> <div>log4j: Renaming file logs/….log.19 to logs/….log.20</div> </fieldset></div> <div>看似q个是一个规律(套用同事的一句话Q一ơ发生时偶然Q两ơ发生就是科学了Q。然而此时我其实依然不太怿Log4J?#8220;凶手”Q因为我一直觉得Log4J是一个简单的日志输出框架Q它要是出问题也只是它自q问题Q是局部的Q而这个问题的出现明显是全局的,直到我突然脑子一闪而过Q?strong>日志打印的操作是synchronizedQ也是说在日志文gRoll的时候,所有其它需要打日志的线E都要等待直到Roll完成Q如果这个Rollq程过?分钟Q那么就会发生我们看到的Stripe Lock时Q也是发生了程?#8220;假死”的状态?/strong>重新查看Log4J打印日志的方法调用栈Q它会在两个地方用synchronizedQ即同一个CategoryQLoggerQ类实例Q?br /> <div style="font-size: 13px; border: 1px solid #cccccc; padding: 4px 5px 4px 4px; width: 98%; word-break: break-all; background-color: #eeeeee;"><!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->    <span style="color: #0000FF; ">public</span> <span style="color: #0000FF; ">void</span> callAppenders(LoggingEvent event) {<br />         <span style="color: #0000FF; ">int</span> writes = 0;<br />         <span style="color: #0000FF; ">for</span>(Category c = <span style="color: #0000FF; ">this</span>; c != <span style="color: #0000FF; ">null</span>; c=c.parent) {<br />             <span style="color: #008000; ">//</span><span style="color: #008000;"> Protected against simultaneous call to addAppender, removeAppender,<img src="http://www.tkk7.com/Images/dot.gif" alt="" /></span><span style="color: #008000; "><br /> </span>            <span style="color: #0000FF; ">synchronized</span>(c) {<br />                 <span style="color: #0000FF; ">if</span>(c.aai != <span style="color: #0000FF; ">null</span>) {<br />                     writes += c.aai.appendLoopOnAppenders(event);<br />                 }<br />                 <span style="color: #0000FF; ">if</span>(!c.additive) {<br />                     <span style="color: #0000FF; ">break</span>;<br />                 }<br />             }<br />         }<br /> 。。?br />     }</div> </div>以及同一个Appender在doApppendӞ<br /><div style="font-size: 13px; border: 1px solid #cccccc; padding: 4px 5px 4px 4px; width: 98%; word-break: break-all; background-color: #eeeeee;"><!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->    <span style="color: #0000FF; ">public</span> <span style="color: #0000FF; ">synchronized</span> <span style="color: #0000FF; ">void</span> doAppend(LoggingEvent event) {<br />      。。?br />      <span style="color: #0000FF; ">this</span>.append(event);<br />    }</div><div>而Roll的过E就是在appendҎ中,q一步分析,在下面两句话之间Q他们分别花费了过100s和超q?1s的时_</div><div>log4j: maxBackupIndex=20<br />。。?/div><div>log4j: Renaming file logs/….log.19 to logs/….log.20</div><div>而这两句之间只包含了两个File.exists()Q一个File.delete()Q一个File.rename()操作Q?/div><div style="font-size: 13px; border: 1px solid #cccccc; padding: 4px 5px 4px 4px; width: 98%; word-break: break-all; background-color: #eeeeee;"><!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->    <span style="color: #0000FF; ">public</span> <span style="color: #0000FF; ">void</span> rollOver() {<br />      。。?br />      <span style="color: #0000FF; ">if</span>(maxBackupIndex > 0) {<br />        <span style="color: #008000; ">//</span><span style="color: #008000;"> Delete the oldest file, to keep Windows happy.</span><span style="color: #008000; "><br /></span>        file = <span style="color: #0000FF; ">new</span> File(fileName + '.' + maxBackupIndex);<br />        <span style="color: #0000FF; ">if</span> (file.exists())<br />            renameSucceeded = file.delete();<br />        <span style="color: #0000FF; ">for</span> (<span style="color: #0000FF; ">int</span> i = maxBackupIndex - 1; i >= 1 && renameSucceeded; i--) {<br />            file = <span style="color: #0000FF; ">new</span> File(fileName + "." + i);<br />            <span style="color: #0000FF; ">if</span> (file.exists()) {<br />                target = <span style="color: #0000FF; ">new</span> File(fileName + '.' + (i + 1));<br />                LogLog.debug("Renaming file " + file + " to " + target);<br />                renameSucceeded = file.renameTo(target);<br />            }<br />        }<br />      。。?br />      }<br />    }</div><div><h2>NFS单性能试和分?/h2>因而我对NFS的性能作了一些简单测试:</div><div>只有一个线E时Q在NFS下rename性能Q?/div><div>1 file:                    3ms</div><div>10 files:                48ms</div><div>20 files:                114ms</div><div>相比较,在本地磁盘rename的性能Q?/div><div>1 file:                    1ms</div><div>3 files:                  1ms</div><div>10 files:                3ms</div><div>对NFS和本地磁盘写的性能Q模拟日志,每行都会flushQ:</div><table border="0" cellspacing="0" cellpadding="0" style="border-collapse:collapse;"> <tbody><tr style="height:13.4pt"> <td width="139" valign="top" style="width: 1.45in; border: 1pt solid windowtext; padding: 0in 5.4pt; height: 13.4pt;"> <p> </p> </td> <td width="78" valign="top" style="width: 58.5pt; border-style: solid solid solid none; border-top-color: windowtext; border-right-color: windowtext; border-bottom-color: windowtext; border-top-width: 1pt; border-right-width: 1pt; border-bottom-width: 1pt; padding: 0in 5.4pt; height: 13.4pt;"> <p><span style="color:#1F497D">NFS</span></p> </td> <td width="78" valign="top" style="width: 58.5pt; border-style: solid solid solid none; border-top-color: windowtext; border-right-color: windowtext; border-bottom-color: windowtext; border-top-width: 1pt; border-right-width: 1pt; border-bottom-width: 1pt; padding: 0in 5.4pt; height: 13.4pt;"> <p><span style="color:#1F497D">LOCAL</span></p> </td> </tr> <tr> <td width="139" valign="top" style="width: 1.45in; border-style: none solid solid; border-right-color: windowtext; border-bottom-color: windowtext; border-left-color: windowtext; border-right-width: 1pt; border-bottom-width: 1pt; border-left-width: 1pt; padding: 0in 5.4pt;"> <p><span style="color:#1F497D">1 writer, 11M</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">443ms</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">238ms</span></p> </td> </tr> <tr> <td width="139" valign="top" style="width: 1.45in; border-style: none solid solid; border-right-color: windowtext; border-bottom-color: windowtext; border-left-color: windowtext; border-right-width: 1pt; border-bottom-width: 1pt; border-left-width: 1pt; padding: 0in 5.4pt;"> <p><span style="color:#1F497D">1 writer, 101M</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">2793ms</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">992ms</span></p> </td> </tr> <tr> <td width="139" valign="top" style="width: 1.45in; border-style: none solid solid; border-right-color: windowtext; border-bottom-color: windowtext; border-left-color: windowtext; border-right-width: 1pt; border-bottom-width: 1pt; border-left-width: 1pt; padding: 0in 5.4pt;"> <p><span style="color:#1F497D">10 writers, 11M</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">~4400ms</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">~950ms</span></p> </td> </tr> <tr> <td width="139" valign="top" style="width: 1.45in; border-style: none solid solid; border-right-color: windowtext; border-bottom-color: windowtext; border-left-color: windowtext; border-right-width: 1pt; border-bottom-width: 1pt; border-left-width: 1pt; padding: 0in 5.4pt;"> <p><span style="color:#1F497D">10 writers, 101M</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">~30157ms</span></p> </td> <td width="78" valign="top" style="width:58.5pt;border-top:none;border-left:none; border-bottom:solid windowtext 1.0pt;border-right:solid windowtext 1.0pt; padding:0in 5.4pt 0in 5.4pt"> <p><span style="color:#1F497D">~5500ms</span></p> </td> </tr></tbody></table><div><br />一些其他的l计Q?/div><div><strong>100同时?</strong></div><div>Create 20 files spend: 301ms</div><div>Renaming 20 files spends: 333ms</div><div>Delete 20 files spends: 329ms</div><div></div><div><strong>1000同时?</strong></div><div>Create 20 files spend: 40145ms</div><div>Renaming 20 files spends: 39273ms<br /></div><div><strong>而在1000个同时写的过E中Q重命名Q?/strong></div><div>Rename file: LogTest1.50 take: <strong>36434ms</strong></div><div>Rename file: LogTest1.51 take: <strong>39ms</strong></div><div>Rename file: LogTest1.52 take: <strong>34ms</strong><br /></div><div>也就是说在这个模拟过E中Q一个文件的rename过36sQ而向我们有十几台机器同时使用相同的NFSQƈ且每台机器上都跑二三十个E序Q如果那D|间同时有上万个的日志写,可以预计辑ֈ100s情况是可能发生的?/div><div>关于NFS性能的问题,在《构建高性能WEB站点》的书(330)中也有涉及。简单的介绍QNFS由Sun?984q开发,是主异构^台实现文件共享的首选方案。它q没有自q传输协议Q而是使用RPCQRemote Procedure CallQ协议(应用层)QRPC协议默认底层ZUDP传输Q但是自己实现在丢包时的重传机制Q而且NFS服务器采用多q程模型Q默认进Eؓ4Q但是一般都会调优增加服务进E数Q然?#8220;不管怎么对NFSq行性能优化QNFS注定不适合作ؓI/O密集型文件共享方案,但可以作Z般用途,比如提供站点内部的资源共享,它的优势在于Ҏ搭徏Q而且可以减少不必要的数据冗余?#8221;</div><div>可以使用命oQ?#8220;nfsstat -c”获取对NFS服务器的操作的简单统计,具体可以参考《构建高性能WEB站点》的相关章节Q里面还有更详细的对NFS服务器性能的测试?/div><div><br /><h2>ȝ</h2><strong>从这个事件我ȝ了两件事情:</strong></div><div>1. 日志的媄响可能是全局性的Q因而要非常心Q一个耗时的操作可能引L序的“假死”Q因而要非常心?/div><div>2. 虽然把日志打印在NFS上,对大量的日志文g查找会方便很多,但是q是一个很耗性能的设计,特别是当大量的程序共享这个NFS的时候,因而要量避免?/div><img src ="http://www.tkk7.com/DLevin/aggbug/426751.html" width = "1" height = "1" /><br><br><div align=right><a style="text-decoration:none;" href="http://www.tkk7.com/DLevin/" target="_blank">DLevin</a> 2015-08-13 16:28 <a href="http://www.tkk7.com/DLevin/archive/2015/08/13/426751.html#Feedback" target="_blank" style="text-decoration:none;">发表评论</a></div>]]></description></item><item><title>实现自己的Lock对象http://www.tkk7.com/DLevin/archive/2015/08/11/426723.htmlDLevinDLevinMon, 10 Aug 2015 22:08:00 GMThttp://www.tkk7.com/DLevin/archive/2015/08/11/426723.htmlhttp://www.tkk7.com/DLevin/comments/426723.htmlhttp://www.tkk7.com/DLevin/archive/2015/08/11/426723.html#Feedback0http://www.tkk7.com/DLevin/comments/commentRss/426723.htmlhttp://www.tkk7.com/DLevin/services/trackbacks/426723.html一直想好好学习concurrent包中的各个类的实玎ͼ然而经常看了一点就因ؓ其他事情q扰而放下了。发现这样太不利于自q成长了,因而最q打潜心一件一件的完成自己惛_习的东西?br />
对concurrent包的学习打算先从Lock的实现开始,因而自然而然的就端v了AbstractQueuedSynchronizerQ然而要Lq个cȝ源码q不是那么容易,因而我开始问自己一个问题:如果自己要去实现q个一个Lock对象Q应该如何实现呢Q?br />
要实现Lock对象Q首先理解什么是锁?我自׃~程角度单的理解Q所谓锁对象Q互斥锁Q就是它能保证一ơ只有一个线E能q入它保护的临界区,如果有一个线E已l拿到锁对象Q那么其他对象必让权等待,而在该线E退个界区旉要唤醒等待列表中的其他线E。更学术一些,《计机操作pȝ?/a>中对同步机制准则的归UIP50Q:

  1. I闲让进。当无进E处于界区Ӟ表明临界资源处于I闲状态,应允怸个请求进入界区的进E立卌入自q临界区,以有效的利用临界资源?/li>
  2. 忙则{待。当已有q程q入临界区时Q表明界资源正在被讉KQ因而其他试图进入界区的进E必ȝ待,以保证对临界源的互斥讉K?/li>
  3. 有限{待。对要求讉K临界资源的进E,应保证在有限旉内能q入自己的界区Q以免陷?#8220;ȝ”状态?/li>
  4. 让权{待。当q程不能q入自己的界区Ӟ应该释放处理机,以免q程陷入“忙等”状态?/li>

说了那么多,其实对互斥锁很简单,只需要一个标CQ如果该标记位ؓ0Q表C没有被占用Q因而直接获得锁Q然后把该标C|ؓ1Q此时其他线E发现该标记位已l是1Q因而需要等待。这里对q个标记位的比较q设值必L原子操作Q而在JDK5以后提供的atomic包里的工L可以很方便的提供q个原子操作。然而上面的四个准则应该漏了一点,即释N的线E(q程Q和得到锁的U程Q进E)应该是同一个,像一把钥匙对应一把锁Q理想的Q,所以一个非常简单的Lockcd以这么实玎ͼ

public class SpinLockV1 {
    
private final AtomicInteger state = new AtomicInteger(0);
    
private volatile Thread owner; // q里owner字段可能存在中间|不可靠,因而其他线E不可以依赖q个字段的?/span>
    
    
public void lock() {
        
while (!state.compareAndSet(01)) { }
        owner 
= Thread.currentThread();
    }
    
    
public void unlock() {
        Thread currentThread 
= Thread.currentThread();
        
if (owner != currentThread || !state.compareAndSet(10)) {
            
throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
        }
        owner 
= null;
    }
}

一个简单的试ҎQ?br />

    @Test
    
public void testLockCorrectly() throws InterruptedException {
        
final int COUNT = 100;
        Thread[] threads 
= new Thread[COUNT];
        SpinLockV1 lock 
= new SpinLockV1();
        AddRunner runner 
= new AddRunner(lock);
        
for (int i = 0; i < COUNT; i++) { 
            threads[i] 
= new Thread(runner, "thread-" + i);
            threads[i].start();
        }
        
        
for (int i = 0; i < COUNT; i++) {
            threads[i].join();
        }
        
        assertEquals(COUNT, runner.getState());
    }
    
    
private static class AddRunner implements Runnable {
        
private final SpinLockV1 lock;
        
private int state = 0;

        
public AddRunner(SpinLockV1 lock) {
            
this.lock = lock;
        }
        
        
public void run() {
            lock.lock();
            
try {
                quietSleep(
10);
                state
++;
                System.out.println(Thread.currentThread().getName() 
+ "" + state);
            } 
finally {
                lock.unlock();
            }
        }
        
        
public int getState() {
            
return state;
        }
    }

然而这个SpinLock其实q不需要stateq个字段Q因为owner的赋g否也是一U状态,因而可以用它作ZU互斥状态:

public class SpinLockV2 {
    
private final AtomicReference<Thread> owner = new AtomicReference<Thread>(null);
    
    
public void lock() {
        
final Thread currentThread = Thread.currentThread();
        
while (!owner.compareAndSet(null, currentThread)) { }
    }
    
    
public void unlock() {
        Thread currentThread 
= Thread.currentThread();
        
if (!owner.compareAndSet(currentThread, null)) {
            
throw new IllegalStateException("The lock is not owned by thread: " + currentThread);
        }
    }
}

q在操作pȝ中被定义为整形信号量Q然而整形信号量如果没拿到锁会一直处?#8220;忙等”状态(没有遵@有限{待和让权等待的准则Q,因而这U锁也叫Spin LockQ在短暂的等待中它可以提升性能Q因为可以减线E的切换Qconcurrent包中的Atomic大部分都采用q种机制实现Q然而如果需要长旉的等待,“忙等”会占用不必要的CPU旉Q从而性能会变的很差,q个时候就需要将没有拿到锁的U程攑ֈ{待列表中,q种方式在操作系l中也叫记录型信号量Q它遵@了让权等待准则(当前没有实现有限{待准则Q。在JDK6以后提供了LockSupport.park()/LockSupport.unpark()操作Q可以将当前U程攑օ一个等待列表或一个线E从q个{待列表中唤醒。然而这个park/unpark的等待列表是一个全局的等待列表,在unpartk的时候还是需要提供需要唤醒的Thread对象Q因而我们需要维护自q{待列表Q但是如果我们可以用JDK提供的工LConcurrentLinkedQueueQ就非常Ҏ实现Q如LockSupport文中给出来?a >代码事例Q?br />

class FIFOMutex {
   
private final AtomicBoolean locked = new AtomicBoolean(false);
   
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

   
public void lock() {
     
boolean wasInterrupted = false;
     Thread current 
= Thread.currentThread();
     waiters.add(current);

     
// Block while not first in queue or cannot acquire lock
     while (waiters.peek() != current || !locked.compareAndSet(falsetrue)) {
        LockSupport.park(
this);
        
if (Thread.interrupted()) // ignore interrupts while waiting
          wasInterrupted = true;
     }

     waiters.remove();
     
if (wasInterrupted)          // reassert interrupt status on exit
        current.interrupt();
   }

   
public void unlock() {
     locked.set(
false);
     LockSupport.unpark(waiters.peek());
   }
 }

在该代码事例中,有一个线E等待队列和锁标记字D,每次调用lock时先当前线E放入这个等待队列中Q然后拿出队列头U程对象Q如果该U程对象正好是当前线E,q且成功 使用CAS方式讄locked字段Q这里需要两个同时满I因ؓ可能出现一个线E已l从队列中移除了但还没有unlockQ此时另一个线E调用lockҎQ此旉列头的线E就是第二个U程Q然而由于第一个线E还没有unlock或者正在unlockQ因而需要用CAS原子操作来判断是否要parkQ,表示该线E竞争成功,获得锁,否则当前线EparkQ这里之所以要攑֜ while循环中,因ؓpark操作可能无理p?spuriously)Q如文档中给出的描述Q?br />

LockSupport.park()
public static void park(Object blocker)
Disables the current thread for thread scheduling purposes unless the permit is available.

If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

  • Some other thread invokes unpark with the current thread as the target; or
  • Some other thread interrupts the current thread; or
  • The call spuriously (that is, for no reason) returns.

This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

Parameters:
blocker - the synchronization object responsible for this thread parking
Since:
1.6
我在实现自己的类时就被这?#8220;无理p?#8221;坑了好久。对于已l获得锁的线E,该U程从等待队列中U除Q这里由于ConcurrentLinkedQueue是线E安全的Q因而能保证每次都是队列头的U程得到锁,因而在得到锁匙队列头U除。unlock逻辑比较单,只需要将locked字段打开Q设|ؓfalseQ,唤醒QunparkQ队列头的线E即可,然后该线E会l箋在lockҎ的while循环中l竞争unlocked字段Qƈ它自己从线E队列中U除表示获得锁成功。当然安全v见,最好在unlock中加入一些验证逻辑Q如解锁的线E和加锁的线E需要相同?br />
然而本文的目的是自己实C个Lock对象Q即只用一些基本的操作Q而不使用JDK提供的AtomiccdConcurrentLinkedQueue。类似的首先我们也需要一个队列存攄待线E队列(公^赯Q用先q先出队列)Q因而先定义一个Node对象用以构成q个队列Q?br />

 

    protected static class Node {
        
volatile Thread owner;
        
volatile Node prev;
        
volatile Node next;
        
        
public Node(Thread owner) {
            
this.owner = owner;
            
this.state = INIT;
        }
        
        
public Node() {
            
this(Thread.currentThread());
        }
    }

单v见,队列头是一个v点的placeholderQ每个调用lock的线E都先将自己竞争攑օq个队列,每个队列头后一个线E(NodeQ即是获得锁的线E,所以我们需要有head Node字段用以快速获取队列头的后一个NodeQ而tail Node字段用来快速插入新的NodeQ所以关键在于如何线E安全的构徏q个队列Q方法还是一LQ用CAS操作Q即CASҎ自p|成tail|然后重新构徏q个列表Q?br />

    protected boolean enqueue(Node node) {
        
while (true) {
            
final Node preTail = tail;
            node.prev 
= preTail;
            
if (compareAndSetTail(preTail, node)) {
                preTail.next 
= node;
                
return node.prev == head;
            }
        }
    }

在当前线ENode以线E安全的方式攑օq个队列后,lock实现相对比较简单了Q如果当前Node是的前驱是headQ该U程获得锁,否则park当前U程Q处理park无理p回的问题Q因而将park攑օwhile循环中(该实现是一个不可重入的实现Q:

    public void lock() {
        
// Put the latest node to a queue first, then check if the it is the first node
        
// this way, the list is the only shared resource to deal with
        Node node = new Node();
        
if (enqueue(node)) {
            current 
= node.owner;
        } 
else {
            
while (node.prev != head) {
                LockSupport.park(
this); // This may return "spuriously"!!, so put it to while
            }

            current 
= node.owner;
        }
    }

unlock的实现需要考虑多种情况Q如果当前Node(head.next)有后驱,那么直接unpark该后驱即可;如果没有Q表C当前已l没有其他线E在{待队列中,然而在q个判断q程中可能会有其他线E进入,因而需要用CAS的方式设|tailQ如果设|失败,表示此时有其他线E进入,因而需要将该新q入的线Eunpark从而该新进入的U程在调用park后可以立卌回(q里的CAS和enqueue的CAS都是对tail操作Q因而能保证状态一_Q?br />

    public void unlock() {
        Node curNode 
= unlockValidate();
        Node next 
= curNode.next;
        
if (next != null) {
           
head.next = next;
            next.prev 
= head;
            LockSupport.unpark(next.owner);
        } 
else {
            
if (!compareAndSetTail(curNode, head)) {
               
while (curNode.next == null) { } // Wait until the next available
                // Another node queued during the time, so we have to unlock that, or else, this node can never unparked
                unlock();
            } 
else {
               
compareAndSetNext(head, curNode, null); // Still use CAS here as the head.next may already been changed
            }
        }
    }

具体的代码和试cd以参考查?a >q里?br />


其实直到自己写完q个cd才直到者其实这是一个MCS锁的变种Q因而这个实现每个线Epark在自w对应的node上,而由前一个线Eunpark它;而AbstractQueuedSynchronizer是CLH锁,因ؓ它的park由前q态决定,虽然它也是由前一个线Eunpark它。具体可以参?a >q里?/p>

DLevin 2015-08-11 06:08 发表评论
]]>
վ֩ģ壺 һëƬֱ| պƷרӰ| Ļ| 18| պëƬѿ| fc2˳| ѿav뾫Ʒɫҹ| ƬaëƬ| aëƬվ| Ůһ18| ˳˳ۺ| þ޾Ʒۿ| ҳվ߹ۿ| ŮҼxx00Ƶ| þպƬ| ޳꿴Ƭ߹ۿ| ĻĻmv| 91ѸƵ| ɫavƷר| ޾Ʒ߹ۿ| ޳avƬ| պ߹ۿ| ߹ۿƵ| 91ѹƷ| ޷츾| ɫػaëƬѹۿ| һ߲߲| ޳AVƬþ| ˳վ߹ۿ| ûվɫƵѹۿ| 97avƵ| 99Ƶ| 0588ӰֻѿƬ| 㻨߹ۿѹۿͼƬ| AVרAVԾ| avպۺһ߹ۿ| ѹaƬ| ҹƷѹۿ| պaëƬa| ʮ˽վ| ˸徫Ʒѹۿ|