使用骆驼 EIP 拆分器跳过 csv 的第一行处理所有其他行并聚合所有行,包括跳过的行

skip first line of a csv using camel EIP spliter process all other lines and aggregate all lines including the skipped line

有没有一种简单的方法可以跳过 csv 的第一行,header,使用 camel EIP 拆分器处理所有其他行并聚合所有行,包括跳过的行? 我需要转换 CSV 文件中每条记录的日期,但跳过第一行 header。我正在尝试使用骆驼 EIP 拆分器。 谢谢!

<route 
    id="core.predix.consumer.route"
    autoStartup="true" >
    <from id="predixConsumer" ref="predixConsumer" />   
    <convertBodyTo type="java.lang.String" />
    <split streaming="true" > <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true"> -->
        <tokenize token="\n"/> 
        <log message="Split line ${body}"/>
            <!--  <process ref="EnrichementProcessor"/> -->
    </split>
    <to uri="{{fileDestinationEndpoint}}" />
</route>

这行不通。我认为这会起作用,因为如果 property.CamelSlitIndex > 0,它应该只执行语句中的内容,但它会为每一行执行,即使 CamelSplitIndex =- 0。

这是bug还是我用错了???

<route 
    id="core.pre.consumer.route"
    autoStartup="true" >
    <from id="preConsumer" ref="preConsumer" /> 
    <convertBodyTo type="java.lang.String" />
    <split streaming="true" > <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true"> -->
        <tokenize token="\n"/> 
        <log message="Split line ${body}"/>
        <choice>
          <when>
            <simple>"${property.CamelSplitIndex} > 0"</simple>
            <process ref="timeStampEnrichmentProcessor" />
            <log message="Camel Split Index is greater than zero: ${property.CamelSplitIndex}" />
          </when>
        </choice>           
    </split>
    <log message="body: ${body}" />
    <to uri="{{fileDestinationEndpoint}}" />
</route>

有点晚了但是...

将逻辑放入聚合器中...

 <route
    id="fadec.core.PostFlightReportProducerRoute"
    autoStartup="true">
    <from uri="seda:postFlightReportProducer" />
     <split streaming="true" strategyRef="statusAggregationStrategy">
     <simple>${body}</simple>
        <log message="inbound Post Flight Summary Report message body: ${body}" />
        <process ref="postFlightReportMarshaler" />
        <to uri="velocity:templates/postFlightReportSummary.vm" />
        <log message="Velocity output: ${body}" loggingLevel="INFO" /> 
     </split>
 <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.csv" /> 
     <choice>
        <when>
            <simple>${body} == null</simple>
            <log message="body is NULL, do not send NULL body!" />
            <stop></stop>
        </when>
        <otherwise>
            <process ref="xlsxProcessor"/>
            <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
            <log message="Sending data packet: ${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
            <stop></stop>             
        </otherwise>
    </choice>
 </route>

public class StatusAggregationStrategy implements AggregationStrategy {

    private Logger log = LoggerFactory.getLogger(StatusAggregationStrategy.class.getName());

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        //-------------------------------------------------------------------------------------
        // Arrived    | oldExchange  |  newExchange | Description
        //-------------------------------------------------------------------------------------
        // A         | NULL         |  A            | first message arrives for the first group
        // B         | A            |  B            | second message arrives for the first group
        // F         | NULL         |  F            | first message arrives for the second group
        // C         | AB           |  C            | third message arrives for the first group
        //---------------------------------------------------------------------------------------
        log.info("Status Aggregation Strategy :: start");

            if ( oldExchange == null ) {  //This will set the 1st record with the Header
            log.info("old Exchange is Null");
            String body = newExchange.getIn().getBody(String.class);
            newExchange.getIn().setBody(body);
            return newExchange;
        }

    //Each newBody msg exchange will have 2 records, a header and a data record
    String newBody = newExchange.getIn().getBody(String.class);
    String existingBody = oldExchange.getIn().getBody(String.class);
    StringBuilder osb = new StringBuilder();

    log.info("New Body exchange: " + newBody);
    log.info("Old Body exchange: " + existingBody);

    String SkipRecord = "";
    String addRecord = "";
    Scanner osc = new Scanner(newBody).useDelimiter("\r\n|\n");
    while ( osc.hasNextLine() ) {
     SkipRecord = osc.nextLine();
     //osc.nextLine();  //move past header
     log.info("aggregation: skip record: " + SkipRecord);
     if (  osc.hasNextLine() ) { 
       addRecord = osc.nextLine();
       log.info("aggregation addRecord: " + addRecord );
       osb.append(addRecord).append(System.lineSeparator());
     } else { log.error("bad newBody message exchange, has no data record!"); }
    }
    osc.close();

    log.info("Joined exchange: Old body: " + existingBody + " New body: " + osb.toString());
    oldExchange.getIn().setBody(existingBody + osb.toString() );

    log.debug("Status Aggregation Strategy :: finish");

    return oldExchange;

    } //Exchange process

您可以使用选项 org.apache.camel.dataformat.csv.CsvDataFormat#setSkipHeaderRecord(true)

工作原理如下:

package app;

import io.github.dgroup.j2gp.processor.GroupByParent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.csv.CSVFormat;

import java.util.List;


public class App {
    public static void main(String[] args) throws Exception {
        CamelContext ctx = new DefaultCamelContext();
        ctx.addRoutes(new RouteBuilder() {
            public void configure() {

                CsvDataFormat csv = new CsvDataFormat();
                csv.setFormat(CSVFormat.EXCEL);
                csv.setSkipHeaderRecord(true); // <- ignore first line (e.g. csv header)

                from("file:data/inbox?noop=true")
                    .unmarshal(csv)
                    .split(body())
                    .log("Reached ${body}")
                    .process(exchange -> {
                        List<String> line = exchange.getIn().getBody(List.class);
                        ...
                    });
                    ...
            }
        });

        // start the route and let it do its work
        ctx.start();
        Thread.sleep(10000);

        // stop the CamelContext
        ctx.stop();
    }
}