How to append new rows or perform a union on two PCollections

I need to append a new row to the CSV file below.

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600

Expected output:

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600
01 30/07/2021 999

Java code:

    public static void main(String[] args) throws IOException {
        final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");

        File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");

        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions //PRashanth needs to be looked at
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows

        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        RowAddition rowAddition = new RowAddition();
        final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));

How do I combine these two PCollection objects (rows and newlyAddedRows)?

        PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
        pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }
}

Logic for adding the row:

class RowAddition extends DoFn<Row, Row> {

    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema=null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser().parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Row row = context.element();
        // Row.withSchema is a static factory; the date is zero-padded to match the expected output.
        Row newRow = Row.withSchema(beamSchema).addValues("01", "30/07/2021", 999.0).build();
        context.output(newRow);
    }
}

I referred to this link:

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#:~:text=Merging%20PCollections,-Often%2C%20after%20you&text=You%20can%20do%20so%20by,join%20between%20two%20PCollection%20s.

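You are looking for the Flatten transform. It takes any number of existing PCollections and produces a new PCollection that contains the union of their elements. For completely new elements, you can use Create, or use another PTransform to compute the new elements from the old ones.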
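
As a rough illustration of the Flatten plus Create approach, here is a minimal sketch. The class name AppendRowSketch, the helper appendRow, and the hand-built three-field schema are assumptions made for this example (the question loads its schema from an .avsc file instead), and it assumes a runner such as the DirectRunner is on the classpath. Note that the RowAddition DoFn in the question emits one new row per input element, so flattening its output with the original rows would add six copies of the new row; Create emits the row exactly once.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

public class AppendRowSketch {

    // Hypothetical schema mirroring the CSV columns in the question.
    static final Schema SCHEMA = Schema.builder()
            .addStringField("ID")
            .addStringField("date")
            .addDoubleField("balance")
            .build();

    // Returns the union of the existing rows and a single extra row.
    static PCollection<Row> appendRow(PCollection<Row> existing, Row extra) {
        // Create produces a one-element PCollection in the same pipeline.
        PCollection<Row> extraRows = existing.getPipeline()
                .apply("CreateExtraRow", Create.of(extra).withRowSchema(SCHEMA));

        // Flatten merges the two PCollections into one; element order is not guaranteed.
        return PCollectionList.of(existing).and(extraRows)
                .apply("UnionRows", Flatten.pCollections());
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Stand-in for the valid rows read from the CSV file.
        PCollection<Row> existing = pipeline.apply("ExistingRows", Create.of(
                Row.withSchema(SCHEMA).addValues("01", "31/05/2021", 500.0).build(),
                Row.withSchema(SCHEMA).addValues("01", "30/06/2021", 600.0).build())
                .withRowSchema(SCHEMA));

        Row extra = Row.withSchema(SCHEMA).addValues("01", "30/07/2021", 999.0).build();

        // merged can now be passed to RowToString and TextIO.write() as in the question.
        PCollection<Row> merged = appendRow(existing, extra);

        pipeline.run().waitUntilFinish();
    }
}
```

In the question's pipeline, the equivalent call would be roughly PCollectionList.of(rows).and(extraRows).apply(Flatten.pCollections()), with the result fed to RowToString. Because a PCollection is unordered, the appended row is not guaranteed to be the last line of the output file.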