<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    SPRING INTEGRATION - 集群選主、分布式鎖

    集群通常是有多個相同的實例,但對于定時任務場景,只希望有一個實例工作即可,如果這個實例掛了,其他實例可以頂替。

    這個問題的方案則是集群選主,一個集群中,只有一個LEADER,由LEADER負責執行定時任務工作。當LEADER被取消時,會在剩下的實例中再選LEADER。

    持有分布式鎖的實例則是LEADER。

    SPRING INTEGRATION JDBC 則已提供相關功能。

    pom.xml
            <dependency>
               <groupId>org.springframework.integration</groupId>
               <artifactId>spring-integration-jdbc</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

            <dependency>
               <groupId>org.flywaydb</groupId>
               <artifactId>flyway-core</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.mariadb.jdbc</groupId>
                <artifactId>mariadb-java-client</artifactId>
            </dependency>

    LeaderElectionIntegrationConfig.java
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import javax.sql.DataSource;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.jdbc.lock.DefaultLockRepository;
    import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
    import org.springframework.integration.jdbc.lock.LockRepository;
    import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;

    import com.paul.integration.leader.ControlBusGateway;
    import com.paul.integration.leader.MyCandidate;

    @Configuration
    public class LeaderElectionIntegrationConfig {
        
        @Bean
        public List<String> needToStartupAdapterList(){
            return new CopyOnWriteArrayList<>();
        }
        
        @Bean
        public DefaultLockRepository defaultLockRepository(DataSource dataSource){
            DefaultLockRepository defaultLockRepository =
                    new DefaultLockRepository(dataSource);
    //        defaultLockRepository.setTimeToLive(60_000);
            return defaultLockRepository;
        }

        @Bean
        public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository){
            return new JdbcLockRegistry(lockRepository);
        }
        
        @Bean
        public MyCandidate myCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            return new MyCandidate(controlBusGateway, needToStartupAdapterList);
        }
        
        @Bean
        public LockRegistryLeaderInitiator leaderInitiator() {
            return new LockRegistryLeaderInitiator(
                        jdbcLockRegistry(null), myCandidate(nullnull)
                   );
        }
        
        
    }


    MyCandidate.java
    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.leader.Context;
    import org.springframework.integration.leader.DefaultCandidate;

    import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;

    public class MyCandidate extends DefaultCandidate{
        
        private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);
        
        private List<String> needToStartupAdapterList;
        
        private ControlBusGateway controlBusGateway;
        
        public MyCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            this.controlBusGateway = controlBusGateway;
            this.needToStartupAdapterList = needToStartupAdapterList;
        }
        
        @Override
        public void onGranted(Context context) {
            super.onGranted(context);
            LOG.info("*** Leadership granted ***");
            LOG.info("STARTING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationFlow.org.springframework.integration.config."
    //                            + "SourcePollingChannelAdapterFactoryBean#0'";
                        String command = c + ".start()";
                        LOG.info("-----{}", command);
                        controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("STARTUP MESSAGE SENT");

        }

        @Override
        public void onRevoked(Context context) {
            super.onRevoked(context);
            LOG.info("*** Leadership revoked ***");
            LOG.info("STOPPING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationConfig.testIntegrationFlow."
    //                            + "mongoMessageSource.inboundChannelAdapter'";
                        String command = c + ".stop()";
                        LOG.info("-----{}", command);
    //                    controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("SHUTDOWN MESSAGE SENT");
        }

    }


    ControlBusIntegrationConfig.java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.gateway.GatewayProxyFactoryBean;
    import org.springframework.integration.handler.LoggingHandler;
    import org.springframework.messaging.MessageChannel;

    import com.paul.integration.gateway.ControlBusGateway;

    @Configuration
    public class ControlBusIntegrationConfig {
        
        @Bean
        public MessageChannel controlBusChannel() {
            return MessageChannels.direct().get();
        }
        
        @Bean
        public IntegrationFlow controlBusFlow() {
            return IntegrationFlows.from(controlBusChannel())
                        .log(LoggingHandler.Level.INFO, "controlBusChannel")
                        .controlBus()
                        .get();
        }
        
        @Bean
        public GatewayProxyFactoryBean controlBusGateway() {
            GatewayProxyFactoryBean gateway = new GatewayProxyFactoryBean(ControlBusGateway.class);
            gateway.setDefaultRequestChannel(controlBusChannel());
            gateway.setDefaultRequestTimeout(300l);
            gateway.setDefaultReplyTimeout(300l);
            return gateway;
        }
        
    }


    ControlBusGateway.java
    public interface ControlBusGateway {
        
        public void sendCommand(String command);

    }


    各個應用實例運行時,其中的LockRegistryLeaderInitiator會自動運行,搶奪LEADER數據,最終只有一個實例奪取。之后再執行MyCandidate中的代碼。







    posted on 2022-01-20 13:49 paulwong 閱讀(547) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATION

    主站蜘蛛池模板: 亚洲视频在线免费| 国产精品亚洲精品观看不卡| 国产在线观看免费视频软件| 久久久无码精品亚洲日韩按摩 | 国产伦精品一区二区三区免费下载| 免费人成动漫在线播放r18| 久久精品国产精品亚洲艾| 最近中文字幕mv免费高清电影| 免费激情网站国产高清第一页| 亚洲av福利无码无一区二区| 成年女性特黄午夜视频免费看| 九九99热免费最新版| 亚洲AV无码国产精品色| 亚洲精品国产V片在线观看| 131美女爱做免费毛片| 日本高清免费中文在线看| 亚洲精品一卡2卡3卡三卡四卡| 免费国产一级特黄久久| 美女内射无套日韩免费播放| 国产AV日韩A∨亚洲AV电影| 日韩亚洲Av人人夜夜澡人人爽| www亚洲一级视频com| 亚洲成年人免费网站| 国产成人自产拍免费视频| www.亚洲日本| 亚洲va国产va天堂va久久| 国产免费久久精品| 国产人成免费视频网站| 99久久免费国产精精品| 国产99久久亚洲综合精品| wwwxxx亚洲| 亚洲高清中文字幕综合网| 亚洲午夜福利在线观看| 四虎永久在线精品免费影视 | 曰批视频免费40分钟试看天天| 羞羞视频免费网站日本| 亚洲另类自拍丝袜第五页| 亚洲美女激情视频| 婷婷亚洲久悠悠色悠在线播放| 亚洲第一区精品观看| 天天天欲色欲色WWW免费|