How to append new rows or perform a union on two PCollections
For the CSV file below, I need to add a new row value to it.
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Expected output:
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
01 | 30/07/2021 | 999 |
Java code:
public static void main(String[] args) throws IOException {
    final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");
    File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");
    Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();
    // Convert the Avro schema to a Beam schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline
            // Match the CSV input file
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
            // Read the matched files
            .apply("2", FileIO.readMatches())
            // Validate each record against the schema, convert it to a Row,
            // and emit valid and invalid records under separate tags
            .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                    TupleTagList.of(invalidTag())));

    // Fetch only the valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

    RowAddition rowAddition = new RowAddition();
    final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));
How do I combine these two PCollection objects here?
PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));
pipeline.run().waitUntilFinish();
System.out.println("The end");
}
}
Logic for adding the row:
class RowAddition extends DoFn<Row, Row> {
    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema = null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser()
                    .parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Row row = context.element();
        Row newRow = row.withSchema(beamSchema).addValues("01", "30/7/2021", 999.0).build();
        context.output(newRow);
    }
}
I referred to this link.
You are looking for the Flatten transform. It takes any number of existing PCollections and produces a new PCollection containing the union of their elements. For entirely new elements, you can use Create, or another PTransform that computes the new elements from the old ones.
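The approach above can be sketched as a minimal pipeline. It uses plain String elements rather than the question's schema-backed Row objects, and it assumes `beam-sdks-java-core` plus the direct runner are on the classpath; the class name `FlattenExample` and the sample values are made up for illustration.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenExample {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Existing rows (a stand-in for the CSV-derived PCollection<Row>).
        PCollection<String> existing = pipeline.apply("Existing",
                Create.of("01,31/05/2021,500", "01,30/06/2021,600"));

        // A brand-new row, built with Create rather than derived from an old element.
        PCollection<String> added = pipeline.apply("Added",
                Create.of("01,30/07/2021,999"));

        // Flatten merges both PCollections into a single PCollection
        // containing the union of their elements.
        PCollection<String> merged = PCollectionList.of(existing).and(added)
                .apply(Flatten.pCollections());

        pipeline.run().waitUntilFinish();
    }
}
```

In the question's pipeline, the same pattern would merge `rows` and a PCollection holding only the new row; note that both inputs to Flatten must share the same element type and coder.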