Apache Beam: resampling of columns based on date

I am processing data with Apache Beam and trying to achieve the following:

  1. Read data from a CSV file. (Done)
  2. Group the records by customer ID. (Done)
  3. Resample the data by month and compute the sum for that particular month.

Details:

I have a CSV file that looks like this:

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056

Intermediate step: grouped by customerId and sorted by date

customerId date amount
BS:89481 09/14/2012 784
BS:89481 11/10/2012 189
BS:89481 11/14/2012 124
BS:89480 11/02/2012 987
BS:89480 11/14/2012 234
BS:89480 11/14/2012 056

Expected output (resampled): here we compute the sum of all amounts of a single customer in that particular month. For example, customer BS:89481 spent twice in November, so we compute the sum for that month (124 + 189).

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277

I was able to complete steps 1 and 2, but I am not sure how to implement step 3.

Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();

    // Reading schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline

            // Reading csv input
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

            // Reading files that match the conditions
            .apply("2", FileIO.readMatches())

            // Reading schema and validating with schema and converts to row and returns
            // valid and invalid list
            .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                    TupleTagList.of(invalidTag())));

    // Fetching only valid rows

    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

    // Step2
    //Convert row to KV for grouping
    StringToKV stringtoKV = new StringToKV();
    stringtoKV.setColumnName("customerId");
    PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
            //setCoder(KvCoder.of(VoidCoder.of()), rows.getCoder()));

    
    // Obtain a PCollection of KeyValue class of 
    PCollection<KV<String,Iterable<Row>>> kvIterableForm = kvOrderRows.apply(GroupByKey.<String,Row>create());

Update:

The Avro schema used for the transform:

    {
      "type" : "record",
      "name" : "Entry",
      "namespace" : "transform",
      "fields" : [  {
        "name" : "customerId",
        "type" : [ "string", "null" ]
      }, {
        "name" : "date",
        "type" : [ "long", "null" ]
      }, {
        "name" : "amount",
        "type" : [ "double", "null" ]
      }]
    }
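The schema above declares date as a long, so the MM/dd/yyyy strings in the CSV presumably get parsed into epoch milliseconds somewhere inside FileReader. A minimal sketch of that conversion (assuming UTC midnight and a plain long field rather than a logical type) could be:

    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    // Parse a CSV date such as "11/14/2012" into epoch milliseconds for the "date" field.
    // Assumes the dates are interpreted as UTC midnight.
    static long toEpochMillis(String csvDate) {
        LocalDate date = LocalDate.parse(csvDate, DateTimeFormatter.ofPattern("MM/dd/yyyy"));
        return date.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }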

CSV file:

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056
StringToKV1 class:

class StringToKV1 extends DoFn<Row, KV<String, Row>> {

        private static final long serialVersionUID = -8093837716944809689L;
        String columnName=null;

        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            context.output(KV.of(row.getValue(columnName), row));
        }
        
        public void setColumnName(String columnName) {
            this.columnName = columnName;
        }
    }

Code:

        public class GroupByTest {
            public static void main(String[] args) throws IOException {
                System.out.println("We are about to start!!");

                final File schemaFile = new File(
                        "C:\AI\Workspace\office\lombok\artifact\src\main\resources\schema_transform2.avsc");

                File csvFile = new File(
                        "C:\AI\Workspace\office\lombok\artifact\src\main\resources\CustomerRequest-case2.csv");
                Schema schema = new Schema.Parser().parse(schemaFile);

                Pipeline pipeline = Pipeline.create();

                // Reading schema
                org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

                final PCollectionTuple tuples = pipeline

                        // Reading csv input
                        .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                        // Reading files that match the conditions
                        .apply("2", FileIO.readMatches())

                        // Reading schema and validating with schema and converts to row and returns
                        // valid and invalid list
                        .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                                TupleTagList.of(invalidTag())));

                // Fetching only valid rows
                final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

                // Transformation
                //Convert row to KV
                StringToKV1 stringtoKV1 = new StringToKV1();
                stringtoKV1.setColumnName("customerId");
                PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV1)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

                
                // Will throw error
                // rows.apply(Group.byFieldNames("customerId", "date").aggregateField("amount", Sum.ofIntegers(), //"totalAmount"));
                System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@  "+Group.byFieldNames("customerId", "date")
                 .aggregateField("amount", Sum.ofIntegers(), "totalAmount").getName());                 
                
                pipeline.run().waitUntilFinish();
                System.out.println("The end");

            }

            private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
                String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
                LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
                if (logicalType != null) {
                    type = logicalType.getName();
                }

                switch (type) {
                case "string":
                    return row.getString(columnName);
                case "int":
                    return Objects.requireNonNull(row.getInt32(columnName)).toString();
                case "bigint":
                    return Objects.requireNonNull(row.getInt64(columnName)).toString();
                case "double":
                    return Objects.requireNonNull(row.getDouble(columnName)).toString();
                case "timestamp-millis":
                    return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime(columnName)).getMillis()).toString();

                default:
                    return row.getString(columnName);

                }
            }



        }

Corrected code:

        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofDoubles(), "sumAmount");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(ParDo.of(new RowToString()));

The output that I am getting is:

Expected output:

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277

Since you already have a PCollection<Row>, you can use schema-aware PTransforms. For your case, you probably want to use "Group aggregation", which can look like this:

Group.byFieldNames("customerId", "date")
     .aggregateField("amount", Sum.ofIntegers(), "totalAmount")

It will group rows by customer id and date, and then calculate the total amount per day. If you want to calculate it per month, you need to create a new column (or modify the current one) that contains only the month part of the date, and group by id and this column.
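For example, a hypothetical ToMonthEnd DoFn could normalize the date field to the last day of its month before the Group transform, so that grouping by customerId and date effectively groups per month. A rough sketch (assuming the date field holds epoch milliseconds as in the schema above, and a Beam version that provides Row.fromRow/withFieldValue):

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.time.temporal.TemporalAdjusters;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.Row;

    // Hypothetical DoFn: rewrites the "date" field to the end of its month,
    // so every row of the same customer and month carries the same date value.
    class ToMonthEnd extends DoFn<Row, Row> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Row row = c.element();
            long millis = row.getInt64("date");
            LocalDate monthEnd = Instant.ofEpochMilli(millis)
                    .atZone(ZoneOffset.UTC)
                    .toLocalDate()
                    .with(TemporalAdjusters.lastDayOfMonth());
            c.output(Row.fromRow(row)
                    .withFieldValue("date", monthEnd.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli())
                    .build());
        }
    }

Applied before the grouping, e.g. rows.apply(ParDo.of(new ToMonthEnd())).setCoder(RowCoder.of(beamSchema)), the same Group.byFieldNames("customerId", "date") would then produce one aggregated row per customer and month.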

Also, it can be useful to use Select.flattenedSchema to flatten the output schema. The Beam Schema API allows working with schema-aware PCollections in a very simple and efficient way.
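For instance, applied to the grouped result (the variable name aggregate here is just illustrative):

    // Flatten the nested key/value schema produced by Group into a single flat Row
    // (Select is org.apache.beam.sdk.schemas.transforms.Select).
    PCollection<Row> flattened = aggregate.apply(Select.flattenedSchema());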

Another option would be to implement your GroupBy/Aggregate logic manually with KVs, but it is more complicated and error-prone since it requires much more boilerplate code.
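For comparison, a manual version on top of the KV<String, Iterable<Row>> output from step 2 might look roughly like the sketch below (SumPerMonth and its output format are made up for illustration; the field names come from the schema above):

    import java.time.Instant;
    import java.time.YearMonth;
    import java.time.ZoneOffset;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.Row;

    // Hypothetical DoFn that sums the amount per month for one customer.
    // Input: one customer's rows, as produced by GroupByKey in step 2.
    class SumPerMonth extends DoFn<KV<String, Iterable<Row>>, KV<String, Double>> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String customerId = c.element().getKey();
            Map<YearMonth, Double> totals = new TreeMap<>();
            for (Row row : c.element().getValue()) {
                YearMonth month = YearMonth.from(
                        Instant.ofEpochMilli(row.getInt64("date")).atZone(ZoneOffset.UTC));
                totals.merge(month, row.getDouble("amount"), Double::sum);
            }
            for (Map.Entry<YearMonth, Double> e : totals.entrySet()) {
                // Emit one record per customer and month, e.g. ("BS:89481 2012-11", 313.0).
                c.output(KV.of(customerId + " " + e.getKey(), e.getValue()));
            }
        }
    }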

Thanks to @Alexey Romanenko for the help.

The final solution is:

public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\AI\Workspace\office\lombok\artifact\src\main\resources\schema_transform2.avsc");

        File csvFile = new File(
                "C:\AI\Workspace\office\lombok\artifact\src\main\resources\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that match the conditions
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation:
        // group by customerId and date, summing the amount column
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("amount", Sum.ofDoubles(), "sumAmount");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        
                        
        
        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime(columnName)).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }



}
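RowToString is not shown in the post; a minimal stand-in, assuming it simply renders each Row as text, could be:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.Row;

    // Minimal placeholder for the RowToString DoFn referenced above:
    // emits the default string form of each Row.
    class RowToString extends DoFn<Row, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().toString());
        }
    }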