如何使用 DSL 跳过 springcloudstream 中文件的 header 行(第一行)?
How to skip the header row(first line) of the file in springcloudstream using DSL?
我正在使用 Spring 云流读取文件并使用文件拆分器拆分并使用 DSL 样式将每一行作为消息发出,正在读取的文件有一个 header 行,只是想知道如果有一种简单的方法可以跳过 header 行 before/after 阅读。
感谢任何帮助。
这是我的分离器和集成流的样子:
enter code here
return IntegrationFlows
.from("....")
.split(Files.splitter(true, true)/
.charset(StandardCharsets.UTF_8)
.applySequence(true), //emmit sequenceNumber to header
e -> e.id("fileSplitter")
);
enter code here
IntegrationFlow integrationFlow = integrationFlowBuilder
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
如果我没看错,您正在使用我们的 OOB 应用程序之一,文件来源:https://github.com/spring-cloud-stream-app-starters/file/blob/master/spring-cloud-starter-stream-source-file/README.adoc 并使用 Spring Cloud Dataflow dsl 进行部署,例如 stream create file ----file.consumer.mode=lines --file.directory=/tmp/ | sink
正确吗?
如果是这样,当您以行模式读取文件时,有一个特殊的 header 称为 sequence_number
。您可以在两者之间添加一个过滤器,以根据 header 表达式删除这些消息。
Spring集成5.1.5解决方案:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File("./data/input"));
return messageSource;
}
@Bean
public IntegrationFlow folderFlow() {
FileSplitter fileSplitter = new FileSplitter();
fileSplitter.setFirstLineAsHeader("columns");
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(1000)))
.split(fileSplitter)
.handle(System.out::println)
.get();
}
我正在使用 Spring 云流读取文件并使用文件拆分器拆分并使用 DSL 样式将每一行作为消息发出,正在读取的文件有一个 header 行,只是想知道如果有一种简单的方法可以跳过 header 行 before/after 阅读。
感谢任何帮助。
这是我的分离器和集成流的样子:
enter code here
return IntegrationFlows
.from("....")
.split(Files.splitter(true, true)/
.charset(StandardCharsets.UTF_8)
.applySequence(true), //emmit sequenceNumber to header
e -> e.id("fileSplitter")
);
enter code here
IntegrationFlow integrationFlow = integrationFlowBuilder
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
如果我没看错,您正在使用我们的 OOB 应用程序之一,文件来源:https://github.com/spring-cloud-stream-app-starters/file/blob/master/spring-cloud-starter-stream-source-file/README.adoc 并使用 Spring Cloud Dataflow dsl 进行部署,例如 stream create file ----file.consumer.mode=lines --file.directory=/tmp/ | sink
正确吗?
如果是这样,当您以行模式读取文件时,有一个特殊的 header 称为 sequence_number
。您可以在两者之间添加一个过滤器,以根据 header 表达式删除这些消息。
Spring集成5.1.5解决方案:
@Bean
public MessageSource<File> sourceDirectory() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File("./data/input"));
return messageSource;
}
@Bean
public IntegrationFlow folderFlow() {
FileSplitter fileSplitter = new FileSplitter();
fileSplitter.setFirstLineAsHeader("columns");
return IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(1000)))
.split(fileSplitter)
.handle(System.out::println)
.get();
}