<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    SPRING INTEGRATION - 集群選主、分布式鎖

    集群通常是有多個相同的實例,但對于定時任務場景,只希望有一個實例工作即可,如果這個實例掛了,其他實例可以頂替。

    這個問題的方案則是集群選主,一個集群中,只有一個LEADER,由LEADER負責執行定時任務工作。當LEADER被取消時,會在剩下的實例中再選LEADER。

    持有分布式鎖的實例則是LEADER。

    SPRING INTEGRATION JDBC 則已提供相關功能。

    pom.xml
            <dependency>
               <groupId>org.springframework.integration</groupId>
               <artifactId>spring-integration-jdbc</artifactId>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

            <dependency>
               <groupId>org.flywaydb</groupId>
               <artifactId>flyway-core</artifactId>
            </dependency>
            
            <dependency>
                <groupId>org.mariadb.jdbc</groupId>
                <artifactId>mariadb-java-client</artifactId>
            </dependency>

    LeaderElectionIntegrationConfig.java
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import javax.sql.DataSource;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.jdbc.lock.DefaultLockRepository;
    import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
    import org.springframework.integration.jdbc.lock.LockRepository;
    import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;

    import com.paul.integration.leader.ControlBusGateway;
    import com.paul.integration.leader.MyCandidate;

    @Configuration
    public class LeaderElectionIntegrationConfig {
        
        @Bean
        public List<String> needToStartupAdapterList(){
            return new CopyOnWriteArrayList<>();
        }
        
        @Bean
        public DefaultLockRepository defaultLockRepository(DataSource dataSource){
            DefaultLockRepository defaultLockRepository =
                    new DefaultLockRepository(dataSource);
    //        defaultLockRepository.setTimeToLive(60_000);
            return defaultLockRepository;
        }

        @Bean
        public JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository){
            return new JdbcLockRegistry(lockRepository);
        }
        
        @Bean
        public MyCandidate myCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            return new MyCandidate(controlBusGateway, needToStartupAdapterList);
        }
        
        @Bean
        public LockRegistryLeaderInitiator leaderInitiator() {
            return new LockRegistryLeaderInitiator(
                        jdbcLockRegistry(null), myCandidate(nullnull)
                   );
        }
        
        
    }


    MyCandidate.java
    import java.util.List;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.leader.Context;
    import org.springframework.integration.leader.DefaultCandidate;

    import com.novacredit.mcra.mcracommon.integration.gateway.ControlBusGateway;

    public class MyCandidate extends DefaultCandidate{
        
        private static final Logger LOG = LoggerFactory.getLogger(MyCandidate.class);
        
        private List<String> needToStartupAdapterList;
        
        private ControlBusGateway controlBusGateway;
        
        public MyCandidate(
            ControlBusGateway controlBusGateway,
            List<String> needToStartupAdapterList
        ) {
            this.controlBusGateway = controlBusGateway;
            this.needToStartupAdapterList = needToStartupAdapterList;
        }
        
        @Override
        public void onGranted(Context context) {
            super.onGranted(context);
            LOG.info("*** Leadership granted ***");
            LOG.info("STARTING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationFlow.org.springframework.integration.config."
    //                            + "SourcePollingChannelAdapterFactoryBean#0'";
                        String command = c + ".start()";
                        LOG.info("-----{}", command);
                        controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("STARTUP MESSAGE SENT");

        }

        @Override
        public void onRevoked(Context context) {
            super.onRevoked(context);
            LOG.info("*** Leadership revoked ***");
            LOG.info("STOPPING MONGODB POLLER");
            needToStartupAdapterList
                .forEach(
                    c -> {
    //                    c = "@'testIntegrationConfig.testIntegrationFlow."
    //                            + "mongoMessageSource.inboundChannelAdapter'";
                        String command = c + ".stop()";
                        LOG.info("-----{}", command);
    //                    controlBusGateway.sendCommand(command);
                    }
                 );
            LOG.info("SHUTDOWN MESSAGE SENT");
        }

    }


    ControlBusIntegrationConfig.java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.gateway.GatewayProxyFactoryBean;
    import org.springframework.integration.handler.LoggingHandler;
    import org.springframework.messaging.MessageChannel;

    import com.paul.integration.gateway.ControlBusGateway;

    @Configuration
    public class ControlBusIntegrationConfig {
        
        @Bean
        public MessageChannel controlBusChannel() {
            return MessageChannels.direct().get();
        }
        
        @Bean
        public IntegrationFlow controlBusFlow() {
            return IntegrationFlows.from(controlBusChannel())
                        .log(LoggingHandler.Level.INFO, "controlBusChannel")
                        .controlBus()
                        .get();
        }
        
        @Bean
        public GatewayProxyFactoryBean controlBusGateway() {
            GatewayProxyFactoryBean gateway = new GatewayProxyFactoryBean(ControlBusGateway.class);
            gateway.setDefaultRequestChannel(controlBusChannel());
            gateway.setDefaultRequestTimeout(300l);
            gateway.setDefaultReplyTimeout(300l);
            return gateway;
        }
        
    }


    ControlBusGateway.java
    public interface ControlBusGateway {
        
        public void sendCommand(String command);

    }


    各個應用實例運行時,其中的LockRegistryLeaderInitiator會自動運行,搶奪LEADER數據,最終只有一個實例奪取。之后再執行MyCandidate中的代碼。







    posted on 2022-01-20 13:49 paulwong 閱讀(547) 評論(0)  編輯  收藏 所屬分類: SPRING INTERGRATION

    主站蜘蛛池模板: 小草在线看片免费人成视久网| 国产成人免费视频| 久久亚洲AV午夜福利精品一区| 亚洲成人免费电影| 黄色三级三级免费看| 亚洲成a人片在线观看日本| 成年男女免费视频网站| a级大片免费观看| 亚洲国产精品美女久久久久| 亚洲精品高清无码视频| 国产精品久久免费视频| 91福利免费体验区观看区| 黄色视频在线免费观看| 亚洲欧美不卡高清在线| 亚洲综合激情九月婷婷| 久久综合亚洲色HEZYO国产| 无码人妻久久一区二区三区免费丨 | 8090在线观看免费观看| 国产精品永久免费10000| 大地影院MV在线观看视频免费 | 中文字幕亚洲综合久久菠萝蜜 | 成人A片产无码免费视频在线观看 成人电影在线免费观看 | 久久久久国产成人精品亚洲午夜 | 亚洲男人第一无码aⅴ网站 | a一级毛片免费高清在线| 亚洲精品女同中文字幕| 女bbbbxxxx另类亚洲| 亚洲中文久久精品无码1| 亚洲精品白浆高清久久久久久| 亚洲欧洲国产精品香蕉网| 亚洲成年人啊啊aa在线观看| 成人无码区免费视频观看| 国产免费一区二区三区| 黑人粗长大战亚洲女2021国产精品成人免费视频 | 18观看免费永久视频| 在线观看人成网站深夜免费| 三年片在线观看免费观看高清电影 | 亚洲欧美自偷自拍另类视| 一区二区三区在线免费| 免费无遮挡无码视频在线观看 | 国产亚洲精品影视在线产品|