camel 拆分器可以跳过某些值(如空或 null)的消息行吗?
can the camel splitter skip on messages rows of some value like empty or null?
我在收到文件的入口处采用骆驼路线,有时这些文件包含多个可能是数千个空行或记录。这些发生在文件的末尾。
关于如何处理这种情况的帮助或建议。
2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
路线通往分路器。
<route autoStartup="true" id="core.predix.accept.file.type.route">
<from id="_from3" uri="{{fileEntranceEndpoint}}"/>
<convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
<split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
<tokenize token="\n"/>
<process id="_process3" ref="toCsvFormat"/>
<!-- passthru only we do not allow embedded commas in numeric data -->
</split>
<log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
<choice id="_choice1">
<when id="_when1">
<simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
<log id="_log2" message="${file:name} accepted for processing..."/>
<choice id="_choice2">
<when id="_when2">
<simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
<setHeader headerName="CamelFileName" id="_setHeader1">
<simple>${file:name.noext.single}.csv</simple>
</setHeader>
<log id="_log3" message="${file:name} changed file name."/>
</when>
</choice>
<split id="_split2" streaming="true">
<tokenize prop:group="noOfLines" token="\n"/>
<log id="_log4" message="Split Group Body: ${body}"/>
<to id="_to1" uri="bean:extractHeader"/>
<to id="acceptedFileType" ref="predixConsumer"/>
</split>
<to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
</when>
<otherwise id="_otherwise1">
<log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
<to id="_to3" uri="{{unhandledArchive}}"/>
</otherwise>
</choice>
</route>
简单的聚合器
public class CsvAggregationStrategy implements AggregationStrategy {
private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//Theory
//-------------------------------------------------------------------------------------
// Arrived | oldExchange | newExchange | Description
//-------------------------------------------------------------------------------------
// A | NULL | A | first message arrives for the first group
// B | A | B | second message arrives for the first group
// F | NULL | F | first message arrives for the second group
// C | AB | C | third message arrives for the first group
//---------------------------------------------------------------------------------------
log.debug("Aggregation Strategy :: start");
if ( newExchange.getException() != null ) {
if ( oldExchange == null ) {
return newExchange;
} else {
oldExchange.setException(newExchange.getException());
return oldExchange;
}
}
if ( oldExchange == null ) { //This will set the 1st record with the Header
return newExchange;
}
String newBody = newExchange.getIn().getBody(String.class);
String oldBody = oldExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody( body );
log.debug("Aggregation Strategy :: finish");
return oldExchange;
} //Exchange process
} //class AggregationStrategy
我想我会处理 class toCsvFormat
中的空行
class ToCsvFormat 只是将入站 csv 分隔符更改为逗号。
public class ToCsvFormat implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
body = body.replaceAll("\t|;",",");
String bodyCheck = body.replaceAll(",","").trim();
LOG.info("BODY CHECK: " + bodyCheck);
if ( bodyCheck.isEmpty() || bodyCheck == null ) {
throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");
} else {
StringBuilder sb = new StringBuilder(body.trim());
LOG.debug("CSV Format Body In: " + sb.toString());
LOG.debug("sb length: " + sb.length());
if ( sb.toString().endsWith(",") ) {
sb.deleteCharAt(sb.lastIndexOf(",", sb.length()));
}
LOG.info("CSV Format Body Out: " + sb.toString());
sb.append(System.lineSeparator());
exchange.getIn().setBody(sb.toString());
}
}
}
*** 我遇到的问题是我需要拆分器完成处理,直到它命中所有空行,或者跳过或停止空记录上的拆分器。但我需要以前拆分或处理过的东西。异常的抛出和捕获停止了分离器,我什么也得不到。我正在使用拆分器 stoponexception 但就像它说的那样,它在异常时停止。
谢谢
所以你设置了 stopOnException=true 并询问为什么你的路由在没有捕获到异常时停止 =) ?作为解决方法,忘记抛出异常并验证您的 body,如果它有不适当的数据,只需将 body 设置为空,然后将它们汇总到您的 AggregationStrategy 中,如下面的 pseudo-route 所示。我已经很长时间没有使用 xml 描述了,所以我希望你能用 Java DSL 理解这个例子。
public class ExampleRoute extends RouteBuilder {
AggregationStrategy aggregationStrategy = new AggregationStrategy() {
@Override
public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
log.debug("Aggregation Strategy :: start");
if (oldExchange != null) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + oldExchange.getIn().getBody(String.class));
}
log.debug("Aggregation Strategy :: finish");
return newExchange;
}
};
@Override
public void configure() throws Exception {
from("{{fileEntranceEndpoint}}")
.convertBodyTo(String.class)
.split(tokenize("\n"), aggregationStrategy).streaming().stopOnException()
.choice()
.when(body().regex(",+\$"))
.setBody(constant(""))
.otherwise()
.process("toCsvFormat")
;
}
我建议您使用 Java DSL。如您所见,很多东西用它都很容易使用。
谢谢 C0ld。欣赏轻松。是的,我明白了。有时我们会做傻事,为什么另一双眼睛是一件美妙的事情。我接受了你的建议,它就像一个魅力。非常感谢您的回复。
<split id="_split1"
strategyRef="emptyRecordAggregationStrategy" streaming="true">
<tokenize token="\n"/>
<choice id="_choice5">
<when id="_when5">
<simple>${body} regex '^,+$'</simple>
<setBody id="_setBody1">
<constant/>
</setBody>
</when>
<otherwise>
<process id="_processCSV" ref="toCsvFormat"/>
</otherwise>
</choice>
</split>
public class EmptyRecordAggregationStrategy implements AggregationStrategy {
private Logger log = LoggerFactory.getLogger(EmptyRecordAggregationStrategy.class.getName());
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if ( newExchange.getException() != null ) {
if ( oldExchange == null ) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
return newExchange;
} else {
oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + System.lineSeparator());
return oldExchange;
}
}
if ( oldExchange == null ) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
return newExchange;
}
if ( !newExchange.getIn().getBody(String.class).isEmpty() ) {
oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class) + System.lineSeparator());
}
return oldExchange;
}
}
我在收到文件的入口处采用骆驼路线,有时这些文件包含多个可能是数千个空行或记录。这些发生在文件的末尾。
关于如何处理这种情况的帮助或建议。
2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
路线通往分路器。
<route autoStartup="true" id="core.predix.accept.file.type.route">
<from id="_from3" uri="{{fileEntranceEndpoint}}"/>
<convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
<split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
<tokenize token="\n"/>
<process id="_process3" ref="toCsvFormat"/>
<!-- passthru only we do not allow embedded commas in numeric data -->
</split>
<log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
<choice id="_choice1">
<when id="_when1">
<simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
<log id="_log2" message="${file:name} accepted for processing..."/>
<choice id="_choice2">
<when id="_when2">
<simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
<setHeader headerName="CamelFileName" id="_setHeader1">
<simple>${file:name.noext.single}.csv</simple>
</setHeader>
<log id="_log3" message="${file:name} changed file name."/>
</when>
</choice>
<split id="_split2" streaming="true">
<tokenize prop:group="noOfLines" token="\n"/>
<log id="_log4" message="Split Group Body: ${body}"/>
<to id="_to1" uri="bean:extractHeader"/>
<to id="acceptedFileType" ref="predixConsumer"/>
</split>
<to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
</when>
<otherwise id="_otherwise1">
<log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
<to id="_to3" uri="{{unhandledArchive}}"/>
</otherwise>
</choice>
</route>
简单的聚合器
public class CsvAggregationStrategy implements AggregationStrategy {
private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
//Theory
//-------------------------------------------------------------------------------------
// Arrived | oldExchange | newExchange | Description
//-------------------------------------------------------------------------------------
// A | NULL | A | first message arrives for the first group
// B | A | B | second message arrives for the first group
// F | NULL | F | first message arrives for the second group
// C | AB | C | third message arrives for the first group
//---------------------------------------------------------------------------------------
log.debug("Aggregation Strategy :: start");
if ( newExchange.getException() != null ) {
if ( oldExchange == null ) {
return newExchange;
} else {
oldExchange.setException(newExchange.getException());
return oldExchange;
}
}
if ( oldExchange == null ) { //This will set the 1st record with the Header
return newExchange;
}
String newBody = newExchange.getIn().getBody(String.class);
String oldBody = oldExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody( body );
log.debug("Aggregation Strategy :: finish");
return oldExchange;
} //Exchange process
} //class AggregationStrategy
我想我会处理 class toCsvFormat
中的空行class ToCsvFormat 只是将入站 csv 分隔符更改为逗号。
public class ToCsvFormat implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);
@Override
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
body = body.replaceAll("\t|;",",");
String bodyCheck = body.replaceAll(",","").trim();
LOG.info("BODY CHECK: " + bodyCheck);
if ( bodyCheck.isEmpty() || bodyCheck == null ) {
throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");
} else {
StringBuilder sb = new StringBuilder(body.trim());
LOG.debug("CSV Format Body In: " + sb.toString());
LOG.debug("sb length: " + sb.length());
if ( sb.toString().endsWith(",") ) {
sb.deleteCharAt(sb.lastIndexOf(",", sb.length()));
}
LOG.info("CSV Format Body Out: " + sb.toString());
sb.append(System.lineSeparator());
exchange.getIn().setBody(sb.toString());
}
}
}
*** 我遇到的问题是我需要拆分器完成处理,直到它命中所有空行,或者跳过或停止空记录上的拆分器。但我需要以前拆分或处理过的东西。异常的抛出和捕获停止了分离器,我什么也得不到。我正在使用拆分器 stoponexception 但就像它说的那样,它在异常时停止。
谢谢
所以你设置了 stopOnException=true 并询问为什么你的路由在没有捕获到异常时停止 =) ?作为解决方法,忘记抛出异常并验证您的 body,如果它有不适当的数据,只需将 body 设置为空,然后将它们汇总到您的 AggregationStrategy 中,如下面的 pseudo-route 所示。我已经很长时间没有使用 xml 描述了,所以我希望你能用 Java DSL 理解这个例子。
public class ExampleRoute extends RouteBuilder {
AggregationStrategy aggregationStrategy = new AggregationStrategy() {
@Override
public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
log.debug("Aggregation Strategy :: start");
if (oldExchange != null) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + oldExchange.getIn().getBody(String.class));
}
log.debug("Aggregation Strategy :: finish");
return newExchange;
}
};
@Override
public void configure() throws Exception {
from("{{fileEntranceEndpoint}}")
.convertBodyTo(String.class)
.split(tokenize("\n"), aggregationStrategy).streaming().stopOnException()
.choice()
.when(body().regex(",+\$"))
.setBody(constant(""))
.otherwise()
.process("toCsvFormat")
;
}
我建议您使用 Java DSL。如您所见,很多东西用它都很容易使用。
谢谢 C0ld。欣赏轻松。是的,我明白了。有时我们会做傻事,为什么另一双眼睛是一件美妙的事情。我接受了你的建议,它就像一个魅力。非常感谢您的回复。
<split id="_split1"
strategyRef="emptyRecordAggregationStrategy" streaming="true">
<tokenize token="\n"/>
<choice id="_choice5">
<when id="_when5">
<simple>${body} regex '^,+$'</simple>
<setBody id="_setBody1">
<constant/>
</setBody>
</when>
<otherwise>
<process id="_processCSV" ref="toCsvFormat"/>
</otherwise>
</choice>
</split>
public class EmptyRecordAggregationStrategy implements AggregationStrategy {
private Logger log = LoggerFactory.getLogger(EmptyRecordAggregationStrategy.class.getName());
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if ( newExchange.getException() != null ) {
if ( oldExchange == null ) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
return newExchange;
} else {
oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + System.lineSeparator());
return oldExchange;
}
}
if ( oldExchange == null ) {
newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
return newExchange;
}
if ( !newExchange.getIn().getBody(String.class).isEmpty() ) {
oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class) + System.lineSeparator());
}
return oldExchange;
}
}