Apache Beam update current row values based on the values from previous row
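I have grouped the values from a CSV file. In the grouped rows, some amounts are missing and need to be filled in from the previous row. If the first row of a group has no amount, it should be set to 0.
I am able to group the records, but I cannot figure out the logic to update the values. How do I achieve this?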
Records

| customerId | date | amount |
|---|---|---|
| BS:89481 | 1/1/2012 | 100 |
| BS:89482 | 1/1/2012 | |
| BS:89483 | 1/1/2012 | 300 |
| BS:89481 | 1/2/2012 | 900 |
| BS:89482 | 1/2/2012 | 200 |
| BS:89483 | 1/2/2012 | |

Grouped records

| customerId | date | amount |
|---|---|---|
| BS:89481 | 1/1/2012 | 100 |
| BS:89481 | 1/2/2012 | 900 |
| BS:89482 | 1/1/2012 | |
| BS:89482 | 1/2/2012 | 200 |
| BS:89483 | 1/1/2012 | 300 |
| BS:89483 | 1/2/2012 | |

Missing values updated

| customerId | date | amount |
|---|---|---|
| BS:89481 | 1/1/2012 | 100 |
| BS:89481 | 1/2/2012 | 900 |
| BS:89482 | 1/1/2012 | 000 |
| BS:89482 | 1/2/2012 | 200 |
| BS:89483 | 1/1/2012 | 300 |
| BS:89483 | 1/2/2012 | 300 |

Code so far:

    public class GroupByTest {
        public static void main(String[] args) throws IOException {
            System.out.println("We are about to start!!");
            final File schemaFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
            File csvFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
            Schema schema = new Schema.Parser().parse(schemaFile);
            Pipeline pipeline = Pipeline.create();
            // Reading schema
            org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
            final PCollectionTuple tuples = pipeline
                    // Reading csv input
                    .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
                    // Reading files that matches conditions
                    .apply("2", FileIO.readMatches())
                    // Reading schema and validating with schema and converts to row and returns
                    // valid and invalid list
                    .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                            TupleTagList.of(invalidTag())));
            // Fetching only valid rows
            final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
            // Transformation
            // Convert row to KV
            final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                    .aggregateField("balance", Sum.ofDoubles(), "balances");
            final PCollection<Row> aggregagte = rows.apply(combine);
            PCollection<String> pOutput = aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
            pipeline.run().waitUntilFinish();
            System.out.println("The end");
        }

        private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
            String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
            LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
            if (logicalType != null) {
                type = logicalType.getName();
            }
            switch (type) {
                case "string":
                    return row.getString(columnName);
                case "int":
                    return Objects.requireNonNull(row.getInt32(columnName)).toString();
                case "bigint":
                    return Objects.requireNonNull(row.getInt64(columnName)).toString();
                case "double":
                    return Objects.requireNonNull(row.getDouble(columnName)).toString();
                case "timestamp-millis":
                    return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();
                default:
                    return row.getString(columnName);
            }
        }
    }

Modified code:
Original code:

    final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("amount", Sum.ofDoubles(), "balances");

Group by customer ID

    class ToKV extends DoFn<Row, KV<String, Row>> {
        private static final long serialVersionUID = -8093837716944809689L;
        String columnName1 = null;

        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            org.apache.beam.sdk.schemas.Schema schema = row.getSchema();
            context.output(KV.of(row.getValue(columnName1).toString(), row));
        }

        public void setColumnName1(String columnName1) {
            this.columnName1 = columnName1;
        }
    }

Grouping by customer ID:

    ToKV toKV = new ToKV();
    toKV.setColumnName1("ID");
    PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
    PCollection<KV<String, Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String, Row>create());
    // Trying to group by date
    PCollection<Row> outputRow =
            groupedKVRows
                    .apply(ParDo.of(new GroupByDate()))
                    .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

How do I write the logic to convert the Iterable to a PCollection so that the rows can be sorted by date?

    class GroupByDate extends DoFn<KV<String, Iterable<Row>>, Row> {
        private static final long serialVersionUID = -1345126662309830332L;

        @ProcessElement
        public void processElement(ProcessContext context) {
            String strKey = context.element().getKey();
            Iterable<Row> rows = context.element().getValue();
        }
    }

Avro schema:

    {
      "type" : "record",
      "name" : "Entry",
      "namespace" : "transform",
      "fields" : [ {
        "name" : "customerId",
        "type" : [ "string", "null" ]
      }, {
        "name" : "date",
        "type" : [ "string", "null" ],
        "logicalType": "date"
      }, {
        "name" : "amount",
        "type" : [ "double", "null" ]
      } ]
    }

Update: converting the PCollection to Row[]

    class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {
        private static final long serialVersionUID = -1345126662309830332L;

        @ProcessElement
        public void processElement(ProcessContext context) {
            String strKey = context.element().getKey();
            List<Row> rowList = new ArrayList();
            Iterable<Row> rowValue = context.element().getValue();
            rowValue.forEach(data -> {
                rowList.add(data);
            });
            Row[] rowArray = new Row[rowList.size()-1];
            rowArray = rowList.toArray(rowArray);
            context.output(rowArray);
        }
    }

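Suggested code

    Row[] rowArray = Iterables.toArray(rows, Row.class);

Error:
The method toArray(Iterable<? extends T>, Class<T>) in the type Iterables is not applicable for the arguments (PCollection<Row>, Class<Row>)
Converting the iterable to an array

    Row[] rowArray = groupedKVRows.apply(ParDo.of(new KVToRow()));

Error:
Multiple markers at this line
- Type mismatch: cannot convert from PCollection<Row[]> to Row[]
- 1 changed line, 2 deleted
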
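Beam does not provide any ordering guarantees, so you will have to group the rows as you did.
But as far as I can tell from your case, you only need to group by customerId. After that, you can apply a PTransform such as ParDo to sort each group's rows by date and fill in the missing values however you wish.
Example of sorting by converting to an array: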
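
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import com.google.common.collect.Iterables;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.Row;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    static class SortAndForwardFillFn
            extends DoFn<KV<String, Iterable<Row>>, KV<String, Iterable<Row>>> {

        @ProcessElement
        public void processElement(
                @Element KV<String, Iterable<Row>> element,
                OutputReceiver<KV<String, Iterable<Row>>> outputReceiver) {
            // Create a formatter for parsing dates. The pattern must match the actual
            // date strings; the sample data (e.g. 1/2/2012) has no time part, so adjust it.
            DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");

            // Convert the grouped values of this key (not the PCollection) to an array
            Row[] rowArray = Iterables.toArray(element.getValue(), Row.class);

            // Sort array using dates
            Arrays.sort(
                    rowArray,
                    Comparator.comparingLong(
                            (Row row) -> formatter.parseDateTime(row.getString("date")).getMillis()));

            // Store the last amount; 0.0 is used when the first row has no amount
            Double lastAmount = 0.0;

            // Create a List for storing sorted and filled rows
            List<Row> resultRows = new ArrayList<>(rowArray.length);

            // Iterate over the array and fill in the missing parts
            for (Row row : rowArray) {
                // Get current amount
                Double currentAmount = row.getDouble("amount");
                if (currentAmount == null) {
                    // Missing: copy the row with the previous amount filled in
                    // (Row.fromRow is available in recent Beam releases)
                    resultRows.add(Row.fromRow(row).withFieldValue("amount", lastAmount).build());
                } else {
                    // Present: remember it and add the row as it is
                    lastAmount = currentAmount;
                    resultRows.add(row);
                }
            }

            // Output using the output receiver
            outputReceiver.output(KV.of(element.getKey(), resultRows));
        }
    }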
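
The sort-then-fill step for a single customer does not depend on Beam itself, so it can be tried in isolation before wiring it into a DoFn. What follows is only a minimal plain-Java sketch, not part of the original answer: `Entry` is a hypothetical stand-in for a Beam `Row`, `sortAndForwardFill` is an illustrative name, and the `M/d/yyyy` pattern is an assumption based on the sample dates (it uses `java.time` rather than Joda-Time).

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ForwardFillSketch {

    /** Hypothetical stand-in for a Beam Row with a nullable amount. */
    static final class Entry {
        final String customerId;
        final String date;
        final Double amount; // null means the amount is missing

        Entry(String customerId, String date, Double amount) {
            this.customerId = customerId;
            this.date = date;
            this.amount = amount;
        }
    }

    // Assumption: dates look like 1/2/2012 (month/day/year, no zero padding).
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("M/d/yyyy");

    /** Sorts one customer's entries by date, then fills missing amounts from the previous entry. */
    static List<Entry> sortAndForwardFill(Iterable<Entry> entries) {
        List<Entry> sorted = new ArrayList<>();
        entries.forEach(sorted::add);
        // Compare parsed dates, not raw strings, so 1/10/2012 sorts after 1/2/2012.
        sorted.sort(Comparator.comparing((Entry e) -> LocalDate.parse(e.date, FORMAT)));

        List<Entry> result = new ArrayList<>(sorted.size());
        double last = 0.0; // used when the first entry has no amount
        for (Entry e : sorted) {
            if (e.amount == null) {
                result.add(new Entry(e.customerId, e.date, last));
            } else {
                last = e.amount;
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> group = Arrays.asList(
                new Entry("BS:89483", "1/2/2012", null),
                new Entry("BS:89483", "1/1/2012", 300.0));
        for (Entry e : sortAndForwardFill(group)) {
            System.out.println(e.customerId + " | " + e.date + " | " + e.amount);
        }
    }
}
```

Inside a Beam DoFn the same loop would run once per key over the grouped values. Because the whole group is copied into a list, this assumes each customer's rows fit in memory on one worker.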