【pipeline】【分布式的id生成器】【分布式鎖【watch】【multi】】【redis分布式】
好了,一個(gè)一個(gè)來(lái)。
一、 Pipeline
官方的說(shuō)明是:starts a pipeline,which is a very efficient way to send lots of command and read all the responses when you finish sending them。簡(jiǎn)單點(diǎn)說(shuō)pipeline適用于批處理。當(dāng)有大量的操作需要一次性執(zhí)行的時(shí)候,可以用管道。
示例:
Jedis jedis =
new Jedis(String,
int);
Pipeline p = jedis.pipelined();
p.set(key,value);
//每個(gè)操作都發(fā)送請(qǐng)求給redis-server
p.get(key,value);

p.sync();
//這段代碼獲取所有的response
這里我進(jìn)行了20w次連續(xù)操作(10w讀,10w寫(xiě)),不用pipeline耗時(shí):187242ms,用pipeline耗時(shí):1188ms,可見(jiàn)使用管道后的性能上了一個(gè)臺(tái)階。看了代碼了解到,管道通過(guò)一次性寫(xiě)入請(qǐng)求,然后一次性讀取響應(yīng)。也就是說(shuō)jedis是:request response,request response,...;pipeline則是:request request... response response的方式。這樣無(wú)需每次請(qǐng)求都等待server端的響應(yīng)。
二、 跨jvm的id生成器
談到這個(gè)話(huà)題,首先要知道redis-server端是單線(xiàn)程來(lái)處理client端的請(qǐng)求的。
這樣來(lái)實(shí)現(xiàn)一個(gè)id生成器就非常簡(jiǎn)單了,只要簡(jiǎn)單的調(diào)用jdeis.incr(key);就搞定了。
你或許會(huì)問(wèn),incr是原子操作嗎,能保證不會(huì)出現(xiàn)并發(fā)問(wèn)題嗎,前面說(shuō)過(guò),server端是單線(xiàn)程處理請(qǐng)求的。
三、 【跨jvm的鎖實(shí)現(xiàn)【watch】【multi】】
首先說(shuō)下這個(gè)問(wèn)題的使用場(chǎng)景,有些時(shí)候我們業(yè)務(wù)邏輯是在不同的jvm進(jìn)程甚至是不同的物理機(jī)上的jvm處理的。這樣如何來(lái)實(shí)現(xiàn)不同jvm上的同步問(wèn)題呢,其實(shí)我們可以基于redis來(lái)實(shí)現(xiàn)一個(gè)鎖。
具體事務(wù)和監(jiān)聽(tīng)請(qǐng)參考文章:
redis學(xué)習(xí)筆記之事務(wù) 暫時(shí)找到三種實(shí)現(xiàn)方式:
1. 通過(guò)jedis.setnx(key,value)實(shí)現(xiàn)
import java.util.Random;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
/**
* @author Teaey
*/
public class RedisLock {
//加鎖標(biāo)志
public static final String LOCKED = "TRUE";
public static final long ONE_MILLI_NANOS = 1000000L;
//默認(rèn)超時(shí)時(shí)間(毫秒)
public static final long DEFAULT_TIME_OUT = 3000;
public static JedisPool pool;
public static final Random r = new Random();
//鎖的超時(shí)時(shí)間(秒),過(guò)期刪除
public static final int EXPIRE = 5 * 60;
static {
pool = new JedisPool(new Config(), "host", 6379);
}
private Jedis jedis;
private String key;
//鎖狀態(tài)標(biāo)志
private boolean locked = false;
public RedisLock(String key) {
this.key = key;
this.jedis = pool.getResource();
}
public boolean lock(long timeout) {
long nano = System.nanoTime();
timeout *= ONE_MILLI_NANOS;
try {
while ((System.nanoTime() - nano) < timeout) {
if (jedis.setnx(key, LOCKED) == 1) {
jedis.expire(key, EXPIRE);
locked = true;
return locked;
}
// 短暫休眠,nano避免出現(xiàn)活鎖
Thread.sleep(3, r.nextInt(500));
}
} catch (Exception e) {
}
return false;
}
public boolean lock() {
return lock(DEFAULT_TIME_OUT);
}
// 無(wú)論是否加鎖成功,必須調(diào)用
public void unlock() {
try {
if (locked)
jedis.del(key);
} finally {
pool.returnResource(jedis);
}
}
}
2. 通過(guò)事務(wù)(multi)實(shí)現(xiàn)
由于采納第一張方法,第二種跟第三種實(shí)現(xiàn)只貼了關(guān)鍵代碼,望諒解。^_^
public boolean lock_2(long timeout) { long nano = System.nanoTime();
timeout *= ONE_MILLI_NANOS;
try {
while ((System.nanoTime() - nano) < timeout) {
Transaction t = jedis.multi();
// 開(kāi)啟事務(wù),當(dāng)server端收到multi指令
// 會(huì)將該client的命令放入一個(gè)隊(duì)列,然后依次執(zhí)行,知道收到exec指令
t.getSet(key, LOCKED);
t.expire(key, EXPIRE);
String ret = (String) t.exec().get(0);
if (ret == null || ret.equals("UNLOCK")) {
return true;
}
// 短暫休眠,nano避免出現(xiàn)活鎖
Thread.sleep(3, r.nextInt(500));
}
} catch (Exception e) {
}
return false;
}
3. 通過(guò)事務(wù)+監(jiān)聽(tīng)實(shí)現(xiàn)
public boolean lock_3(long timeout) { long nano = System.nanoTime();
timeout *= ONE_MILLI_NANOS;
try {
while ((System.nanoTime() - nano) < timeout) {
jedis.watch(key);
// 開(kāi)啟watch之后,如果key的值被修改,則事務(wù)失敗,exec方法返回null
String value = jedis.get(key);
if (value == null || value.equals("UNLOCK")) {
Transaction t = jedis.multi();
t.setex(key, EXPIRE, LOCKED);
if (t.exec() != null) {
return true;
}
}
jedis.unwatch();
// 短暫休眠,nano避免出現(xiàn)活鎖
Thread.sleep(3, r.nextInt(500));
}
} catch (Exception e) {
}
return false;
}
最終采用第一種實(shí)現(xiàn),因?yàn)榧渔i只需發(fā)送一個(gè)請(qǐng)求,效率最高。
四、 【redis分布式】
最后一個(gè)話(huà)題,jedis的分布式。在jedis的源碼里發(fā)現(xiàn)了兩種hash算法(MD5,MURMUR Hash(默認(rèn))),也可以自己實(shí)現(xiàn)redis.clients.util.Hashing接口擴(kuò)展。
List<JedisShardInfo> hosts = new ArrayList<JedisShardInfo>(); //server1
JedisShardInfo host1 = new JedisShardInfo("", 6380, 2000);
//server2
JedisShardInfo host2 = new JedisShardInfo("", 6381, 2000);
hosts.add(host1);
hosts.add(host2);
ShardedJedis jedis = new ShardedJedis(hosts);
jedis.set("key", "");
另外寫(xiě)博客真費(fèi)力。。。