Apache Camel Splitter EIP group by
I have a task involving a CSV file. I need to limit the size of each CSV payload because the backend engine has a limit on payload size.
The problem is extracting the header (the first record/row), saving it, and adding it back to each of the remaining split chunks, which produces multiple files that all carry the same header. I'm hoping to find an elegant way to handle this. What I have works, but the coding is far from ideal.
Also, I need the group parameter to be configurable, and I'm wondering whether it can be set via a property in the camelContext.
This is what I have. It works, but... I could not get the group attribute to accept a parameter.
My route:
<!-- route on Weekends -->
<route id="inRouteWkEndBfmt1" routePolicyRef="startPolicyWkEnd" autoStartup="false" >
<from id="mainProcessingRouteWkEnd" ref="AsciiGatewayBackfillmt1" />
<convertBodyTo type="java.lang.String" />
<log message="File ${file:name} was received."/>
<setHeader headerName="messageDateTime">
<simple>${date:now:MM-dd-yyyy-HH:mm:ss}</simple>
</setHeader>
<split streaming="true" >
<tokenize token="\n" group="50"/>
<log message="Split line Body: ${body}"/>
<process ref="asciiSplitterProcessor" />
<log loggingLevel="INFO" message="Successfully sent ${file:name} to MT1 Core for Analytics Observation." />
<to id="windowsShareTargetWkEnd" uri="file://{{target.folder}}" />
</split>
<process ref="asciiCleanUp" />
</route>
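On the configurable group size: in Camel's Spring XML DSL, property placeholders written as {{...}} are resolved on DSL attributes, including the group attribute of tokenize, so the size can come from a properties file rather than being hard-coded. A minimal sketch (the property key split.group.size and the app.properties location are assumptions):

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <propertyPlaceholder id="properties" location="classpath:app.properties"/>
    <route>
        <from uri="{{fileEntranceEndpoint}}"/>
        <split streaming="true">
            <!-- group size resolved from the split.group.size property -->
            <tokenize token="\n" group="{{split.group.size}}"/>
            <to uri="bean:extractHeader"/>
        </split>
    </route>
</camelContext>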
Code:
public void process(Exchange exchange) throws Exception {
    log.info("Ascii Splitter Processor :: start");
    String inBody = exchange.getIn().getBody(String.class);
    String fileName = exchange.getIn().getHeader("CamelFileName", String.class);
    String fileSuffix = fileName.substring(fileName.lastIndexOf("."));
    String filePrefix = fileName.substring(0, fileName.lastIndexOf("."));
    fileName = filePrefix + "_" + cntr + fileSuffix;
    exchange.getIn().setHeader("CamelFileName", fileName);
    cntr++;
    log.info("File being processed: " + fileName);
    log.debug("Message record: " + inBody);
    StringBuilder sb = new StringBuilder();
    Scanner sc = new Scanner(inBody);
    if (!hdrFlag) {
        // first split group: remember the header line and pass the body through
        while (sc.hasNextLine()) {
            record = sc.nextLine();
            log.debug("record: " + record);
            log.debug("HEADER FLAG: " + hdrFlag);
            if (!hdrFlag) {
                HEADER = record + "\r\n";
                hdrFlag = true;
                log.debug("HEADER: " + HEADER);
            }
            sb.append(record).append("\r\n");
        }
    } else {
        // subsequent groups: prepend the saved header
        sb.append(HEADER).append(inBody);
    }
    sc.close();
    exchange.getIn().setBody(sb.toString());
}
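The filename-indexing part of the processor boils down to a small substring operation. Isolated as a helper (the class and method names here are hypothetical, not from the original code), it is easy to verify in isolation:

```java
public class FileNameUtil {
    // Insert a counter before the extension: "data.csv" + 3 -> "data_3.csv"
    public static String indexedName(String fileName, int index) {
        int dot = fileName.lastIndexOf('.');
        return fileName.substring(0, dot) + "_" + index + fileName.substring(dot);
    }
}
```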
I think this is a bit more elegant than the above. Unfortunately I'm not on Camel 2.9. This also works for sub-units of work where a large CSV payload has to be joined back together on the server side; I'm converting it to JSON and sending it to the server.
Thanks everyone. Hopefully this helps someone else with this use case.
public void process(Exchange exchange) throws Exception {
    log.info("Entering Extract Header Processor ...");
    // if the file is split into multiple files, modify the name with an index
    String fileName = exchange.getIn().getHeader("CamelFileName", String.class);
    String fileSuffix = fileName.substring(fileName.lastIndexOf("."));
    String filePrefix = fileName.substring(0, fileName.lastIndexOf("."));
    fileName = filePrefix + "_" + fileCounter + fileSuffix;
    fileCounter++;
    //fileName = filePrefix + "_" + exchange.getProperty("CamelSplitSize", Integer.class) + fileSuffix; // need Camel 2.9 for this to work, bummer
    exchange.getIn().setHeader("CamelFileName", fileName);
    log.info(" FILE NAME: " + exchange.getIn().getHeader("CamelFileName", String.class));
    log.info("File Counter: " + fileCounter);
    // if this is the first split body, grab the header to attach to the other split bodies
    String body = exchange.getIn().getBody(String.class);
    if (exchange.getProperty("CamelSplitIndex", Integer.class) == 0) {
        List<String> serviceRecords = new ArrayList<String>(Arrays.asList(body.split(System.lineSeparator())));
        HEADER = getHeader(serviceRecords).toString();
        exchange.getIn().setBody(body);
    } else {
        exchange.getIn().setBody(HEADER + System.lineSeparator() + body);
    }
    log.debug("HEADER: " + HEADER);
    log.info("Exiting Extract Header Processor ... :: Finish");
}

public StringBuilder getHeader(List<String> serviceRecords) {
    StringBuilder sb = new StringBuilder();
    // only the first record is the header; note this intentionally skips the first column
    String[] sa = serviceRecords.get(0).split(",");
    for (int j = 1; j < sa.length; j++) {
        sb.append(sa[j]).append(",");
    }
    sb.deleteCharAt(sb.lastIndexOf(","));
    return sb;
}

public void cleanHeader() {
    HEADER = "";
    fileCounter = 0;
}
}
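The core idea above (keep the first line as the header and re-attach it to every subsequent group) can be sketched as plain Java, independent of Camel. The class and method names below are hypothetical, chosen only for illustration; n mirrors the tokenize group size:

```java
import java.util.ArrayList;
import java.util.List;

public class CsvSplitSketch {
    // Split CSV body lines into groups of n, re-attaching the header
    // (first line) to every group after the first -- the same effect the
    // Camel splitter plus the extractHeader processor produce per exchange.
    public static List<String> splitWithHeader(String csv, int n) {
        String[] lines = csv.split("\n");
        String header = lines[0];
        List<String> groups = new ArrayList<String>();
        for (int start = 0; start < lines.length; start += n) {
            StringBuilder sb = new StringBuilder();
            if (start > 0) {
                sb.append(header).append("\n"); // re-attach the saved header
            }
            for (int i = start; i < Math.min(start + n, lines.length); i++) {
                sb.append(lines[i]).append("\n");
            }
            groups.add(sb.toString());
        }
        return groups;
    }
}
```

Each element of the returned list is a complete CSV payload that can be written out as its own file.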
Route:
<route
id="core.accept.file.type.route"
autoStartup="true" >
<from uri="{{fileEntranceEndpoint}}" />
<choice>
<when>
<simple>${header.CamelFileName} regex '^.*\.(csv|CSV)$'</simple>
<log message="${file:name} accepted for processing..." />
<choice>
<when>
<simple>${header.CamelFileName} regex '^.*\.(CSV)$'</simple>
<setHeader headerName="CamelFileName">
<simple>${file:name.noext}.csv</simple>
</setHeader>
</when>
</choice>
<split streaming="true" >
<tokenize token="\n" group="600" />
<log message="Split Group Body: ${body}"/>
<to uri="bean:extractHeader" />
<to id="acceptedFileType" ref="pConsumer" />
</split>
<to uri="bean:extractHeader?method=cleanHeader"/>
<!-- <to id="acceptedFileType" ref="pConsumer" /> -->
</when>
<otherwise>
<log message="${file:name} is an unknown file type, sending to unhandled repo." loggingLevel="INFO" />
<to uri="{{unhandledArchive}}" />
</otherwise>
</choice>
</route>