<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    SPRING INTEGRATION - 集群選主、分布式鎖

    集群通常是有多個相同的實例,但對于定時任務場景,只希望有一個實例工作即可,如果這個實例掛了,其他實例可以頂替。

    這個問題的方案則是集群選主,一個集群中,只有一個LEADER,由LEADER負責執行定時任務工作。當LEADER被取消時,會在剩下的實例中再選LEADER。

    持有分布式鎖的實例則是LEADER。

    SPRING INTEGRATION JDBC 則已提供相關功能。

    pom.xml
            <dependency>
               <groupId>org.springframework.integration</groupId>
               <artifactId>spring-integration-jdbc</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

            <dependency>
               <groupId>org.flywaydb</groupId>
               <artifactId>flyway-core</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.mariadb.jdbc</groupId>
                <artifactId>mariadb-java-client</artifactId>
            </dependency>

    LeaderElectionIntegrationConfig.java
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import javax.sql.DataSource;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.jdbc.lock.DefaultLockRepository;
    import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
    import org.springframework.integration.jdbc.lock.LockRepository;
    import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;

    import com.paul.integration.leader.ControlBusGateway;
    import com.paul.integration.leader.MyCandidate;

    @Configuration
    public class LeaderElectionIntegrationConfig {
        
        @Bean
        public List<String> needToStartupAdapterList(){
            return new CopyOnWriteArrayList<>();
        }
        
        @Bean
        public DefaultLockRepository defaultLockRepository(DataSource dataSource){
            DefaultLockRepository defaultLockRepository =
                    new DefaultLockRepository(dataSource);
    //        defaultLockRepository.setTimeToLive(60_000);
            return defaultLockRepository;
        }

        @Bean
        public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository){
            return new JdbcLockRegistry(lockRepository);
        }
        
        @Bean
        public MyCandidate myCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            return new MyCandidate(controlBusGateway, needToStartupAdapterList);
        }
        
        @Bean
        public LockRegistryLeaderInitiator leaderInitiator() {
            return new LockRegistryLeaderInitiator(
                        jdbcLockRegistry(null), myCandidate(nullnull)
                   );
        }
        
        
    }


    MyCandidate.java
    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.leader.Context;
    import org.springframework.integration.leader.DefaultCandidate;

    import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;

    public class MyCandidate extends DefaultCandidate{
        
        private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);
        
        private List<String> needToStartupAdapterList;
        
        private ControlBusGateway controlBusGateway;
        
        public MyCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            this.controlBusGateway = controlBusGateway;
            this.needToStartupAdapterList = needToStartupAdapterList;
        }
        
        @Override
        public void onGranted(Context context) {
            super.onGranted(context);
            LOG.info("*** Leadership granted ***");
            LOG.info("STARTING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationFlow.org.springframework.integration.config."
    //                            + "SourcePollingChannelAdapterFactoryBean#0'";
                        String command = c + ".start()";
                        LOG.info("-----{}", command);
                        controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("STARTUP MESSAGE SENT");

        }

        @Override
        public void onRevoked(Context context) {
            super.onRevoked(context);
            LOG.info("*** Leadership revoked ***");
            LOG.info("STOPPING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationConfig.testIntegrationFlow."
    //                            + "mongoMessageSource.inboundChannelAdapter'";
                        String command = c + ".stop()";
                        LOG.info("-----{}", command);
    //                    controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("SHUTDOWN MESSAGE SENT");
        }

    }


    ControlBusIntegrationConfig.java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.gateway.GatewayProxyFactoryBean;
    import org.springframework.integration.handler.LoggingHandler;
    import org.springframework.messaging.MessageChannel;

    import com.paul.integration.gateway.ControlBusGateway;

    @Configuration
    public class ControlBusIntegrationConfig {
        
        @Bean
        public MessageChannel controlBusChannel() {
            return MessageChannels.direct().get();
        }
        
        @Bean
        public IntegrationFlow controlBusFlow() {
            return IntegrationFlows.from(controlBusChannel())
                        .log(LoggingHandler.Level.INFO, "controlBusChannel")
                        .controlBus()
                        .get();
        }
        
        @Bean
        public GatewayProxyFactoryBean controlBusGateway() {
            GatewayProxyFactoryBean gateway = new GatewayProxyFactoryBean(ControlBusGateway.class);
            gateway.setDefaultRequestChannel(controlBusChannel());
            gateway.setDefaultRequestTimeout(300l);
            gateway.setDefaultReplyTimeout(300l);
            return gateway;
        }
        
    }


    ControlBusGateway.java
    public interface ControlBusGateway {
        
        public void sendCommand(String command);

    }


    各個應用實例運行時,其中的LockRegistryLeaderInitiator會自動運行,搶奪LEADER數據,最終只有一個實例奪取。之后再執行MyCandidate中的代碼。







    posted on 2022-01-20 13:49 paulwong 閱讀(558) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATION

    主站蜘蛛池模板: 亚洲Av永久无码精品黑人| a级毛片在线视频免费观看 | 亚洲人成影院在线| 台湾一级毛片永久免费| 亚洲国产精华液2020| 国产亚洲精品不卡在线| 24小时日本韩国高清免费| 亚洲精品中文字幕无乱码麻豆| 国产婷婷高清在线观看免费| 中国极品美軳免费观看| 亚洲男人电影天堂| 青青青国产色视频在线观看国产亚洲欧洲国产综合 | 天天综合亚洲色在线精品| 亚洲一区二区三区在线视频| 国产精品综合专区中文字幕免费播放 | 亚洲第一成年男人的天堂| 久久久久成人精品免费播放动漫| 水蜜桃亚洲一二三四在线| a毛片在线还看免费网站| 亚洲成年人在线观看| 成人免费看片又大又黄| 免费人成再在线观看网站| 久久国产亚洲精品麻豆| 免费v片在线观看视频网站| 亚洲av无码片vr一区二区三区| 亚洲第一网站男人都懂| 一边摸一边桶一边脱免费视频| 亚洲精品乱码久久久久久按摩| 日本h在线精品免费观看| 亚洲AV无码一区二区三区鸳鸯影院| 亚洲精品制服丝袜四区| 国产h视频在线观看免费| 无码日韩人妻AV一区免费l | 亚洲国产成人综合精品| 亚洲人成网站在线观看播放| 国产一卡2卡3卡4卡2021免费观看| 免费看黄福利app导航看一下黄色录像| 久久久久亚洲AV成人无码网站 | 三上悠亚电影全集免费 | 亚洲AV无码AV男人的天堂不卡| 亚洲精品午夜国产VA久久成人 |