<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    qileilove

    blog已經轉移至github,大家請訪問 http://qaseven.github.io/

    談disruptor的單線程數據庫操作

    對遠程數據庫的操作,采用disruptor能夠很好解決死鎖,
      首先是定義一個抽象類,實現Runnable接口
    public abstract class  Task implements Runnable  {
    public Task(){}
    }
    public class TaskEvent {
    private Task tk;
    public Task getTask() {
    return tk;
    }
    public void setTask(Task tk) {
    this.tk = tk;
    }
    public final static EventFactory<TaskEvent> EVENT_FACTORY = new EventFactory<TaskEvent>() {
    public TaskEvent newInstance() {
    return new TaskEvent();
    }
    };
    public class TaskEventHandler implements EventHandler<TaskEvent> {
    //  執行接口函數onEvent執行
    public void onEvent(TaskEvent event, long sequence,
    boolean endOfBatch) throws Exception {
    event.getTask().run();
    }
    }
    }
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import com.zhenhai.bonecp.CustomThreadFactory;
    import com.zhenhai.disruptor.BatchEventProcessor;
    import com.zhenhai.disruptor.RingBuffer;
    import com.zhenhai.disruptor.SequenceBarrier;
    import com.zhenhai.disruptor.YieldingWaitStrategy;
    import com.zhenhai.disruptor.dsl.ProducerType;
    /**
    *     使用方法
    DisruptorHelper.initAndStart();
    Task tt=new Taska();
    DisruptorHelper.produce(tt);
    DisruptorHelper.shutdown();
    *
    *
    */
    public class DisruptorHelper {
    /**
    * ringbuffer容量,最好是2的N次方
    */
    private static final int BUFFER_SIZE = 1024 * 1;
    private static int group=2;
    private RingBuffer<TaskEvent> ringBuffer[];
    private SequenceBarrier sequenceBarrier[];
    private TaskEventHandler handler[];
    private BatchEventProcessor<TaskEvent> batchEventProcessor[];
    private  static DisruptorHelper instance;
    private static boolean inited = false;
    private static ScheduledExecutorService taskTimer=null;
    //JDK 創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。
    private    ExecutorService execute[];
    //啟動監視線程
    static {
    System.out.println("init DisruptorHelper!!!!!!!!!!!!!!!!!");
    instance = new DisruptorHelper();
    instance.init();
    inited = true;
    System.out.println("init DisruptorHelper end!!!!!!!!!!!!!!!!!");
    }
    **
    * 靜態類
    * @return
    */
    private DisruptorHelper(){ }
    /**
    * 初始化
    */
    private void init(){
    execute=new ExecutorService[group];
    ringBuffer=new RingBuffer[group];
    sequenceBarrier=new SequenceBarrier[group];
    handler=new TaskEventHandler[group];
    batchEventProcessor=new BatchEventProcessor[group];
    ////////////////定時執行////////////////
    //初始化ringbuffer,存放Event
    for(int i=0;i<group;i++){
    ringBuffer[i] = RingBuffer.create(ProducerType.SINGLE, TaskEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
    sequenceBarrier[i] = ringBuffer[i].newBarrier();
    handler[i] = new TaskEventHandler();
    batchEventProcessor[i] = new BatchEventProcessor<TaskEvent>(ringBuffer[i], sequenceBarrier[i], handler[i]);
    ringBuffer[i].addGatingSequences(batchEventProcessor[i].getSequence());
    execute[i]= Executors.newSingleThreadExecutor();
    execute[i].submit(instance.batchEventProcessor[i]);
    }
    this.taskTimer =  Executors.newScheduledThreadPool(10, new CustomThreadFactory("DisruptorHelper-scheduler", true));
    inited = true;
    }
    /**
    * 執行定時器
    * @param tk
    */
    private void produce(int index,Task tk){
    //System.out.println("index:="+index);
    if(index<0||index>=group) {
    System.out.println("out of group index:="+index);
    return;
    }
    // if capacity less than 10%, don't use ringbuffer anymore
    System.out.println("capacity:="+ringBuffer[index].remainingCapacity());
    if(ringBuffer[index].remainingCapacity() < BUFFER_SIZE * 0.1) {
    System.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
    // do something
    }else {
    long sequence = ringBuffer[index].next();
    //將狀態報告存入ringBuffer的該序列號中
    ringBuffer[index].get(sequence).setTask(tk);
    //通知消費者該資源可以消費
    ringBuffer[index].publish(sequence);
    }
    }
    /**
    * 獲得容器的capacity的數量
    * @param index
    * @return
    */
    private long  remainingcapacity(int index){
    //System.out.println("index:="+index);
    if(index<0||index>=group) {
    System.out.println("out of group index:="+index);
    return 0L;
    }
    long capacity= ringBuffer[index].remainingCapacity();
    return capacity;
    }
    private void shutdown0(){
    for(int i=0;i<group;i++){
    execute[i].shutdown();
    }
    }
    ////////////////////////////////下面是靜態方法提供調用////////////////////////////////////////////////////////
    /**
    * 直接消費
    * @param tk
    */
    public static void addTask(int priority,Task tk){
    instance.produce(priority,tk);
    }
    /**
    * 定時消費
    * @param tk
    * @param delay
    * @param period
    */
    public static void scheduleTask(int priority,Task tk,long delay,long period){
    Runnable timerTask = new ScheduledTask(priority, tk);
    taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
    }
    /**
    * 定點執行
    * @param tk
    * @param hourse
    * @param minus
    * @param sec
    * @return
    */
    public static Runnable scheduleTask(int priority,Task tk, int hourse,int minus,int sec)
    {
    Runnable timerTask = new ScheduledTask(priority, tk);
    //每天2:30分執行
    long delay = Helper.calcDelay(hourse,minus,sec);
    long period = Helper.ONE_DAY;
    System.out.println("delay:"+(delay/1000)+"secs");
    taskTimer.scheduleAtFixedRate(timerTask, delay, period, TimeUnit.MILLISECONDS);
    return timerTask;
    }
    //對定時執行的程序進行分裝
    private static class ScheduledTask implements Runnable
    {
    private int priority;
    private Task task;
    ScheduledTask(int priority, Task task)
    {
    this.priority = priority;
    this.task = task;
    }
    public void run()
    {
    try{
    instance.produce(priority,task);
    }catch(Exception e){
    System.out.println("catch exception in DisruptorHelper!");
    }
    }
    }
    public static long getRemainingCapatiye(int index){
    return instance.getRemainingCapatiye(index);
    }
    public static void shutdown(){
    if(!inited){
    throw new RuntimeException("Disruptor還沒有初始化!");
    }
    instance.shutdown0();
    }
    }

    posted on 2014-05-15 11:53 順其自然EVO 閱讀(803) 評論(0)  編輯  收藏 所屬分類: 測試學習專欄

    <2014年5月>
    27282930123
    45678910
    11121314151617
    18192021222324
    25262728293031
    1234567

    導航

    統計

    常用鏈接

    留言簿(55)

    隨筆分類

    隨筆檔案

    文章分類

    文章檔案

    搜索

    最新評論

    閱讀排行榜

    評論排行榜

    主站蜘蛛池模板: 国内免费高清在线观看| 亚洲一区二区三区乱码在线欧洲| 成人免费a级毛片无码网站入口 | 一级毛片无遮挡免费全部| 亚洲国产精品综合久久网各| 亚洲精品高清无码视频| 免费中文字幕在线| 成人免费一区二区三区在线观看| 69视频免费在线观看| a级毛片免费全部播放| 日韩一区二区三区免费播放| 亚洲精品无码永久在线观看男男| 亚洲毛片免费视频| 亚洲国产一区在线| 亚洲av无码成h人动漫无遮挡 | 污视频网站在线观看免费| 亚洲日韩国产精品乱-久| 亚洲最大免费视频网| 久久久久亚洲av无码专区 | 777爽死你无码免费看一二区| 精品成人免费自拍视频| 中文字幕视频免费在线观看| a级毛片免费高清视频| 一级毛片大全免费播放下载 | 最近最好的中文字幕2019免费| 0588影视手机免费看片| 18级成人毛片免费观看| 在线人成精品免费视频| 热re99久久6国产精品免费| 无码人妻一区二区三区免费看| 欧洲人免费视频网站在线| 国产偷伦视频免费观看| 国产精品99久久免费观看| 久久久久久久久久国产精品免费| 男女作爱在线播放免费网站| 久9久9精品免费观看| 久久久久免费看成人影片| 日本免费一区二区在线观看| 真人做A免费观看| 我要看免费的毛片| 日本免费一区二区三区最新vr|