Apache Beam Resampling of columns based on date
I am processing data with Apache Beam and trying to achieve the following:
- Read the data from a CSV file. (Done)
- Group the records by customer ID. (Done)
- Resample the data by month and calculate the sum for that particular month.

Detailed explanation:
I have a CSV file as shown below.
customerId | date | amount |
---|---|---|
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/14/2012 | 234 |
BS:89481 | 11/10/2012 | 189 |
BS:89480 | 11/02/2012 | 987 |
BS:89481 | 09/14/2012 | 784 |
BS:89480 | 11/14/2012 | 056 |
Intermediate step: group by customerId and sort by date
customerId | date | amount |
---|---|---|
BS:89481 | 09/14/2012 | 784 |
BS:89481 | 11/10/2012 | 189 |
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/02/2012 | 987 |
BS:89480 | 11/14/2012 | 234 |
BS:89480 | 11/14/2012 | 056 |
Expected output (resampled): here we calculate the sum of all the amounts of a single customer in that particular month. For example, customer BS:89481 spent twice in November, so we calculate the sum for that month (124 + 189).
customerId | date | amount |
---|---|---|
BS:89481 | 09/30/2012 | 784 |
BS:89481 | 11/30/2012 | 313 |
BS:89480 | 11/30/2012 | 1277 |
I was able to complete steps 1 and 2, but I am not sure how to implement step 3.
Schema schema = new Schema.Parser().parse(schemaFile);
Pipeline pipeline = Pipeline.create();

// Reading the Avro schema and converting it to a Beam schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

final PCollectionTuple tuples = pipeline
        // Matching the CSV input files
        .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
        // Reading the files that match
        .apply("2", FileIO.readMatches())
        // Validating each record against the schema, converting it to a Row,
        // and emitting valid and invalid records to separate outputs
        .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                TupleTagList.of(invalidTag())));

// Fetching only the valid rows
final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

// Step 2: convert each Row to a KV keyed by customerId for grouping
StringToKV stringtoKV = new StringToKV();
stringtoKV.setColumnName("customerId");
PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

// Obtain a PCollection of the rows grouped by customerId
PCollection<KV<String, Iterable<Row>>> kvIterableForm = kvOrderRows.apply(GroupByKey.<String, Row>create());
Update:
Avro schema used for the transform:
{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [ {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "long", "null" ]
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}
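For reference, a sketch of the Beam schema that AvroUtils.toBeamSchema should produce from this record (an assumption based on Beam's standard Avro-to-Beam type mapping, with each union-with-null becoming a nullable field):

// Sketch: nullable STRING, INT64 and DOUBLE fields, matching the Avro unions above
org.apache.beam.sdk.schemas.Schema expectedBeamSchema =
        org.apache.beam.sdk.schemas.Schema.builder()
                .addNullableField("customerId", org.apache.beam.sdk.schemas.Schema.FieldType.STRING)
                .addNullableField("date", org.apache.beam.sdk.schemas.Schema.FieldType.INT64)
                .addNullableField("amount", org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE)
                .build();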
CSV file:
customerId | date | amount |
---|---|---|
BS:89481 | 11/14/2012 | 124 |
BS:89480 | 11/14/2012 | 234 |
BS:89481 | 11/10/2012 | 189 |
BS:89480 | 11/02/2012 | 987 |
BS:89481 | 09/14/2012 | 784 |
BS:89480 | 11/14/2012 | 056 |
// DoFn that keys each Row by the value of the configured column
class StringToKV1 extends DoFn<Row, KV<String, Row>> {
    private static final long serialVersionUID = -8093837716944809689L;
    String columnName = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        context.output(KV.of(row.getValue(columnName), row));
    }

    public void setColumnName(String columnName) {
        this.columnName = columnName;
    }
}
Code:
public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");
        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading the Avro schema and converting it to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline
                // Matching the CSV input files
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
                // Reading the files that match
                .apply("2", FileIO.readMatches())
                // Validating each record against the schema, converting it to a Row,
                // and emitting valid and invalid records to separate outputs
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only the valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation: convert each Row to a KV keyed by customerId
        StringToKV1 stringtoKV1 = new StringToKV1();
        stringtoKV1.setColumnName("customerId");
        PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV1))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

        // Will throw an error: "amount" is a double field, so Sum.ofIntegers() does not apply
        // rows.apply(Group.byFieldNames("customerId", "date").aggregateField("amount", Sum.ofIntegers(), "totalAmount"));
        System.out.println("@@@ " + Group.byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofIntegers(), "totalAmount").getName());

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }
        switch (type) {
            case "string":
                return row.getString(columnName);
            case "int":
                return Objects.requireNonNull(row.getInt32(columnName)).toString();
            case "bigint":
                return Objects.requireNonNull(row.getInt64(columnName)).toString();
            case "double":
                return Objects.requireNonNull(row.getDouble(columnName)).toString();
            case "timestamp-millis":
                return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();
            default:
                return row.getString(columnName);
        }
    }
}
Corrected code:
final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "sumAmount");
final PCollection<Row> aggregate = rows.apply(combine);
PCollection<String> pOutput = aggregate.apply(ParDo.of(new RowToString()));
The output I am getting, however, is still grouped by the exact date (one sum per day) rather than by month.
Expected output:
customerId | date | amount |
---|---|---|
BS:89481 | 09/30/2012 | 784 |
BS:89481 | 11/30/2012 | 313 |
BS:89480 | 11/30/2012 | 1277 |
Since you already have a PCollection of Rows, you can use schema-aware PTransforms. For your case, you probably want a "Group aggregation", which could look like this:
Group.byFieldNames("customerId", "date")
.aggregateField("amount", Sum.ofIntegers(), "totalAmount")
It will group the rows by customer id and date, and then calculate the total amount per day. If you want to calculate it per month, you will need to create a new column (or modify the current one) holding a month-granularity date, and group by id and that column; a sketch of this step follows below.
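As an illustration, here is a minimal sketch of that month-truncation step. It assumes the date field holds epoch milliseconds (as the Avro long field above suggests) and uses Joda-Time, which Beam already ships with; the step name and the totalAmount output field are illustrative, not part of the original post.

PCollection<Row> byMonthEnd = rows
        .apply("TruncateDateToMonthEnd", MapElements.via(new SimpleFunction<Row, Row>() {
            @Override
            public Row apply(Row row) {
                // Map the timestamp to the last day of its month (e.g. 11/14/2012 -> 11/30/2012)
                long monthEnd = new org.joda.time.Instant(row.getInt64("date"))
                        .toDateTime()
                        .dayOfMonth().withMaximumValue()
                        .withTimeAtStartOfDay()
                        .getMillis();
                // Rebuild the row with the truncated date; field order follows the schema above
                return Row.withSchema(row.getSchema())
                        .addValues(row.getString("customerId"), monthEnd, row.getDouble("amount"))
                        .build();
            }
        }))
        .setRowSchema(rows.getSchema());

// The per-day grouping from above now becomes a per-month grouping
PCollection<Row> monthlyTotals = byMonthEnd.apply(
        Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofDoubles(), "totalAmount"));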
Also, it can be useful to apply Select.flattenedSchema to flatten the output schema. The Beam Schema API allows you to work with schema-aware PCollections in a very simple and efficient way.
Another option would be to implement your GroupBy/aggregate logic manually with KVs, but it is more complicated and error-prone since it requires much more boilerplate code; a rough sketch is shown after this paragraph.
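For comparison, a rough sketch of that manual KV-based route, under the same epoch-millis assumption; the composite string key is just one illustrative way to key by (customerId, month):

PCollection<KV<String, Double>> monthlySums = rows
        .apply("KeyByCustomerAndMonth", MapElements.via(new SimpleFunction<Row, KV<String, Double>>() {
            @Override
            public KV<String, Double> apply(Row row) {
                // Key looks like "BS:89481|2012-11"
                String month = new org.joda.time.Instant(row.getInt64("date"))
                        .toDateTime().toString("yyyy-MM");
                return KV.of(row.getString("customerId") + "|" + month, row.getDouble("amount"));
            }
        }))
        // Sum the amounts per (customerId, month) key
        .apply(Sum.doublesPerKey());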
Thanks @Alexey Romanenko for the help.
The final solution is:
public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");
        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading the Avro schema and converting it to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline
                // Matching the CSV input files
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
                // Reading the files that match
                .apply("2", FileIO.readMatches())
                // Validating each record against the schema, converting it to a Row,
                // and emitting valid and invalid records to separate outputs
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only the valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation: group by customerId and date and sum the amounts.
        // (Note: the field is named "balance"/"balances" here; with the schema
        // shown earlier in this post it would be "amount".)
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("balance", Sum.ofDoubles(), "balances");
        final PCollection<Row> aggregate = rows.apply(combine);
        PCollection<String> pOutput = aggregate.apply(Select.flattenedSchema())
                .apply(ParDo.of(new RowToString()));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }
        switch (type) {
            case "string":
                return row.getString(columnName);
            case "int":
                return Objects.requireNonNull(row.getInt32(columnName)).toString();
            case "bigint":
                return Objects.requireNonNull(row.getInt64(columnName)).toString();
            case "double":
                return Objects.requireNonNull(row.getDouble(columnName)).toString();
            case "timestamp-millis":
                return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();
            default:
                return row.getString(columnName);
        }
    }
}
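RowToString is referenced above but never shown in the post; a minimal sketch of such a DoFn, assuming Row's default toString() rendering is acceptable:

// Sketch: render each Row as its default string form
class RowToString extends DoFn<Row, String> {
    @ProcessElement
    public void processElement(@Element Row row, OutputReceiver<String> out) {
        out.output(row.toString());
    }
}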