Skip the first line of a CSV using the Camel EIP Splitter, process all other lines, and aggregate all lines including the skipped line
Is there an easy way to skip the first line of a CSV (the header) with the Camel EIP Splitter, process all the other lines, and then aggregate all the lines, including the skipped one? I need to convert a date on every record in a CSV file but skip the first line, the header. I am trying to use the Camel EIP Splitter.
Thanks!
<route
    id="core.predix.consumer.route"
    autoStartup="true">
    <from id="predixConsumer" ref="predixConsumer" />
    <convertBodyTo type="java.lang.String" />
    <split streaming="true"> <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true" -->
        <tokenize token="\n"/>
        <log message="Split line ${body}"/>
        <!-- <process ref="EnrichementProcessor"/> -->
    </split>
    <to uri="{{fileDestinationEndpoint}}" />
</route>
This doesn't work. I thought it would, since the contents of the when clause should only execute if property.CamelSplitIndex > 0, but they execute for every line, even when CamelSplitIndex == 0.
Is this a bug, or am I using it wrong?
<route
    id="core.pre.consumer.route"
    autoStartup="true">
    <from id="preConsumer" ref="preConsumer" />
    <convertBodyTo type="java.lang.String" />
    <split streaming="true"> <!-- strategyRef="enrichmentAggregationStrategy" stopOnException="true" -->
        <tokenize token="\n"/>
        <log message="Split line ${body}"/>
        <choice>
            <when>
                <simple>"${property.CamelSplitIndex} > 0"</simple>
                <process ref="timeStampEnrichmentProcessor" />
                <log message="Camel Split Index is greater than zero: ${property.CamelSplitIndex}" />
            </when>
        </choice>
    </split>
    <log message="body: ${body}" />
    <to uri="{{fileDestinationEndpoint}}" />
</route>
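The quoting is the likely culprit: wrapped in double quotes, the whole simple expression is evaluated as a non-empty string literal rather than as a comparison, and Camel treats any non-empty string as a truthy predicate, so the when branch fires for every line. A minimal sketch of the corrected predicate, with the quotes removed:

<!-- Sketch: no quotes around the simple expression, so it is evaluated
     as a comparison instead of an always-truthy string literal -->
<choice>
    <when>
        <simple>${property.CamelSplitIndex} > 0</simple>
        <process ref="timeStampEnrichmentProcessor" />
        <log message="Camel Split Index is greater than zero: ${property.CamelSplitIndex}" />
    </when>
</choice>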
A bit late, but...
Put the logic into the aggregator...
<route
    id="fadec.core.PostFlightReportProducerRoute"
    autoStartup="true">
    <from uri="seda:postFlightReportProducer" />
    <split streaming="true" strategyRef="statusAggregationStrategy">
        <simple>${body}</simple>
        <log message="inbound Post Flight Summary Report message body: ${body}" />
        <process ref="postFlightReportMarshaler" />
        <to uri="velocity:templates/postFlightReportSummary.vm" />
        <log message="Velocity output: ${body}" loggingLevel="INFO" />
    </split>
    <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.csv" />
    <choice>
        <when>
            <simple>${body} == null</simple>
            <log message="body is NULL, do not send NULL body!" />
            <stop/>
        </when>
        <otherwise>
            <process ref="xlsxProcessor"/>
            <wireTap uri="{{DFEndpointTest}}fileName=/${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
            <log message="Sending data packet: ${header.aircraftMetadata.outputPath}/${header.ship}_${date:now:yyyy-MM-dd_HH-mm-ss}.xlsx" />
            <stop/>
        </otherwise>
    </choice>
</route>
import java.util.Scanner;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // org.apache.camel.AggregationStrategy in Camel 3.x
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatusAggregationStrategy implements AggregationStrategy {

    private final Logger log = LoggerFactory.getLogger(StatusAggregationStrategy.class);

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        //-------------------------------------------------------------------------------
        // Arrived | oldExchange | newExchange | Description
        //-------------------------------------------------------------------------------
        // A       | NULL        | A           | first message arrives for the first group
        // B       | A           | B           | second message arrives for the first group
        // F       | NULL        | F           | first message arrives for the second group
        // C       | AB          | C           | third message arrives for the first group
        //-------------------------------------------------------------------------------
        log.info("Status Aggregation Strategy :: start");
        if (oldExchange == null) { // first exchange: keep it whole, header included
            log.info("old Exchange is Null");
            return newExchange;
        }
        // Each subsequent exchange carries two records: a header and a data record
        String newBody = newExchange.getIn().getBody(String.class);
        String existingBody = oldExchange.getIn().getBody(String.class);
        StringBuilder osb = new StringBuilder();
        log.info("New Body exchange: " + newBody);
        log.info("Old Body exchange: " + existingBody);
        Scanner osc = new Scanner(newBody).useDelimiter("\r\n|\n");
        while (osc.hasNextLine()) {
            String skipRecord = osc.nextLine(); // move past the header
            log.info("aggregation: skip record: " + skipRecord);
            if (osc.hasNextLine()) {
                String addRecord = osc.nextLine(); // keep only the data record
                log.info("aggregation addRecord: " + addRecord);
                osb.append(addRecord).append(System.lineSeparator());
            } else {
                log.error("bad newBody message exchange, has no data record!");
            }
        }
        osc.close();
        log.info("Joined exchange: Old body: " + existingBody + " New body: " + osb);
        oldExchange.getIn().setBody(existingBody + osb);
        log.debug("Status Aggregation Strategy :: finish");
        return oldExchange;
    }
}
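For the strategyRef above to resolve, the strategy has to be registered as a bean whose id matches. A minimal Spring XML sketch (the package name here is a placeholder for wherever the class actually lives):

<!-- Hypothetical package; the bean id must match strategyRef in the route -->
<bean id="statusAggregationStrategy"
      class="com.example.StatusAggregationStrategy" />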
You can use the option
org.apache.camel.dataformat.csv.CsvDataFormat#setSkipHeaderRecord(true)
It works like this:
package app;

import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.csv.CsvDataFormat;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.csv.CSVFormat;

public class App {
    public static void main(String[] args) throws Exception {
        CamelContext ctx = new DefaultCamelContext();
        ctx.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                CsvDataFormat csv = new CsvDataFormat();
                csv.setFormat(CSVFormat.EXCEL);
                csv.setSkipHeaderRecord(true); // <- ignore first line (e.g. csv header)
                from("file:data/inbox?noop=true")
                        .unmarshal(csv)
                        .split(body())
                        .log("Reached ${body}")
                        .process(exchange -> {
                            List<String> line = exchange.getIn().getBody(List.class);
                            // ... transform each record here ...
                        });
                // ... further routes ...
            }
        });
        // start the route and let it do its work
        ctx.start();
        Thread.sleep(10000);
        // stop the CamelContext
        ctx.stop();
    }
}
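One caveat worth verifying against your Commons CSV version (this is an assumption about its semantics, not something the answer states): skipHeaderRecord only takes effect when the format also defines a header, so telling the format to read the header from the first record is the safer combination:

// Assumption: skipHeaderRecord alone may be a no-op without a configured header;
// withFirstRecordAsHeader() treats line 1 as the header and skips it when parsing
CsvDataFormat csv = new CsvDataFormat();
csv.setFormat(CSVFormat.EXCEL.withFirstRecordAsHeader());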