<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    SPRING BATCH remote chunking模式下可同時處理多文件

    SPRING BATCH remote chunking模式下,如果要同一時間處理多個文件,按DEMO的默認配置,是會報錯的,這是由于多個文件的處理的MASTER方,是用同一個QUEUE名,這樣SLAVE中處理多個JOB INSTANCE時,會返回不同的JOB-INSTANCE-ID,導致報錯。

    這時需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY組件。

    GATEWAY組件是工作在REQUEST/RESPONSE模式下,即發一個MESSAGE到某一QUEUE時,要從REPLY QUEUE等到CONSUMER返回結果時,才往下繼續。

    OUTBOUND GATEWAY:從某一CHANNEL獲取MESSAGE,發往REQUEST QUEUE,從REPLY QUEUE等到CONSUMER返回結果,將此MESSAGE發往下一CHANNEL。

    INBOUND GATEWAY:從某一QUEUE獲取MESSAGE,發往某一REQUEST CHANNEL,從REPLY CHANNEL等到返回結果,將此MESSAGE發往下一QUEUE。

    詳情參見此文:https://blog.csdn.net/alexlau8/article/details/78056064

        <!-- Master jms -->
        <int:channel id="MasterRequestChannel">
            <int:dispatcher task-executor="RequestPublishExecutor"/>
        </int:channel>
        <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
    <!--    <int-jms:outbound-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="RequestQueue" 
            channel="MasterRequestChannel"/> 
    -->

        <int:channel id="MasterReplyChannel"/>
    <!--    <int-jms:message-driven-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="ReplyQueue"
            channel="MasterReplyChannel"/> 
    -->

        <int-jms:outbound-gateway
            
    connection-factory="connectionFactory"
            correlation-key
    ="JMSCorrelationID"
            request-channel
    ="MasterRequestChannel"
            request-destination-name
    ="RequestQueue"
            receive-timeout
    ="30000"
            reply-channel
    ="MasterReplyChannel"
            reply-destination-name
    ="ReplyQueue"
            async
    ="true">
            <int-jms:reply-listener />
        </int-jms:outbound-gateway>

        <!-- Slave jms -->
        <int:channel id="SlaveRequestChannel"/>
    <!--    <int-jms:message-driven-channel-adapter
            connection-factory="connectionFactory" 
            destination-name="RequestQueue"
            channel="SlaveRequestChannel"/> 
    -->

        <int:channel id="SlaveReplyChannel"/>
    <!--    <int-jms:outbound-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="ReplyQueue"
            channel="SlaveReplyChannel"/> 
    -->

        <int-jms:inbound-gateway
            
    connection-factory="connectionFactory"
            correlation-key
    ="JMSCorrelationID"
            request-channel
    ="SlaveRequestChannel"
            request-destination-name
    ="RequestQueue"
            reply-channel
    ="SlaveReplyChannel"
            default-reply-queue-name
    ="ReplyQueue"/>

    MASTER配置
    package com.paul.testspringbatch.config.master;

    import javax.jms.ConnectionFactory;

    import org.springframework.beans.factory.config.CustomScopeConfigurer;
    //import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.context.annotation.Scope;
    import org.springframework.context.support.SimpleThreadScope;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.jms.JmsOutboundGateway;

    import com.paul.testspringbatch.common.constant.IntegrationConstant;

    @Configuration
    @EnableIntegration
    @Profile("batch-master")
    public class IntegrationMasterConfiguration {
        
    //    @Value("${broker.url}")
    //    private String brokerUrl;


    //    @Bean
    //    public ActiveMQConnectionFactory connectionFactory() {
    //        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    //        connectionFactory.setBrokerURL(this.brokerUrl);
    //        connectionFactory.setTrustAllPackages(true);
    //        return connectionFactory;
    //    }

        /*
         * Configure outbound flow (requests going to workers)
         
    */
        @Bean
    //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public DirectChannel requests() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(requests())
    //                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
    //                .get();
    //    }
        
         @Bean
         public CustomScopeConfigurer customScopeConfigurer() {
             CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
             customScopeConfigurer.addScope("thread", new SimpleThreadScope());
             return customScopeConfigurer;
         }
         
    //     @Bean
    //     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
    //         return new BeanFactoryPostProcessor() {
    //                
    //             @Override
    //             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    //                    beanFactory.registerScope("thread", new SimpleThreadScope());
    //                }
    //              };
    //     }
        
        /*
         * Configure inbound flow (replies coming from workers)
         
    */
        @Bean
        @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
    //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public QueueChannel replies() {
            return new QueueChannel();
        }

    //    @Bean
    //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
    //                .channel(replies())
    //                .get();
    //    }

        @Bean
        public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
            JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
            jmsOutboundGateway.setConnectionFactory(connectionFactory);
            jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
            jmsOutboundGateway.setRequiresReply(true);
            jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
            jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
            jmsOutboundGateway.setUseReplyContainer(true);
            jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
            jmsOutboundGateway.setReceiveTimeout(30_000);
            return jmsOutboundGateway;
        }

        @Bean
        public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
            return IntegrationFlows
                            .from(requests())//1. receive message from this channel
                            .handle(jmsOutboundGateway(connectionFactory))
                            .channel(replies())//5. send back the response to this channel
                            .get();
        }

    }


    SLAVE配置:
    package com.paul.testspringbatch.config.slave;

    import javax.jms.ConnectionFactory;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.jms.dsl.Jms;

    import com.paul.testspringbatch.common.constant.IntegrationConstant;

    @Configuration
    @EnableIntegration
    @Profile("batch-slave")
    public class IntegrationSlaveConfiguration {
        

        /*
         * Configure inbound flow (requests coming from the master)
         
    */
        @Bean
        public DirectChannel requests() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
    //                .channel(requests())
    //                .get();
    //    }

        /*
         * Configure outbound flow (replies going to the master)
         
    */
        @Bean
        public DirectChannel replies() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(replies())
    //                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
    //                .get();
    //    }

        @Bean
        public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
            return IntegrationFlows
                        .from(Jms
                                .inboundGateway(connectionFactory)
                                .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
                                .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
                                .requestChannel(requests())//3. send the message to this channel
                                .replyChannel(replies())//4. waitting the result from this channel
                                .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
                                )
                        .get();
        }

    }

    posted on 2019-07-16 14:38 paulwong 閱讀(847) 評論(0)  編輯  收藏 所屬分類: SRPING BATCH

    主站蜘蛛池模板: 99亚偷拍自图区亚洲| 在线观着免费观看国产黄| 国产乱子伦精品免费女| 一区二区亚洲精品精华液| 黄色免费网站网址| 亚洲午夜精品国产电影在线观看| 日韩午夜理论免费TV影院| 亚洲va无码手机在线电影| 中文字幕日本人妻久久久免费| 亚洲综合国产精品第一页| 特a级免费高清黄色片| 亚洲乱码中文字幕手机在线| 成年网在线观看免费观看网址 | 亚洲成aⅴ人片在线观| 84pao国产成视频免费播放| 亚洲无限乱码一二三四区| 91精品成人免费国产片| 亚洲大香伊人蕉在人依线| 四虎国产精品免费久久| 337P日本欧洲亚洲大胆精品| 国产免费小视频在线观看| 四虎永久在线精品免费一区二区| 亚洲色欲久久久综合网东京热 | 1000部拍拍拍18勿入免费凤凰福利| 亚洲美女视频一区| 在线观看免费高清视频| 白白色免费在线视频| 亚洲欧洲∨国产一区二区三区| 午夜影院免费观看| 亚洲日韩看片无码电影| 亚洲精品国产精品乱码不卞 | 最近2019中文字幕mv免费看| 久久精品国产亚洲av瑜伽| 国产成人毛片亚洲精品| 69视频在线观看高清免费| 亚洲av无码专区在线观看亚| 国产成人亚洲综合| 欧洲一级毛片免费| 国产亚洲Av综合人人澡精品| 亚洲av无码专区国产乱码在线观看| www视频在线观看免费|