如何读取 apache beam 中的 CSVRecord?
How to read CSVRecord in apache beam?
我有一个Java可迭代对象,可迭代记录。我想将它传递给 Beam 管道。我试过了
PCollection csvRecordPC = p.apply("Create collection", Create.of(记录));
它导致了一个错误
执行 Java class 时发生异常。无法确定没有元素的 'Create' PTransform 的默认编码器。添加元素,调用 Create.empty(Coder)、Create.empty(TypeDescriptor),或者在 PTransform 上调用 'withCoder(Coder)' 或 'withType(TypeDescriptor)'。
我应该使用哪个编码器?或者如何编写自定义编码器?
我找到了使用 FileIO 的解决方案。
p.apply(FileIO.match().filepattern(options.getInputFile()))
.apply(FileIO.readMatches())
.apply(ParDo.of(new CsvParser()))
CsvPaser() 是
public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
@DoFn.ProcessElement
public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
InputStream is = Channels.newInputStream(element.open());
Reader reader = new InputStreamReader(is);
Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);
for (CSVRecord record : records) {
receiver.output(record);
}
}
}
我有一个Java可迭代对象,可迭代记录。我想将它传递给 Beam 管道。我试过了
PCollection csvRecordPC = p.apply("Create collection", Create.of(记录));
它导致了一个错误
执行 Java class 时发生异常。无法确定没有元素的 'Create' PTransform 的默认编码器。添加元素,调用 Create.empty(Coder)、Create.empty(TypeDescriptor),或者在 PTransform 上调用 'withCoder(Coder)' 或 'withType(TypeDescriptor)'。
我应该使用哪个编码器?或者如何编写自定义编码器?
我找到了使用 FileIO 的解决方案。
p.apply(FileIO.match().filepattern(options.getInputFile()))
.apply(FileIO.readMatches())
.apply(ParDo.of(new CsvParser()))
CsvPaser() 是
public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
@DoFn.ProcessElement
public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
InputStream is = Channels.newInputStream(element.open());
Reader reader = new InputStreamReader(is);
Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);
for (CSVRecord record : records) {
receiver.output(record);
}
}
}