Apache Beam: update current row values based on the values from the previous row

I have grouped the values read from a CSV file. Within the grouped rows we find some missing values that need to be updated based on the value of the previous row. If the amount is missing in the first row of a group, it needs to be set to 0.

I can group the records, but I can't figure out the logic for updating the values. How can I implement this?

Records

customerId date amount
BS:89481 1/1/2012 100
BS:89482 1/1/2012
BS:89483 1/1/2012 300
BS:89481 1/2/2012 900
BS:89482 1/2/2012 200
BS:89483 1/2/2012

Grouped records

customerId date amount
BS:89481 1/1/2012 100
BS:89481 1/2/2012 900
BS:89482 1/1/2012
BS:89482 1/2/2012 200
BS:89483 1/1/2012 300
BS:89483 1/2/2012

Missing values updated

customerId date amount
BS:89481 1/1/2012 100
BS:89481 1/2/2012 900
BS:89482 1/1/2012 000
BS:89482 1/2/2012 200
BS:89483 1/1/2012 300
BS:89483 1/2/2012 300
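Stated over one group's amounts, already sorted by date, the fill rule itself is simple. A minimal plain-Java sketch of just the rule (a hypothetical helper, independent of Beam):

// Hypothetical helper illustrating the fill rule: a missing first value
// becomes 0, every later missing value repeats the previous one.
static List<Double> forwardFill(List<Double> amounts) {
    List<Double> filled = new ArrayList<>(amounts.size());
    Double last = 0.0; // default used when the first value of the group is missing
    for (Double amount : amounts) {
        if (amount != null) {
            last = amount; // remember the latest known amount
        }
        filled.add(last);  // a missing amount repeats the previous one
    }
    return filled;
}

For example, forwardFill(Arrays.asList(null, 200.0, null)) returns [0.0, 200.0, 200.0].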

Code so far:

public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Converting the Avro schema to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that match the pattern
                .apply("2", FileIO.readMatches())

                // Validating each record against the schema, converting it to a Row,
                // and emitting valid and invalid records under separate tags
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation:
        // group rows by customerId and date, summing the amounts
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("amount", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate
                .apply(Select.flattenedSchema())
                .apply(ParDo.of(new RowToString()));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }



}
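FileReader and RowToString are used above but their implementations aren't shown in the question. For completeness, a minimal sketch of what RowToString might look like (an assumption, not the actual class):

// Hypothetical sketch of the RowToString DoFn referenced above;
// the real implementation is not posted in the question.
class RowToString extends DoFn<Row, String> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Join all field values of the row into one comma-separated line
        String line = context.element().getValues().stream()
                .map(String::valueOf)
                .collect(java.util.stream.Collectors.joining(","));
        context.output(line);
    }
}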

Modified code (original version):

final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");

A DoFn that keys each row by customer ID:

class ToKV extends DoFn<Row, KV<String, Row>> {

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        // Key each row by the configured column (customerId here)
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }


}

Grouping by customer ID:

ToKV toKV = new ToKV();
toKV.setColumnName1("customerId");
PCollection<KV<String, Row>> kvRows = rows
        .apply(ParDo.of(toKV))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

PCollection<KV<String, Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String, Row>create());
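As an aside, the keying step can also be written with Beam's built-in WithKeys transform instead of a custom DoFn (a sketch, assuming the key is always customerId):

// Alternative to ToKV using the built-in WithKeys transform
PCollection<KV<String, Row>> kvRows = rows
        .apply(WithKeys.of((Row row) -> row.getString("customerId"))
                .withKeyType(TypeDescriptors.strings()))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

withKeyType is needed here because the key type cannot be inferred from a lambda.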

// Trying to group by date

PCollection<Row> outputRow =
        groupedKVRows
            .apply(ParDo.of(new GroupByDate()))
            .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

How do I write the logic that converts the Iterable into a PCollection so that the rows can be sorted by date?

class GroupByDate extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();

        // TODO: sort the rows by date and fill the missing amounts
        // (see the answer at the end of this post)
    }
}

Avro schema:

{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}
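Note that Avro ignores a logicalType attribute placed at the field level like this; a logical type has to be part of the type declaration itself, and the date logical type annotates an int (days since epoch), not a string. If the field really should carry the date logical type, it would have to look roughly like this (a sketch; the CSV values would then need converting from strings):

{
  "name" : "date",
  "type" : [ { "type" : "int", "logicalType" : "date" }, "null" ]
}

With the schema as posted, date is effectively a plain string, which is why the answer below parses it with a date formatter before sorting.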

Update to convert the PCollection to Row[]

class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        List<Row> rowList = new ArrayList<>();
        Iterable<Row> rowValue = context.element().getValue();
        rowValue.forEach(rowList::add);
        // The target array must be as large as the list (size(), not size() - 1)
        Row[] rowArray = new Row[rowList.size()];
        rowArray = rowList.toArray(rowArray);
        context.output(rowArray);
    }
}

Suggested code

Row[] rowArray = Iterables.toArray(rows, Row.class);

Error:

The method toArray(Iterable, Class) in the type Iterables is not applicable for the arguments (PCollection, Class)

Converting the iterable to an array

Row[] rowArray =  groupedKVRows.apply(ParDo.of(new KVToRow()));

Error:

Multiple markers at this line. Type mismatch: cannot convert from PCollection<Row[]> to Row[]
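Both errors have the same cause: a PCollection is a handle to a distributed dataset, not an in-memory Iterable or array, so it cannot be converted directly. Iterables.toArray has to run inside a DoFn, on the Iterable<Row> carried by a single grouped element. A minimal sketch (the class name is hypothetical):

// Hypothetical DoFn showing where Iterables.toArray belongs:
// the element's value is an Iterable<Row>, so toArray works here
class ToRowArrayFn extends DoFn<KV<String, Iterable<Row>>, Row[]> {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row[] rowArray = Iterables.toArray(context.element().getValue(), Row.class);
        context.output(rowArray);
    }
}

This is exactly what the answer below does before sorting.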

Beam doesn't provide any ordering guarantees, so you will have to group them as you did.

But as far as I understand your case, you need to group by customerId. After that, you can apply a PTransform like ParDo to sort the grouped rows by date and fill in the missing values however you need.

An example of sorting by converting to an array

static class SortAndForwardFillFn extends DoFn<KV<String, Iterable<Row>>, KV<String, Iterable<Row>>> {

    @ProcessElement
    public void processElement(@Element KV<String, Iterable<Row>> element, OutputReceiver<KV<String, Iterable<Row>>> outputReceiver) {

        // Create a formatter for parsing dates
        // (adjust the pattern to your data; "d/M/yyyy" matches values like "1/1/2012")
        DateTimeFormatter formatter = DateTimeFormat.forPattern("d/M/yyyy");

        // Convert the element's iterable to an array
        Row[] rowArray = Iterables.toArray(element.getValue(), Row.class);

        // Sort the array using the dates
        Arrays
            .sort(
                rowArray,
                Comparator
                    .comparingLong((Row row) -> formatter.parseDateTime(row.getString("date")).getMillis())
            );

        // Store the last amount; starting at 0 makes a missing first value become 0
        Double lastAmount = 0.0;

        // Create a List for storing sorted and filled rows
        List<Row> resultRows = new ArrayList<>(rowArray.length);

        // Iterate over the array and fill in the missing parts
        for (Row row : rowArray) {

            // Get the current amount
            Double currentAmount = row.getDouble("amount");

            // If null, fill in the previous value and add to the results,
            // otherwise remember the value and add the row as it is
            if (currentAmount == null) {
                // One way to build the filled row: copy it with the amount replaced
                // (Row.fromRow is available in recent Beam versions)
                resultRows.add(Row.fromRow(row).withFieldValue("amount", lastAmount).build());
            } else {
                lastAmount = currentAmount;
                resultRows.add(row);
            }
        }

        // Output using the output receiver
        outputReceiver
            .output(
                KV.of(element.getKey(), resultRows)
            );
    }
}
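To wire this into the pipeline above, apply it to groupedKVRows; a sketch, with the coder spelled out to match the earlier steps:

// Apply the sort-and-fill DoFn to the grouped rows
PCollection<KV<String, Iterable<Row>>> filledRows =
        groupedKVRows
            .apply("SortAndForwardFill", ParDo.of(new SortAndForwardFillFn()))
            .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(RowCoder.of(beamSchema))));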