<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    SPRING BATCH remote chunking模式下可同時處理多文件

    SPRING BATCH remote chunking模式下,如果要同一時間處理多個文件,按DEMO的默認配置是會報錯的。這是由於處理多個文件的MASTER方都使用同一個QUEUE名,這樣SLAVE在處理多個JOB INSTANCE時,會返回不同的JOB-INSTANCE-ID,導致報錯。

    這時需更改SPRING BATCH使用SPRING INTEGRATION的模式中的GATEWAY組件。

    GATEWAY組件是工作在REQUEST/RESPONSE模式下,即發一個MESSAGE到某一QUEUE時,要從REPLY QUEUE等到CONSUMER返回結果時,才往下繼續。

    OUTBOUND GATEWAY:從某一CHANNEL獲取MESSAGE,發往REQUEST QUEUE,從REPLY QUEUE等到CONSUMER返回結果,將此MESSAGE發往下一CHANNEL。

    INBOUND GATEWAY:從某一QUEUE獲取MESSAGE,發往某一REQUEST CHANNEL,從REPLY CHANNEL等到返回結果,將此MESSAGE發往下一QUEUE。

    詳情參見此文:https://blog.csdn.net/alexlau8/article/details/78056064

        <!-- Master jms -->
        <int:channel id="MasterRequestChannel">
            <int:dispatcher task-executor="RequestPublishExecutor"/>
        </int:channel>
        <task:executor id="RequestPublishExecutor" pool-size="5-10" queue-capacity="0"/>
    <!--    <int-jms:outbound-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="RequestQueue" 
            channel="MasterRequestChannel"/> 
    -->

        <int:channel id="MasterReplyChannel"/>
    <!--    <int-jms:message-driven-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="ReplyQueue"
            channel="MasterReplyChannel"/> 
    -->

        <int-jms:outbound-gateway
            
    connection-factory="connectionFactory"
            correlation-key
    ="JMSCorrelationID"
            request-channel
    ="MasterRequestChannel"
            request-destination-name
    ="RequestQueue"
            receive-timeout
    ="30000"
            reply-channel
    ="MasterReplyChannel"
            reply-destination-name
    ="ReplyQueue"
            async
    ="true">
            <int-jms:reply-listener />
        </int-jms:outbound-gateway>

        <!-- Slave jms -->
        <int:channel id="SlaveRequestChannel"/>
    <!--    <int-jms:message-driven-channel-adapter
            connection-factory="connectionFactory" 
            destination-name="RequestQueue"
            channel="SlaveRequestChannel"/> 
    -->

        <int:channel id="SlaveReplyChannel"/>
    <!--    <int-jms:outbound-channel-adapter 
            connection-factory="connectionFactory" 
            destination-name="ReplyQueue"
            channel="SlaveReplyChannel"/> 
    -->

        <int-jms:inbound-gateway
            
    connection-factory="connectionFactory"
            correlation-key
    ="JMSCorrelationID"
            request-channel
    ="SlaveRequestChannel"
            request-destination-name
    ="RequestQueue"
            reply-channel
    ="SlaveReplyChannel"
            default-reply-queue-name
    ="ReplyQueue"/>

    MASTER配置:
    package com.paul.testspringbatch.config.master;

    import javax.jms.ConnectionFactory;

    import org.springframework.beans.factory.config.CustomScopeConfigurer;
    //import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.context.annotation.Scope;
    import org.springframework.context.support.SimpleThreadScope;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.jms.JmsOutboundGateway;

    import com.paul.testspringbatch.common.constant.IntegrationConstant;

    @Configuration
    @EnableIntegration
    @Profile("batch-master")
    public class IntegrationMasterConfiguration {
        
    //    @Value("${broker.url}")
    //    private String brokerUrl;


    //    @Bean
    //    public ActiveMQConnectionFactory connectionFactory() {
    //        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    //        connectionFactory.setBrokerURL(this.brokerUrl);
    //        connectionFactory.setTrustAllPackages(true);
    //        return connectionFactory;
    //    }

        /*
         * Configure outbound flow (requests going to workers)
         
    */
        @Bean
    //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public DirectChannel requests() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(requests())
    //                .handle(Jms.outboundAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REQUEST_DESTINATION))
    //                .get();
    //    }
        
         @Bean
         public CustomScopeConfigurer customScopeConfigurer() {
             CustomScopeConfigurer customScopeConfigurer = new CustomScopeConfigurer();
             customScopeConfigurer.addScope("thread", new SimpleThreadScope());
             return customScopeConfigurer;
         }
         
    //     @Bean
    //     public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
    //         return new BeanFactoryPostProcessor() {
    //                
    //             @Override
    //             public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    //                    beanFactory.registerScope("thread", new SimpleThreadScope());
    //                }
    //              };
    //     }
        
        /*
         * Configure inbound flow (replies coming from workers)
         
    */
        @Bean
        @Scope(value = "thread"/* , proxyMode = ScopedProxyMode.NO */)
    //    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public QueueChannel replies() {
            return new QueueChannel();
        }

    //    @Bean
    //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(IntegrationConstant.MASTER_REPLY_DESTINATION))
    //                .channel(replies())
    //                .get();
    //    }

        @Bean
        public JmsOutboundGateway jmsOutboundGateway(ConnectionFactory connectionFactory) {
            JmsOutboundGateway jmsOutboundGateway = new JmsOutboundGateway();
            jmsOutboundGateway.setConnectionFactory(connectionFactory);
            jmsOutboundGateway.setRequestDestinationName(IntegrationConstant.MASTER_REQUEST_DESTINATION);//2. send the message to this destination
            jmsOutboundGateway.setRequiresReply(true);
            jmsOutboundGateway.setCorrelationKey(IntegrationConstant.JMS_CORRELATION_KEY);//3. let the broker filter the message
            jmsOutboundGateway.setAsync(true);//must be async, so that JMS_CORRELATION_KEY work
            jmsOutboundGateway.setUseReplyContainer(true);
            jmsOutboundGateway.setReplyDestinationName(IntegrationConstant.MASTER_REPLY_DESTINATION);//4. waiting the response from this destination
            jmsOutboundGateway.setReceiveTimeout(30_000);
            return jmsOutboundGateway;
        }

        @Bean
        public IntegrationFlow jmsOutboundGatewayFlow(ConnectionFactory connectionFactory) {
            return IntegrationFlows
                            .from(requests())//1. receive message from this channel
                            .handle(jmsOutboundGateway(connectionFactory))
                            .channel(replies())//5. send back the response to this channel
                            .get();
        }

    }


    SLAVE配置:
    package com.paul.testspringbatch.config.slave;

    import javax.jms.ConnectionFactory;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.jms.dsl.Jms;

    import com.paul.testspringbatch.common.constant.IntegrationConstant;

    @Configuration
    @EnableIntegration
    @Profile("batch-slave")
    public class IntegrationSlaveConfiguration {
        

        /*
         * Configure inbound flow (requests coming from the master)
         
    */
        @Bean
        public DirectChannel requests() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
    //                .channel(requests())
    //                .get();
    //    }

        /*
         * Configure outbound flow (replies going to the master)
         
    */
        @Bean
        public DirectChannel replies() {
            return new DirectChannel();
        }

    //    @Bean
    //    public IntegrationFlow outboundFlow(ConnectionFactory connectionFactory) {
    //        return IntegrationFlows
    //                .from(replies())
    //                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
    //                .get();
    //    }

        @Bean
        public IntegrationFlow inboundGatewayFlow(ConnectionFactory connectionFactory) {
            return IntegrationFlows
                        .from(Jms
                                .inboundGateway(connectionFactory)
                                .destination(IntegrationConstant.SLAVE_HANDLE_MASTER_REQUEST_DESTINATION)//1. receive message from this channel.
                                .correlationKey(IntegrationConstant.JMS_CORRELATION_KEY)//2. let the broker filter the message
                                .requestChannel(requests())//3. send the message to this channel
                                .replyChannel(replies())//4. waitting the result from this channel
                                .defaultReplyQueueName(IntegrationConstant.SLAVE_RETURN_RESULT_DESTINATION)//5.send back the result to this destination to the master.
                                )
                        .get();
        }

    }

    posted on 2019-07-16 14:38 paulwong 閱讀(847) 評論(0)  編輯  收藏 所屬分類: SPRING BATCH

    主站蜘蛛池模板: 爱爱帝国亚洲一区二区三区| 91青青国产在线观看免费| 中文字幕日韩亚洲| 51精品视频免费国产专区| 亚洲中文无码永久免费| 永久亚洲成a人片777777| **毛片免费观看久久精品| 久久久久亚洲精品无码网址色欲| 亚洲一区二区三区在线观看精品中文| 日韩精品人妻系列无码专区免费 | 日本久久久久亚洲中字幕| 午夜一区二区免费视频| 黄色网站软件app在线观看免费 | 亚洲视屏在线观看| 免费乱理伦在线播放| 无码国产精品一区二区免费16| 亚洲国产美女精品久久久| 国产亚洲福利精品一区| 拍拍拍又黄又爽无挡视频免费| 久久精品成人免费国产片小草| 亚洲中文字幕AV在天堂| 亚洲国产精品无码专区在线观看| 好吊妞788免费视频播放| 久9久9精品免费观看| 四虎成人精品国产永久免费无码 | 中国一级毛片免费看视频| 亚洲欧洲无码AV不卡在线| 亚洲国产天堂久久综合网站| 四虎影院永久免费观看| 国产在线国偷精品产拍免费| 97无码人妻福利免费公开在线视频| 亚洲成av人片天堂网无码】| 亚洲精品综合久久中文字幕 | 区三区激情福利综合中文字幕在线一区亚洲视频1 | 三级片免费观看久久| 亚洲欧美日韩久久精品| 亚洲黄网站wwwwww| 亚洲人精品午夜射精日韩| 免费又黄又硬又爽大片| 黄网址在线永久免费观看 | 免费无码成人AV在线播放不卡|