camel 拆分器可以跳过某些值(如空或 null)的消息行吗?

can the camel splitter skip on messages rows of some value like empty or null?

我在收到文件的入口处采用骆驼路线,有时这些文件包含多个可能是数千个空行或记录。这些发生在文件的末尾。

关于如何处理这种情况的帮助或建议。

2/3/20 0:25,12.0837099,22.07255971,51.15338002,52.76662495,52.34712651,51.12155216,45.7655507,49.96555147,54.47205637,50.66135512,54.43864717,54.31627797,112.11765,1305.89126,1318.734411,52.31780487,44.27374363,48.72548294,43.01383257,23.85434055,41.98898447,47.50916052,31.13055873,112.2747269,0.773642045,1.081464888,2.740194779,1.938788885,1.421660186,0.617588546,21.28219363,25.03362771,26.76627344,40.21132809,29.72854555,33.45911109
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,

路线通往分路器。

    <route autoStartup="true" id="core.predix.accept.file.type.route">
        <from id="_from3" uri="{{fileEntranceEndpoint}}"/>
        <convertBodyTo id="_convertBodyTo1" type="java.lang.String"/>
        <split id="_split1" strategyRef="csvAggregationStrategy" streaming="true" stopOnException="true">
            <tokenize token="\n"/>
            <process id="_process3" ref="toCsvFormat"/>
            <!-- passthru only we do not allow embedded commas in numeric data -->
        </split>
        <log id="_log1" loggingLevel="INFO" message="CSV body: ${body}"/>
        <choice id="_choice1">
            <when id="_when1">
                <simple>${header.CamelFileName} regex '^.*\.(csv|CSV|txt|gpg)$'</simple>
                <log id="_log2" message="${file:name} accepted for processing..."/>
                <choice id="_choice2">
                    <when id="_when2">
                        <simple>${header.CamelFileName} regex '^.*\.(CSV|txt|gpg)$'</simple>
                        <setHeader headerName="CamelFileName" id="_setHeader1">
                            <simple>${file:name.noext.single}.csv</simple>
                        </setHeader>
                        <log id="_log3" message="${file:name} changed file name."/>
                    </when>
                </choice>
                <split id="_split2" streaming="true">
                    <tokenize prop:group="noOfLines" token="\n"/>
                    <log id="_log4" message="Split Group Body: ${body}"/>
                    <to id="_to1" uri="bean:extractHeader"/>
                    <to id="acceptedFileType" ref="predixConsumer"/>
                </split>
                <to id="_to2" uri="bean:extractHeader?method=cleanHeader"/>
            </when>
            <otherwise id="_otherwise1">
                <log id="_log5" loggingLevel="INFO" message="${file:name} is an unknown file type, sending to unhandled repo."/>
                <to id="_to3" uri="{{unhandledArchive}}"/>
            </otherwise>
        </choice>
    </route>

简单的聚合器

public class CsvAggregationStrategy implements AggregationStrategy {

private Logger log = LoggerFactory.getLogger(CsvAggregationStrategy.class.getName());

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

    //Theory 
    //-------------------------------------------------------------------------------------
    // Arrived    | oldExchange  |  newExchange | Description
    //-------------------------------------------------------------------------------------
    // A         | NULL         |  A            | first message arrives for the first group
    // B         | A            |  B            | second message arrives for the first group
    // F         | NULL         |  F            | first message arrives for the second group
    // C         | AB           |  C            | third message arrives for the first group
    //---------------------------------------------------------------------------------------
    log.debug("Aggregation Strategy :: start");

    if ( newExchange.getException() != null ) {
      if ( oldExchange == null ) {
        return newExchange;
      } else {
        oldExchange.setException(newExchange.getException());
        return oldExchange;
        }
    }

    if ( oldExchange == null ) {  //This will set the 1st record with the Header
        return newExchange;
    }

String newBody = newExchange.getIn().getBody(String.class);
String oldBody = oldExchange.getIn().getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody( body );

log.debug("Aggregation Strategy :: finish");
return oldExchange;

} //Exchange process
} //class AggregationStrategy

我想我会处理 class toCsvFormat

中的空行

class ToCsvFormat 只是将入站 csv 分隔符更改为逗号。

public class ToCsvFormat implements Processor {

private static final Logger LOG = LoggerFactory.getLogger(ToCsvFormat.class);

@Override
public void process(Exchange exchange) throws Exception {

    String body = exchange.getIn().getBody(String.class);

    body = body.replaceAll("\t|;",",");

    String bodyCheck = body.replaceAll(",","").trim();
    LOG.info("BODY CHECK: " + bodyCheck);
    if ( bodyCheck.isEmpty() || bodyCheck == null ) {

        throw new IllegalArgumentException("Data record is Empty or NULL. Invalid Data!");

    } else {

        StringBuilder sb = new StringBuilder(body.trim());

        LOG.debug("CSV Format Body In: " + sb.toString());
        LOG.debug("sb length: " + sb.length());

        if ( sb.toString().endsWith(",") ) {

            sb.deleteCharAt(sb.lastIndexOf(",", sb.length()));
        }

        LOG.info("CSV Format Body Out: " + sb.toString());
        sb.append(System.lineSeparator());
        exchange.getIn().setBody(sb.toString());
    }

}

}

*** 我遇到的问题是我需要拆分器完成处理,直到它命中所有空行,或者跳过或停止空记录上的拆分器。但我需要以前拆分或处理过的东西。异常的抛出和捕获停止了分离器,我什么也得不到。我正在使用拆分器 stoponexception 但就像它说的那样,它在异常时停止。

谢谢

所以你设置了 stopOnException=true 并询问为什么你的路由在没有捕获到异常时停止 =) ?作为解决方法,忘记抛出异常并验证您的 body,如果它有不适当的数据,只需将 body 设置为空,然后将它们汇总到您的 AggregationStrategy 中,如下面的 pseudo-route 所示。我已经很长时间没有使用 xml 描述了,所以我希望你能用 Java DSL 理解这个例子。

public class ExampleRoute extends RouteBuilder {

AggregationStrategy aggregationStrategy = new AggregationStrategy() {
    @Override
    public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
        log.debug("Aggregation Strategy :: start");
        if (oldExchange != null) {
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + oldExchange.getIn().getBody(String.class));
        }
        log.debug("Aggregation Strategy :: finish");
        return newExchange;
    }
};

@Override
public void configure() throws Exception {
    from("{{fileEntranceEndpoint}}")
            .convertBodyTo(String.class)
            .split(tokenize("\n"), aggregationStrategy).streaming().stopOnException()
                .choice()
                .when(body().regex(",+\$"))
                    .setBody(constant(""))
                .otherwise()
                    .process("toCsvFormat")
    ;
}

我建议您使用 Java DSL。如您所见,很多东西用它都很容易使用。

谢谢 C0ld。欣赏轻松。是的,我明白了。有时我们会做傻事,为什么另一双眼睛是一件美妙的事情。我接受了你的建议,它就像一个魅力。非常感谢您的回复。

        <split id="_split1"
            strategyRef="emptyRecordAggregationStrategy" streaming="true">
            <tokenize token="\n"/>
            <choice id="_choice5">
                <when id="_when5">
                    <simple>${body} regex '^,+$'</simple>
                    <setBody id="_setBody1">
                        <constant/>
                    </setBody>
                </when>
                <otherwise>
                  <process id="_processCSV" ref="toCsvFormat"/>
                </otherwise>
            </choice>
        </split>


public class EmptyRecordAggregationStrategy implements AggregationStrategy {

private Logger log = LoggerFactory.getLogger(EmptyRecordAggregationStrategy.class.getName());

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

    if ( newExchange.getException() != null ) {
          if ( oldExchange == null ) {  
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
            return newExchange;
          } else {
            oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + System.lineSeparator());
            return oldExchange;
            }
        }

        if ( oldExchange == null ) {
            newExchange.getIn().setBody(newExchange.getIn().getBody(String.class) + System.lineSeparator());
            return newExchange;
        }

        if ( !newExchange.getIn().getBody(String.class).isEmpty() ) {
          oldExchange.getIn().setBody(oldExchange.getIn().getBody(String.class) + newExchange.getIn().getBody(String.class) + System.lineSeparator());
        }
        return oldExchange;
}

}