<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    如何優(yōu)雅地停止SPRING BATCH中的REMOTE CHUNKING JOB

    SPRING BATCH中的REMOTE CHUNKING JOB,由于是基于MASTER/SLAVE的架構(gòu),其中某個(gè)STEP是會(huì)在遠(yuǎn)程機(jī)器中執(zhí)行,如果要停止這個(gè)JOB,需要考慮兩個(gè)問(wèn)題:
    1、什么時(shí)候發(fā)出停止指令
    2、如何等待遠(yuǎn)程STEP的完成

    一般停止JOB,可用JobOperator.stop(long executionId)來(lái)停止,但這個(gè)無(wú)法確定什么時(shí)候發(fā)出停止指令,如果是在CHUNK的處理中途發(fā)出,則會(huì)出現(xiàn)回滾的現(xiàn)象。
    BATCH_STEP_EXECUTION thead tr {background-color: ActiveCaption; color: CaptionText;} th, td {vertical-align: top; font-family: "Tahoma", Arial, Helvetica, sans-serif; font-size: 8pt; padding: 4px; } table, td {border: 1px solid silver;} table {border-collapse: collapse;} thead .col0 {width: 173px;} .col0 {text-align: right;} thead .col1 {width: 82px;} .col1 {text-align: right;} thead .col2 {width: 282px;} thead .col3 {width: 164px;} .col3 {text-align: right;} thead .col4 {width: 161px;} thead .col5 {width: 161px;} thead .col6 {width: 109px;} thead .col7 {width: 127px;} .col7 {text-align: right;} thead .col8 {width: 109px;} .col8 {text-align: right;} thead .col9 {width: 118px;} .col9 {text-align: right;} thead .col10 {width: 117px;} .col10 {text-align: right;} thead .col11 {width: 142px;} .col11 {text-align: right;} thead .col12 {width: 150px;} .col12 {text-align: right;} thead .col13 {width: 166px;} .col13 {text-align: right;} thead .col14 {width: 137px;} .col14 {text-align: right;} thead .col15 {width: 109px;} thead .col16 {width: 156px;} thead .col17 {width: 161px;}
    STEP_EXECUTION_ID VERSION STEP_NAME JOB_EXECUTION_ID START_TIME END_TIME STATUS COMMIT_COUNT READ_COUNT FILTER_COUNT WRITE_COUNT READ_SKIP_COUNT WRITE_SKIP_COUNT PROCESS_SKIP_COUNT ROLLBACK_COUNT EXIT_CODE EXIT_MESSAGE LAST_UPDATED
    2304 169 step2HandleXXX 434 2020-06-22 16:27:54 2020-06-22 16:32:46 STOPPED 167 5010 0 4831 0 155 0 161 STOPPED org.springframework.batch.core.JobInterruptedException 2020-06-22 16:32:46


    另外SPRING BATCH也不會(huì)等遠(yuǎn)程STEP執(zhí)行完成,就將JOB的狀態(tài)設(shè)為Complete。

    發(fā)出停止的指令應(yīng)通過(guò)ChunkListener達(dá)成:

    public class ItemMasterChunkListener extends ChunkListenerSupport{
        
        private static final Logger log = LoggerFactory.getLogger(ItemMasterChunkListener.class);
        
        
        @Override
        public void beforeChunk(ChunkContext context) {
            log.info("ItemMasterProcessor.beforeChunk");
        }


        @Override
        public void afterChunk(ChunkContext context) {
            log.info("ItemMasterProcessor.afterChunk");
            if(XXXX.isStoppingOrPausing()) {
                log.info("context.getStepContext().getStepExecution().setTerminateOnly()");
                context.getStepContext().getStepExecution().setTerminateOnly();
            }
        }


        @Override
        public void afterChunkError(ChunkContext context) {
            log.info("ItemMasterProcessor.afterChunkError");
        }


    }


    配置BEAN:

    @Bean
    @StepScope
    public ItemMasterChunkListener novaXItemMasterChunkListener() {
         return new ItemMasterChunkListener();
    }
        
    this.masterStepBuilderFactory
                        .<X, X>get("step2Handle")
                        .listener(itemMasterChunkListener())
                        .build();


    由于是在CHUNK完成的時(shí)候發(fā)出停止指令,就不會(huì)出現(xiàn)ROLLBACK的情況。

    等待遠(yuǎn)程STEP完成,通過(guò)讀取MQ上的MESSAGE是否被消費(fèi)完成,PENDDING的MESSAGE為0的條件即可。

    public class JobExecutionListenerSupport implements JobExecutionListener {

        /* (non-Javadoc)
         * @see org.springframework.batch.core.domain.JobListener#afterJob()
         
    */
        @Override
        public void afterJob(JobExecution jobExecution) {
            Integer totalPendingMessages = 0;
            String queueName = "";
            
            
            String messageSelector = "JOB_EXECUTION_ID=" + jobExecution.getJobInstance().getInstanceId();
            do{
                totalPendingMessages = 
                        this.jmsTemplate.browseSelected(queueName, messageSelector, 
                                    (session, browser) -> 
                                        Collections.list(browser.getEnumeration()).size()
                                );
                
                String brokerURL = null;
                if(jmsTemplate.getConnectionFactory() instanceof JmsPoolConnectionFactory) {
                    JmsPoolConnectionFactory connectionFactory =
                            (JmsPoolConnectionFactory)jmsTemplate.getConnectionFactory();
                    ActiveMQConnectionFactory activeMQConnectionFactory =
                            (ActiveMQConnectionFactory)connectionFactory.getConnectionFactory();
                    brokerURL = activeMQConnectionFactory.getBrokerURL();
                } else if(jmsTemplate.getConnectionFactory() instanceof CachingConnectionFactory) {
                    CachingConnectionFactory connectionFactory =
                            (CachingConnectionFactory)jmsTemplate.getConnectionFactory();
                    ActiveMQConnectionFactory activeMQConnectionFactory =
                            (ActiveMQConnectionFactory)connectionFactory.getTargetConnectionFactory();
                    brokerURL = activeMQConnectionFactory.getBrokerURL();
                }
                
                LOGGER.info("queueName = {}, {}, totalPendingMessages = {}, url={}", 
                        queueName, messageSelector, totalPendingMessages, brokerURL);
                Assert.notNull(totalPendingMessages, "totalPendingMessages must not be null.");
                try {
                    Thread.sleep(5_000);
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                }
            } while(totalPendingMessages.intValue() > 0);
            
        }

        /* (non-Javadoc)
         * @see org.springframework.batch.core.domain.JobListener#beforeJob(org.springframework.batch.core.domain.JobExecution)
         
    */
        @Override
        public void beforeJob(JobExecution jobExecution) {
        }

    }


    這樣整個(gè)JOB就能無(wú)異常地停止,且會(huì)等待遠(yuǎn)程STEP完成。

    Reference:
    https://docs.spring.io/spring-batch/docs/4.1.3.RELEASE/reference/html/common-patterns.html#stoppingAJobManuallyForBusinessReasons

    https://stackoverflow.com/questions/13603949/count-number-of-messages-in-a-jms-queue

    https://stackoverflow.com/questions/55499965/spring-batch-stop-job-execution-from-external-class

    https://stackoverflow.com/questions/34621885/spring-batch-pollable-channel-with-replies-contains-chunkresponses-even-if-job


    posted on 2020-06-23 11:00 paulwong 閱讀(804) 評(píng)論(0)  編輯  收藏 所屬分類: SPRING BOOT

    主站蜘蛛池模板: 成人超污免费网站在线看| 亚洲第一页日韩专区| 亚洲免费一区二区| 亚洲视频欧洲视频| 美女视频黄a视频全免费网站色| 中文字幕久精品免费视频| 国内外成人免费视频| 亚洲精品无码AV人在线播放 | 亚洲精品蜜桃久久久久久| 国产色在线|亚洲| 色www永久免费网站| 成人免费视频国产| 老司机亚洲精品影院| 一级**爱片免费视频| 免费无码精品黄AV电影| 亚洲AV综合色区无码一区爱AV| 亚洲中文无码mv| 99久久国产免费中文无字幕| 免费一级毛片一级毛片aa| 亚洲国产精品一区二区久| A级毛片成人网站免费看| 免费黄色网址入口| 亚洲综合激情视频| 91免费国产视频| 四虎影视永久免费观看地址| 亚洲免费视频网址| 秋霞人成在线观看免费视频| 免费a级毛片在线观看| 亚洲无吗在线视频| 免费女人高潮流视频在线观看| 亚洲欧洲中文日韩av乱码| 亚洲一区二区三区高清在线观看| 99视频免费播放| 亚洲色欲色欲www在线丝| 男女猛烈xx00免费视频试看| 国产桃色在线成免费视频| 亚洲欧洲日产国产综合网| 中出五十路免费视频| 亚洲不卡AV影片在线播放| 亚洲AV色无码乱码在线观看| 在线视频观看免费视频18|