如何读取 apache beam 中的 CSVRecord?

How to read CSVRecord in apache beam?

我有一个Java可迭代对象,可迭代记录。我想将它传递给 Beam 管道。我试过了

PCollection csvRecordPC = p.apply("Create collection", Create.of(记录));

它导致了一个错误

执行 Java class 时发生异常。无法确定没有元素的 'Create' PTransform 的默认编码器。添加元素,调用 Create.empty(Coder)、Create.empty(TypeDescriptor),或者在 PTransform 上调用 'withCoder(Coder)' 或 'withType(TypeDescriptor)'。

我应该使用哪个编码器?或者如何编写自定义编码器?

我找到了使用 FileIO 的解决方案。

p.apply(FileIO.match().filepattern(options.getInputFile()))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new CsvParser())) 

CsvPaser() 是

public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
    @DoFn.ProcessElement
    public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
        InputStream is = Channels.newInputStream(element.open());

        Reader reader = new InputStreamReader(is);

        Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);

        for (CSVRecord record : records) {
            receiver.output(record);
        }
    }
}