    paulwong

    SPRING INTEGRATION + SPRING BATCH

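    SPRING INTEGRATION's strength is event-driven processing. Once an event has been captured, however, all it does is invoke a method on a class, so it has no answer for files that carry a large volume of data, for example reading 10,000 records and inserting them into a database. That is exactly where SPRING BATCH is strong, so it makes sense to use the two frameworks together.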

    Scenario: watch a folder; as soon as a file arrives (and the file may be very large), launch a BATCH JOB to process it.


    File poller: it monitors the folder and, as soon as a new file appears, wraps the file in a MESSAGE and sends it to the next channel:
    <file:inbound-channel-adapter id="filePoller"
                                  channel="filesAreComing"
                                  directory="file:${input.directory}"
                                  filename-pattern="test*" />
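To make the poller's behaviour concrete, here is a minimal plain-Java sketch of one polling cycle. It is not how Spring Integration implements the adapter: the class name `FilePollerSketch` is hypothetical, a JDK glob stands in for `filename-pattern`, and the "already seen" set is an assumption used here so that one file yields one message.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the file poller; illustration only.
class FilePollerSketch {

    private final PathMatcher matcher;
    // Names already handed downstream, so a file is emitted only once (assumption).
    private final Set<String> seen = new HashSet<>();

    FilePollerSketch(String globPattern) {
        this.matcher = FileSystems.getDefault().getPathMatcher("glob:" + globPattern);
    }

    /** One polling cycle: returns the matching names that were not returned before. */
    List<String> poll(List<String> namesInDirectory) {
        List<String> fresh = new ArrayList<>();
        for (String name : namesInDirectory) {
            if (matcher.matches(Paths.get(name)) && seen.add(name)) {
                fresh.add(name);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        FilePollerSketch poller = new FilePollerSketch("test*");
        System.out.println(poller.poll(Arrays.asList("test1.csv", "other.csv"))); // [test1.csv]
        System.out.println(poller.poll(Arrays.asList("test1.csv", "test2.csv"))); // [test2.csv]
    }
}
```

In the real configuration each returned name would correspond to one MESSAGE placed on the `filesAreComing` channel.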


    ServiceActivator for the filesAreComing channel:
    public JobLaunchRequest adapt(File file) throws NoSuchJobException {

        JobParameters jobParameters = new JobParametersBuilder().addString(
                "input.file", file.getAbsolutePath()).toJobParameters();

        return new JobLaunchRequest(job, jobParameters);
    }


    ServiceActivator for the jobLauncher channel:
    <service-activator input-channel="jobLauncher">
        <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
            <beans:constructor-arg ref="jobLauncher" />
        </beans:bean>
    </service-activator>


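    "input.file" depends on a value that is only known at run time, so the reader is declared with scope="step" and resolves the job parameter when the step starts:
    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="#{jobParameters['input.file']}" />
        <!-- line mapper and other props -->
    </bean>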


    Reference SPRING XML
    <?xml version="1.0" encoding="UTF-8"?>
    <beans:beans xmlns="http://www.springframework.org/schema/integration"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:beans="http://www.springframework.org/schema/beans"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:file="http://www.springframework.org/schema/integration/file"
        xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

        <channel id="fileChannel"/>
        <channel id="jobLaunchRequestChannel"/>
        <channel id="jobExecutionChannel"/>
        <logging-channel-adapter channel="jobExecutionChannel" />

        <file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
            channel="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
            <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
        </file:inbound-channel-adapter>
        
        <service-activator input-channel="fileChannel"
                           output-channel="jobLaunchRequestChannel"
                           ref="fileToJobLaunchRequestAdapter"
                           method="adapt"/>
                           
        <service-activator input-channel="jobLaunchRequestChannel" output-channel="jobExecutionChannel">
            <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
                <beans:constructor-arg ref="jobLauncher" />
            </beans:bean>
        </service-activator>

        <beans:bean id="fileToJobLaunchRequestAdapter" class="example.FileToJobLaunchRequestAdapter">
            <beans:property name="job" ref="helloWorldJob"/>
        </beans:bean>
        
        
        <beans:bean id="fileCreationTimeComparator" class="com.paul.integration.file.filters.comparator.FileCreationTimeComparator">
        </beans:bean>

    </beans:beans>

    SPRING BATCH JOB configuration file
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:batch="http://www.springframework.org/schema/batch"
        xmlns:util="http://www.springframework.org/schema/util"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
            http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

        

        <batch:job id="helloWorldJob">
            <batch:step id="step1" next="xmlFileReadAndWriterStep">
                <batch:tasklet ref="helloWorldTasklet"></batch:tasklet>
            </batch:step>
            <batch:step id="xmlFileReadAndWriterStep">
                 <batch:tasklet>
                     <batch:chunk reader="xmlReader" writer="xmlWriter" processor="xmlProcessor"
                         commit-interval="10">
                     </batch:chunk>
                 </batch:tasklet>
             </batch:step>
        </batch:job>
        
        <bean id="helloWorldTasklet" class="example.HelloWorldTasklet"></bean>
        
        <!-- XML file reader -->
         <bean id="xmlReader"
             class="org.springframework.batch.item.xml.StaxEventItemReader" scope="step">
             <property name="fragmentRootElementName" value="trade" />
             <property name="unmarshaller" ref="tradeMarshaller" />
             <property name="resource" value="#{jobParameters['input.file.path']}" />
         </bean>
     
         <bean id="xmlProcessor" class="com.paul.batch.XMLProcessor" />
     
         <!-- XML file writer -->
        <bean id="xmlWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"
            scope="step">
            <property name="rootTagName" value="wanggc" />
            <property name="marshaller" ref="tradeMarshaller" />
            <property name="resource" value="#{jobParameters['output.file.path']}" />
        </bean>

        <bean id="tradeMarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">
            <property name="aliases">
                <util:map id="aliases">
                    <entry key="trade" value="com.paul.domain.Trade" />
                    <entry key="price" value="java.math.BigDecimal" />
                    <entry key="name" value="java.lang.String" />
                </util:map>
            </property>
        </bean>
        
    </beans>
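The `commit-interval="10"` setting in `xmlFileReadAndWriterStep` means that items are read one at a time but handed to the writer in groups of ten. The following is a simplified plain-Java sketch of that loop, not Spring Batch's actual implementation: the class name `ChunkLoopSketch` is hypothetical, `trim()` merely stands in for a processor, and transactions, skips and retries are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of chunk-oriented processing; illustration only.
class ChunkLoopSketch {

    /** Runs the read/process/write loop and returns the size of every chunk that was "written". */
    static List<Integer> run(Iterator<String> reader, int commitInterval) {
        List<Integer> writtenChunkSizes = new ArrayList<>();
        while (reader.hasNext()) {
            // Read: collect up to commitInterval items, one item per read call.
            List<String> items = new ArrayList<>();
            while (items.size() < commitInterval && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process: transform each item individually (trim() is a placeholder).
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                processed.add(item.trim());
            }
            // Write: the whole chunk goes to the writer at once, then the work is committed.
            writtenChunkSizes.add(processed.size());
        }
        return writtenChunkSizes;
    }

    public static void main(String[] args) {
        Iterator<String> reader = Collections.nCopies(25, " trade ").iterator();
        System.out.println(run(reader, 10)); // [10, 10, 5]
    }
}
```

With 25 input records the writer is called three times, which is why a large file never has to be held in memory as a whole.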


    File processor
    package com.paul.batch;

    import org.springframework.batch.item.ItemProcessor;

    import com.paul.domain.Trade;

    public class XMLProcessor implements ItemProcessor<Trade, Trade> {

        /**
         * Processes the content of the XML file.
         */
        @Override
        public Trade process(Trade trade) throws Exception {
            return trade;
        }
    }


    DOMAIN TRADE object
    /*
     * Copyright 2006-2007 the original author or authors.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     
    */

    package com.paul.domain;

    import java.io.Serializable;
    import java.math.BigDecimal;


    /**
     * @author Rob Harrop
     * @author Dave Syer
     */
    public class Trade implements Serializable {
        private String isin = "";
        private long quantity = 0;
        private BigDecimal price = new BigDecimal(0);
        private String customer = "";
        private long id; // primitive, so getId() cannot fail while unboxing a null
        private long version = 0;

        public Trade() {
        }
        
        public Trade(String isin, long quantity, BigDecimal price, String customer){
            this.isin = isin;
            this.quantity = quantity;
            this.price = price;
            this.customer = customer;
        }

        /**
         * @param id
         */
        public Trade(long id) {
            this.id = id;
        }
        
        public long getId() {
            return id;
        }
        
        public void setId(long id) {
            this.id = id;
        }

        public long getVersion() {
            return version;
        }

        public void setVersion(long version) {
            this.version = version;
        }

        public void setCustomer(String customer) {
            this.customer = customer;
        }

        public void setIsin(String isin) {
            this.isin = isin;
        }

        public void setPrice(BigDecimal price) {
            this.price = price;
        }

        public void setQuantity(long quantity) {
            this.quantity = quantity;
        }

        public String getIsin() {
            return isin;
        }

        public BigDecimal getPrice() {
            return price;
        }

        public long getQuantity() {
            return quantity;
        }

        public String getCustomer() {
            return customer;
        }

        public String toString() {
            return "Trade: [isin=" + this.isin + ",quantity=" + this.quantity + ",price="
                + this.price + ",customer=" + this.customer + "]";
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((customer == null) ? 0 : customer.hashCode());
            result = prime * result + ((isin == null) ? 0 : isin.hashCode());
            result = prime * result + ((price == null) ? 0 : price.hashCode());
            result = prime * result + (int) (quantity ^ (quantity >>> 32));
            result = prime * result + (int) (version ^ (version >>> 32));
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Trade other = (Trade) obj;
            if (customer == null) {
                if (other.customer != null)
                    return false;
            }
            else if (!customer.equals(other.customer))
                return false;
            if (isin == null) {
                if (other.isin != null)
                    return false;
            }
            else if (!isin.equals(other.isin))
                return false;
            if (price == null) {
                if (other.price != null)
                    return false;
            }
            else if (!price.equals(other.price))
                return false;
            if (quantity != other.quantity)
                return false;
            if (version != other.version)
                return false;
            return true;
        }
        
     }


    Comparator that sorts the list of files taken from the folder by modification time, newest first
    package com.paul.integration.file.filters.comparator;

    import java.io.File;
    import java.util.Comparator;

    public class FileCreationTimeComparator implements Comparator<File>{

        @Override
        public int compare(File file1, File file2) {
            return Long.valueOf(file2.lastModified()).compareTo(
                    Long.valueOf(file1.lastModified()));
    //        return file1.getName().compareToIgnoreCase(file2.getName());
        }
        
    }
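Despite its name, the comparator above orders files by `lastModified()`, and because it compares `file2` against `file1` the most recently modified file sorts first. The sketch below checks that ordering without touching the disk. `NewestFirstDemo` and its `stub` helper are hypothetical names introduced only for this illustration, and `Long.compare` (Java 7+) is used as an equivalent of the `Long.valueOf(...).compareTo(...)` call.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo of the ordering; illustration only.
class NewestFirstDemo {

    /** Same ordering as the comparator above: a larger lastModified() value sorts first. */
    static final Comparator<File> NEWEST_FIRST = new Comparator<File>() {
        @Override
        public int compare(File file1, File file2) {
            return Long.compare(file2.lastModified(), file1.lastModified());
        }
    };

    /** Test-only trick: a File whose modification time is fixed, so no real file is needed. */
    static File stub(String name, final long modified) {
        return new File(name) {
            private static final long serialVersionUID = 1L;

            @Override
            public long lastModified() {
                return modified;
            }
        };
    }

    /** Sorts the given files with NEWEST_FIRST and returns their names in that order. */
    static List<String> sortedNames(File... files) {
        List<File> sorted = new ArrayList<>(Arrays.asList(files));
        Collections.sort(sorted, NEWEST_FIRST);
        List<String> names = new ArrayList<>();
        for (File file : sorted) {
            names.add(file.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedNames(
                stub("old.xml", 1000L), stub("new.xml", 3000L), stub("mid.xml", 2000L)));
        // [new.xml, mid.xml, old.xml]
    }
}
```

If oldest-first processing is wanted instead, swapping the two arguments of the comparison is enough.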


    Adapter that builds the HOLDER class wrapping the JOB and JOBPARAMETERS

    package example;

    import java.io.File;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.launch.NoSuchJobException;
    import org.springframework.batch.integration.launch.JobLaunchRequest;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.integration.annotation.MessageEndpoint;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.util.Assert;

    /**
     * Adapt a {@link File} to a {@link JobLaunchRequest} with a job parameter
     * <code>input.file.path</code> equal to the path of the file.
     * 
     * @author Dave Syer
     * 
     */
    @MessageEndpoint
    public class FileToJobLaunchRequestAdapter implements InitializingBean {

        private Job job;

        public void setJob(Job job) {
            this.job = job;
        }

        public void afterPropertiesSet() throws Exception {
            Assert.notNull(job, "A Job must be provided");
        }

        @ServiceActivator
        public JobLaunchRequest adapt(File file) throws NoSuchJobException {

            String fileName = file.getAbsolutePath();

            if (!fileName.startsWith("/")) {
                fileName = "/" + fileName;
            }

            fileName = "file://" + fileName;
            
            String outPutFilePath = "file:/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/" +
                    "spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file/output/out.xml";

            JobParameters jobParameters = new JobParametersBuilder().
                    addString("input.file.path", fileName).
                    addString("output.file.path", outPutFilePath).
                    addLong("time.stamp", System.currentTimeMillis()).
                    toJobParameters();

            if (job.getJobParametersIncrementer() != null) {
                jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
            }

            return new JobLaunchRequest(job, jobParameters);

        }

    }
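Two details of `adapt` are easy to miss: the absolute path is turned into a `file://` URL, and a `time.stamp` parameter is added on every call. Spring Batch identifies a job instance by the job name plus its parameters, so without a changing value the same file could not be launched a second time once its instance had completed. The sketch below models this with a plain `Map` standing in for `JobParameters`. The class name `LaunchParametersSketch` and the `/data/...` paths are hypothetical, and the clock is passed in as an argument only to keep the example deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the parameters built in adapt(); illustration only.
class LaunchParametersSketch {

    /** Mirrors the path handling in adapt(): ensure a leading slash, then prefix "file://". */
    static String toFileUrl(String absolutePath) {
        String fileName = absolutePath;
        if (!fileName.startsWith("/")) {
            fileName = "/" + fileName;
        }
        return "file://" + fileName;
    }

    /** A plain Map standing in for JobParameters (assumption made for this sketch). */
    static Map<String, Object> buildParameters(String absolutePath, String outputUrl, long now) {
        Map<String, Object> parameters = new LinkedHashMap<>();
        parameters.put("input.file.path", toFileUrl(absolutePath));
        parameters.put("output.file.path", outputUrl);
        parameters.put("time.stamp", Long.valueOf(now));
        return parameters;
    }

    public static void main(String[] args) {
        Map<String, Object> first = buildParameters("/data/in/t1.xml", "file:/data/out/out.xml", 1000L);
        Map<String, Object> second = buildParameters("/data/in/t1.xml", "file:/data/out/out.xml", 2000L);
        System.out.println(first.get("input.file.path")); // file:///data/in/t1.xml
        System.out.println(first.equals(second));         // false, because the time stamps differ
    }
}
```

The same input file therefore produces a different parameter set on every launch, which is what allows repeated runs.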


    TRADE test data file
    <?xml version="1.0" encoding="UTF-8"?>
    <records>
        <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
            <isin>XYZ0001</isin>
            <quantity>5</quantity>
            <price>11.39</price>
            <customer>Customer1</customer>
        </trade>
        <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
            <isin>XYZ0002</isin>
            <quantity>2</quantity>
            <price>72.99</price>
            <customer>Customer2c</customer>
        </trade>
        <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
            <isin>XYZ0003</isin>
            <quantity>9</quantity>
            <price>99.99</price>
            <customer>Customer3</customer>
        </trade>
    </records>

    JAR DEPENDENCY entries in MAVEN:
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>2.0.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-integration</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-file</artifactId>
        <version>2.0.3.RELEASE</version>
    </dependency>

    With this, every channel is connected end to end.


    References:
    http://stackoverflow.com/questions/7099543/launching-spring-batch-job

    http://stackoverflow.com/questions/11758672/spring-batch-flatfileitemreader-read-multiple-files

    https://github.com/SpringSource/spring-batch-admin/blob/master/spring-batch-admin-manager/src/main/java/org/springframework/batch/admin/integration/FileToJobLaunchRequestAdapter.java

    http://blog.springsource.org/2010/02/15/practical-use-of-spring-batch-and-spring-integration/

    http://www.enterpriseintegrationpatterns.com/ramblings/18_starbucks.html

    http://static.springsource.org/spring-integration/docs/2.0.3.RELEASE/reference/html/jdbc.html

    posted on 2012-10-16 00:11 paulwong  Views (5474)  Comments (7)  Categories: SPRING INTEGRATION, SPRING BATCH

    Feedback

    # re: SPRING INTEGRATION + SPRING BATCH 2012-11-16 16:17 RoJeff

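    Hello, I have a problem and would like to ask for help.
    Requirement: the data comes from files in several directories, and the files do not share one format. How should this be handled? Any answer would be much appreciated, thank you.

    Reading files from several directories is already solved: I configured several input channels and one output channel, that is, one monitor per directory. I did it this way only because I saw no alternative, since I need to monitor several directories.

    The remaining difficulty is that the incoming files have different formats. How do I invoke a different tokenizer for each format to split the file, or in other words invoke a different reader?

    Below is my applicationContext-integration.xml configuration: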

    <channel id="videoPlayerFileChannel" />
    <channel id="videoPlayerJobLaunchRequestChannel" />
    <channel id="videoPlayerJobExecutionChannel" />
    <logging-channel-adapter channel="videoPlayerJobExecutionChannel" />

    <file:inbound-channel-adapter id="videoPlayerInboundChannelAdapter"
    directory="${config.1000000022.SourcePath}" auto-create-directory="true"
    auto-startup="true" channel="videoPlayerFileChannel" filename-pattern="${config.1000000022.SourceType}"
    comparator="fileComparator">
    <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
    </file:inbound-channel-adapter>

    <service-activator input-channel="videoPlayerFileChannel"
    output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
    method="adapt" />

    <service-activator input-channel="videoPlayerJobLaunchRequestChannel"
    output-channel="videoPlayerJobExecutionChannel">
    <beans:bean
    class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
    <beans:constructor-arg ref="jobLauncher" />
    </beans:bean>
    </service-activator>

    <beans:bean id="launcher" class="com.zjcy.cbs.pretreatment.batch.Launcher">
    <beans:property name="job" ref="pretreatJob" />
    </beans:bean>


    <beans:bean id="fileComparator"
    class="com.zjcy.cbs.pretreatment.batch.FileComparator">
    </beans:bean>


    <channel id="fileChannelMessage" />
    <channel id="messageJobLaunchRequestChannel" />
    <channel id="messageJobExecutionChannel" />
    <logging-channel-adapter channel="messageJobExecutionChannel" />


    <file:inbound-channel-adapter id="fileChannelMessage1"
    directory="${config.2000000001.SourcePath}" auto-create-directory="true" auto-startup="true"
    channel="fileChannelMessage" filename-pattern="*${config.2000000001.SourceType}" comparator="fileComparator">
    <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
    </file:inbound-channel-adapter>

    <service-activator input-channel="fileChannelMessage"
    output-channel="videoPlayerJobLaunchRequestChannel" ref="launcher"
    method="adapt" />

    # re: SPRING INTEGRATION + SPRING BATCH 2012-11-17 22:39 paulwong

    @RoJeff
    You can read the file extension in videoPlayerFileChannel and add something like a router there that dispatches each file to a different CHANNEL according to its extension.
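The extension-based dispatch suggested in this reply can be sketched in plain Java as follows. This is only an illustration of the decision logic: `ExtensionRouterSketch` and the three channel names are hypothetical, and in a real context the same decision would normally sit in one of Spring Integration's router components rather than in a hand-written class.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical router that picks a channel name from a file extension; illustration only.
class ExtensionRouterSketch {

    private final Map<String, String> channelBySuffix = new HashMap<>();
    private final String defaultChannel;

    ExtensionRouterSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    /** Registers the channel that should receive files with the given extension. */
    ExtensionRouterSketch route(String suffix, String channel) {
        channelBySuffix.put(suffix.toLowerCase(Locale.ROOT), channel);
        return this;
    }

    /** Returns the channel for the file name, or the default channel when no rule matches. */
    String channelFor(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String suffix = dot < 0 ? "" : fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
        String channel = channelBySuffix.get(suffix);
        return channel != null ? channel : defaultChannel;
    }

    public static void main(String[] args) {
        ExtensionRouterSketch router = new ExtensionRouterSketch("unknownFileChannel")
                .route("xml", "xmlFileChannel")
                .route("csv", "csvFileChannel");
        System.out.println(router.channelFor("movie.XML")); // xmlFileChannel
        System.out.println(router.channelFor("notes.csv")); // csvFileChannel
        System.out.println(router.channelFor("README"));    // unknownFileChannel
    }
}
```

Each target channel can then feed its own adapter and job, so every file format gets the reader that fits it.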

    # re: SPRING INTEGRATION + SPRING BATCH [not logged in] 2012-11-18 09:44 RoJeff

    @paulwong
    Thank you very much for your help, paulwong.

    # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 17:25 RoJeff

    @paulwong
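    A question, if I may.
    Requirement: can file:inbound-channel-adapter be configured with several directories, or can it include subdirectories? If so, how should it be configured?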

    <file:inbound-channel-adapter directory="/Users/paul/Documents/PAUL/DOWNLOAD/SOFTWARE/DEVELOP/SPRING BATCH/spring-batch-2.1.9.RELEASE/samples/spring-batch-simple-cli/file"
    channel="fileChannel" filename-pattern="t*.xml" comparator="fileCreationTimeComparator">
    <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
    </file:inbound-channel-adapter>

    # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:17 paulwong

    @RoJeff
    <bean name="scanner" class="org.springframework.integration.file.RecursiveLeafOnlyDirectoryScanner"/>

    <file:inbound-channel-adapter
    id="inputChannel" scanner="scanner" directory="/some/folder" />

    # re: SPRING INTEGRATION + SPRING BATCH 2012-11-19 22:47 RoJeff

    @paulwong
    An expert really is an expert. The problem is solved, and I am very grateful.

