skip first line of a csv using camel EIP spliter process all other lines and aggregate all lines including the skipped line

有没有一种简单的方法可以跳过 csv 的第一行,header,使用 camel EIP 拆分器处理所有其他行并聚合所有行,包括跳过的行? 我需要转换 CSV 文件中每条记录的日期,但跳过第一行 header。我正在尝试使用骆驼 EIP 拆分器。 谢谢!

    autoStartup="true" >
    <from id="predixConsumer" ref="predixConsumer" />   
    <convertBodyTo type="java.lang.String" />
    <split streaming="true" > <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true"> -->
        <tokenize token="\n"/> 
        <log message="Split line ${body}"/>
            <!--  <process ref="EnrichementProcessor"/> -->
    <to uri="{{fileDestinationEndpoint}}" />

这行不通。我认为这会起作用,因为如果 property.CamelSlitIndex > 0,它应该只执行语句中的内容,但它会为每一行执行,即使 CamelSplitIndex =- 0。


    autoStartup="true" >
    <from id="preConsumer" ref="preConsumer" /> 
    <convertBodyTo type="java.lang.String" />
    <split streaming="true" > <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true"> -->
        <tokenize token="\n"/> 
        <log message="Split line ${body}"/>
            <simple>"${property.CamelSplitIndex} > 0"</simple>
            <process ref="timeStampEnrichmentProcessor" />
            <log message="Camel Split Index is greater than zero: ${property.CamelSplitIndex}" />
    <log message="body: ${body}" />
    <to uri="{{fileDestinationEndpoint}}" />



    <from uri="seda:postFlightReportProducer" />
     <split streaming="true" strategyRef="statusAggregationStrategy">
        <log message="inbound Post Flight Summary Report message body: ${body}" />
        <process ref="postFlightReportMarshaler" />
        <to uri="velocity:templates/postFlightReportSummary.vm" />
        <log message="Velocity output: ${body}" loggingLevel="INFO" /> 
 <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.csv" /> 
            <simple>${body} == null</simple>
            <log message="body is NULL, do not send NULL body!" />
            <process ref="xlsxProcessor"/>
            <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
            <log message="Sending data packet: ${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />

public class StatusAggregationStrategy implements AggregationStrategy {

    private Logger log = LoggerFactory.getLogger(StatusAggregationStrategy.class.getName());

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        // Arrived    | oldExchange  |  newExchange | Description
        // A         | NULL         |  A            | first message arrives for the first group
        // B         | A            |  B            | second message arrives for the first group
        // F         | NULL         |  F            | first message arrives for the second group
        // C         | AB           |  C            | third message arrives for the first group
        log.info("Status Aggregation Strategy :: start");

            if ( oldExchange == null ) {  //This will set the 1st record with the Header
            log.info("old Exchange is Null");
            String body = newExchange.getIn().getBody(String.class);
            return newExchange;

    //Each newBody msg exchange will have 2 records, a header and a data record
    String newBody = newExchange.getIn().getBody(String.class);
    String existingBody = oldExchange.getIn().getBody(String.class);
    StringBuilder osb = new StringBuilder();

    log.info("New Body exchange: " + newBody);
    log.info("Old Body exchange: " + existingBody);

    String SkipRecord = "";
    String addRecord = "";
    Scanner osc = new Scanner(newBody).useDelimiter("\r\n|\n");
    while ( osc.hasNextLine() ) {
     SkipRecord = osc.nextLine();
     //osc.nextLine();  //move past header
     log.info("aggregation: skip record: " + SkipRecord);
     if (  osc.hasNextLine() ) { 
       addRecord = osc.nextLine();
       log.info("aggregation addRecord: " + addRecord );
     } else { log.error("bad newBody message exchange, has no data record!"); }

    log.info("Joined exchange: Old body: " + existingBody + " New body: " + osb.toString());
    oldExchange.getIn().setBody(existingBody + osb.toString() );

    log.debug("Status Aggregation Strategy :: finish");

    return oldExchange;

    } //Exchange process

您可以使用选项 org.apache.camel.dataformat.csv.CsvDataFormat#setSkipHeaderRecord(true)


package app;

import io.github.dgroup.j2gp.processor.GroupByParent;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.csv.CSVFormat;

import java.util.List;

public class App {
    public static void main(String[] args) throws Exception {
        CamelContext ctx = new DefaultCamelContext();
        ctx.addRoutes(new RouteBuilder() {
            public void configure() {

                CsvDataFormat csv = new CsvDataFormat();
                csv.setSkipHeaderRecord(true); // <- ignore first line (e.g. csv header)

                    .log("Reached ${body}")
                    .process(exchange -> {
                        List<String> line = exchange.getIn().getBody(List.class);

        // start the route and let it do its work

        // stop the CamelContext