使用 Spring Cloud Dataflow 和 Kafka 流式传输 CSV 文件
Stream CSV file using Spring Cloud Dataflow with Kafka
我正在尝试使用 Spring 云数据流和 Kafka 流式传输 csv 文件。这个想法是逐行流式传输文件,每行包含 header 。例如,如果我的 csv 文件的内容是
street,city,zip,state
3526 HIGH ST,SACRAMENTO,95838,CA
51 OMAHA CT,SACRAMENTO,95823,CA
2796 BRANCH ST,SACRAMENTO,95815,CA
然后流数据应采用以下格式(所需输出)
street,city,zip,state
3526 HIGH ST,SACRAMENTO,95838,CA
street,city,zip,state
51 OMAHA CT,SACRAMENTO,95823,CA
street,city,zip,state
2796 BRANCH ST,SACRAMENTO,95815,CA
我正在使用 FileSplitter 将消息传递到输出通道。
private Source channels;
FileSplitter splitter = new FileSplitter(true, true);
splitter.setOutputChannel(channels.output());
splitter.handleMessage(new GenericMessage<File>(file));
我目前得到的输出是
Sink : street,city,zip,state
Sink : 3526 HIGH ST,SACRAMENTO,95838,CA
Sink : 51 OMAHA CT,SACRAMENTO,95823,CA
Sink : 2796 BRANCH ST,SACRAMENTO,95815,CA
在拆分器之后添加一个过滤器以删除第一行。
在过滤器之后添加一个转换器,将负载转换为
"street,city,zip,state\n" + payload
我正在尝试使用 Spring 云数据流和 Kafka 流式传输 csv 文件。这个想法是逐行流式传输文件,每行包含 header 。例如,如果我的 csv 文件的内容是
street,city,zip,state
3526 HIGH ST,SACRAMENTO,95838,CA
51 OMAHA CT,SACRAMENTO,95823,CA
2796 BRANCH ST,SACRAMENTO,95815,CA
然后流数据应采用以下格式(所需输出)
street,city,zip,state
3526 HIGH ST,SACRAMENTO,95838,CA
street,city,zip,state
51 OMAHA CT,SACRAMENTO,95823,CA
street,city,zip,state
2796 BRANCH ST,SACRAMENTO,95815,CA
我正在使用 FileSplitter 将消息传递到输出通道。
private Source channels;
FileSplitter splitter = new FileSplitter(true, true);
splitter.setOutputChannel(channels.output());
splitter.handleMessage(new GenericMessage<File>(file));
我目前得到的输出是
Sink : street,city,zip,state
Sink : 3526 HIGH ST,SACRAMENTO,95838,CA
Sink : 51 OMAHA CT,SACRAMENTO,95823,CA
Sink : 2796 BRANCH ST,SACRAMENTO,95815,CA
在拆分器之后添加一个过滤器以删除第一行。
在过滤器之后添加一个转换器,将负载转换为
"street,city,zip,state\n" + payload