<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    paulwong

    Transform RemoteChunk to remote with json format in Spring Batch

    Spring Batch Remote Chunk模式下,遠程執行JOB時,傳輸的對象是ChunkRequest/ChunkResponse,無法轉成JSON格式傳輸。

    注意此處使用的是SPRING JACKSON,而不是JACKSON。一般是在SPRING INTEGRATIONA框架下轉的。

    需要自定義Transformer:

    JsonToChunkRequestTransformer.java
    package com.frandorado.springbatchawsintegrationslave.transformer;

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Map;
    import java.util.stream.IntStream;

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.integration.chunk.ChunkRequest;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.integration.aws.support.AwsHeaders;
    import org.springframework.integration.json.JsonToObjectTransformer;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    import com.amazonaws.services.sqs.AmazonSQSAsync;
    import com.fasterxml.jackson.databind.ObjectMapper;

    @Component
    public class JsonToChunkRequestTransformer extends JsonToObjectTransformer {
        
        private static final String MESSAGE_GROUP_ID_HEADER = "message-group-id";
        
        @Autowired
        AmazonSQSAsync amazonSQSAsync;
        
        @Override
        protected Object doTransform(Message<?> message) throws Exception {
            // ACK
            ack(message);
            
            return this.getMessageBuilderFactory().withPayload(buildChunkRequest(message)).setHeader(MESSAGE_GROUP_ID_HEADER, "unique").build();
        }
        
        private ChunkRequest buildChunkRequest(Message<?> message) throws IOException {
            Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            Map stepContributionMap = (Map) map.get("stepContribution");
            Map exitStatusMap = (Map) stepContributionMap.get("exitStatus");
            
            StepContribution stepContribution = new StepContribution(new StepExecution("null", null));
            ExitStatus exitStatus = new ExitStatus((String) exitStatusMap.get("exitCode"), (String) exitStatusMap.get("exitDescription"));
            
            IntStream.range(0, (Integer) stepContributionMap.get("readCount")).forEach(e -> stepContribution.incrementReadCount());
            stepContribution.incrementWriteCount((Integer) stepContributionMap.get("writeCount"));
            stepContribution.incrementFilterCount((Integer) stepContributionMap.get("filterCount"));
            stepContribution.incrementReadSkipCount((Integer) stepContributionMap.get("readSkipCount"));
            IntStream.range(0, (Integer) stepContributionMap.get("writeSkipCount")).forEach(e -> stepContribution.incrementWriteSkipCount());
            IntStream.range(0, (Integer) stepContributionMap.get("processSkipCount"))
                    .forEach(e -> stepContribution.incrementProcessSkipCount());
            stepContribution.setExitStatus(exitStatus);
            
            return new ChunkRequest((Integer) map.get("sequence"), (Collection) map.get("items"), (Integer) map.get("jobId"), stepContribution);
        }
        
        private void ack(Message<?> message) {
            String receiptHandle = message.getHeaders().get(AwsHeaders.RECEIPT_HANDLE, String.class);
            String queue = message.getHeaders().get(AwsHeaders.QUEUE, String.class);
            String queueUrl = amazonSQSAsync.getQueueUrl(queue).getQueueUrl();
            
            amazonSQSAsync.deleteMessage(queueUrl, receiptHandle);
        }
    }


    JsonToChunkResponseTransformer.java
    package com.frandorado.springbatchawsintegrationmaster.transformer;

    import java.io.IOException;
    import java.util.Map;

    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.integration.chunk.ChunkResponse;
    import org.springframework.integration.json.JsonToObjectTransformer;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Component;

    import com.fasterxml.jackson.databind.ObjectMapper;

    @Component
    public class JsonToChunkResponseTransformer extends JsonToObjectTransformer {
        
        @Override
        protected Object doTransform(Message<?> message) throws Exception {
            return buildChunkResponse(message);
        }
        
        private ChunkResponse buildChunkResponse(Message<?> message) throws IOException {
            Map map = new ObjectMapper().readValue(message.getPayload().toString(), Map.class);
            
            Integer jobId = (Integer) map.get("jobId");
            Integer sequence = (Integer) map.get("sequence");
            String messageContent = (String) map.get("message");
            Boolean status = (Boolean) map.get("successful");
            
            StepContribution stepContribution = new StepContribution(new StepExecution("-", null));
            
            return new ChunkResponse(status, sequence, Long.valueOf(jobId), stepContribution, messageContent);
        }
    }


    還有一種方式,就是如果第三類不支持轉JSON,即代碼里沒有JACKSON的注解,可以采用MIXIN的方式:

    StepExecutionJacksonMixIn.java
    package org.springframework.cloud.dataflow.rest.client.support;

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonProperty;

    import org.springframework.batch.core.StepExecution;

    /**
     * Jackson MixIn for {
    @link StepExecution} de-serialization.
     *
     * 
    @author Gunnar Hillert
     * 
    @since 1.0
     
    */
    @JsonIgnoreProperties({ "jobExecution", "jobParameters", "jobExecutionId", "skipCount", "summary" })
    public abstract class StepExecutionJacksonMixIn {

        @JsonCreator
        StepExecutionJacksonMixIn(@JsonProperty("stepName") String stepName) {
        }

    }

    在配置文件中注冊才能使用:
    JacksonConfiguration.java
    import java.util.Locale;
    import java.util.TimeZone;

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobInstance;
    import org.springframework.batch.core.JobParameter;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.item.ExecutionContext;
    import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.ISO8601DateFormatWithMilliSeconds;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExecutionContextJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.ExitStatusJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobExecutionJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobInstanceJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParameterJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.JobParametersJacksonMixIn;
    import com.novacredit.bmb.batchmonitor.springbatch.common.batch.jackson.mixin.StepExecutionJacksonMixIn;

    @Configuration
    public class JacksonConfiguration {

        @Bean
        public Jackson2JsonObjectMapper jackson2JsonObjectMapper(ObjectMapper objectMapper) {
            return new Jackson2JsonObjectMapper(objectMapper);
        }
        
        @Bean
        public Jackson2ObjectMapperBuilderCustomizer dataflowObjectMapperBuilderCustomizer() {
            return (builder) -> {
                builder.dateFormat(new ISO8601DateFormatWithMilliSeconds(TimeZone.getDefault(), Locale.getDefault(), true));
                // apply SCDF Batch Mixins to
                
    // ignore the JobExecution in StepExecution to prevent infinite loop.
                
    // https://github.com/spring-projects/spring-hateoas/issues/333
                builder.mixIn(StepExecution.class, StepExecutionJacksonMixIn.class);
                builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
                builder.mixIn(JobExecution.class, JobExecutionJacksonMixIn.class);
                builder.mixIn(JobParameters.class, JobParametersJacksonMixIn.class);
                builder.mixIn(JobParameter.class, JobParameterJacksonMixIn.class);
                builder.mixIn(JobInstance.class, JobInstanceJacksonMixIn.class);
    //            builder.mixIn(StepExecutionHistory.class, StepExecutionHistoryJacksonMixIn.class);
                builder.mixIn(ExecutionContext.class, ExecutionContextJacksonMixIn.class);
                builder.mixIn(ExitStatus.class, ExitStatusJacksonMixIn.class);
    //            objectMapper.setDateFormat(new ISO8601DateFormatWithMilliSeconds());
                builder.modules(new JavaTimeModule(), new Jdk8Module());
            };
        }
    }

        @Bean
        public IntegrationFlow flow4Contribution(
                ConnectionFactory connectionFactory, 
                JobProperties jobProperties,
                Jackson2JsonObjectMapper jackson2JsonObjectMapper
        ) {
            return IntegrationFlows
                        .from(request4ContributionMaster())
                        .enrichHeaders(headerEnricherConfigurer())
                        .transform(Transformers.toJson(jackson2JsonObjectMapper))
                        .handle(jmsOutboundGateway4Contribution(connectionFactory, jobProperties))
                        .transform(Transformers.fromJson(StepExecution.class, jackson2JsonObjectMapper))
                        .channel(replies4ContributionMaster(null))
                        .get();
        }


    https://github.com/spring-cloud/spring-cloud-dataflow/tree/master/spring-cloud-dataflow-rest-client/src/main/java/org/springframework/cloud/dataflow/rest/client/support

    https://frandorado.github.io/spring/2019/07/29/spring-batch-aws-series-introduction.html

    https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-master/src/main/java/com/frandorado/springbatchawsintegrationmaster/transformer


    https://github.com/frandorado/spring-projects/tree/master/spring-batch-aws-integration/spring-batch-aws-integration-slave/src/main/java/com/frandorado/springbatchawsintegrationslave/transformer

    posted on 2020-01-21 16:44 paulwong 閱讀(581) 評論(0)  編輯  收藏 所屬分類: SRPING BATCH

    主站蜘蛛池模板: 成人毛片18女人毛片免费视频未| 女人18毛片水最多免费观看| 久久精品国产亚洲AV麻豆~| 亚洲一区免费观看| 亚洲人成网站看在线播放| 日本午夜免费福利视频| selaoban在线视频免费精品| 亚洲男人都懂得羞羞网站| 无人在线观看免费高清视频| 免费激情网站国产高清第一页| 久久青草亚洲AV无码麻豆| 免费在线观看a级毛片| 午夜影院免费观看| 久久成人永久免费播放| 色老板亚洲视频免在线观| 亚洲五月激情综合图片区| 亚洲自偷自偷图片| 日本在线高清免费爱做网站| 特级毛片全部免费播放a一级| 亚洲一区二区成人| 免费A级毛片无码A| 在线观看人成视频免费| 在线v片免费观看视频| 国产成人精品免费午夜app| 四虎国产精品成人免费久久| 亚洲一级毛片中文字幕| 亚洲中文字幕无码爆乳av中文| 13一14周岁毛片免费| 一区二区三区视频免费观看| 亚洲乱码一二三四区麻豆| 亚洲中文无韩国r级电影| 亚洲国产精品成人AV无码久久综合影院 | 亚洲国产精品无码久久一线| 国产精品成人免费视频网站京东| 精品一区二区三区免费视频| 又黄又大的激情视频在线观看免费视频社区在线 | 国产成人精品免费视频大全五级 | 亚洲国产精华液网站w| 国产亚洲精午夜久久久久久| 中文字幕成人免费视频| 国产精品永久免费|