Apache Beam - 从起始到每条记录时间戳的累计聚合
Apache Beam - Aggregate data from beginning to logged timestamps
我正在尝试用 Apache Beam 实现流式处理,希望针对每条记录的时间戳,计算从起始到该时间戳为止所有 item_count 的 min()、max() 值。
例如:
Timestamp | item_count |
---|---|
2021-08-03 01:00:03.22333 UTC | 5 |
2021-08-03 01:00:03.256427 UTC | 4 |
2021-08-03 01:00:03.256497 UTC | 7 |
2021-08-03 01:00:03.256499 UTC | 2 |
输出:
Timestamp | Min | Max |
---|---|---|
2021-08-03 01:00:03.22333 UTC | 5 | 5 |
2021-08-03 01:00:03.256427 UTC | 4 | 5 |
2021-08-03 01:00:03.256497 UTC | 4 | 7 |
2021-08-03 01:00:03.256499 UTC | 2 | 7 |
我无法弄清楚如何用窗口化来实现我的用例,因为对我而言,窗口总是从第 1 行开始,到当前读到的最新一行结束。
我应该如何处理这个问题有什么建议吗?
谢谢
这种做法不会 100% 精确:流式处理中总会有一些延迟,元素也可能乱序到达,但应该足够用了。
/**
 * Pipeline options for the rolling min/max example.
 *
 * <p>Adds a single {@code topic} option on top of the standard {@code PipelineOptions}.
 */
public interface RollingMinMaxOptions extends PipelineOptions {
/**
 * Returns the topic to read from. When the option is not supplied, the value given in
 * {@code @Default.String} below is used.
 */
@Description("Topic to read from")
@Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
String getTopic();
/** Sets the topic to read from. */
void setTopic(String value);
}
/**
 * Combine function that tracks the minimum and maximum of the floats it is given.
 *
 * <p>Both the accumulator and the output are a {@code KV} whose key is the minimum seen so far
 * and whose value is the maximum seen so far. An accumulator that has seen no input is
 * {@code (+Infinity, -Infinity)}, which is the identity for min and max respectively, so the
 * result is correct for negative inputs as well as positive ones.
 */
public static class MinMax extends Combine.CombineFn<Float, KV<Float, Float>, KV<Float, Float>> { // Types: Input, Accum, Output

  /**
   * Creates the empty accumulator.
   *
   * @return {@code (+Infinity, -Infinity)}, so that the first input replaces both sides
   */
  @Override
  public KV<Float, Float> createAccumulator() {
    // The max side must start at -Infinity (not 0), otherwise an all-negative input
    // would wrongly report 0 as its maximum.
    return KV.of(Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY);
  }

  /**
   * Folds one input value into an accumulator.
   *
   * @param accumulator current (min, max) pair
   * @param input new value
   * @return a new (min, max) pair that also covers {@code input}
   */
  @Override
  public KV<Float, Float> addInput(KV<Float, Float> accumulator, Float input) {
    float min = Math.min(accumulator.getKey(), input);
    float max = Math.max(accumulator.getValue(), input);
    return KV.of(min, max);
  }

  /**
   * Merges several partial (min, max) pairs into one.
   *
   * @param accumulators partial results to merge; may be empty
   * @return the overall (min, max); {@code (+Infinity, -Infinity)} when there is nothing to merge
   */
  @Override
  public KV<Float, Float> mergeAccumulators(Iterable<KV<Float, Float>> accumulators) {
    // Start from the same identity values as createAccumulator().
    float min = Float.POSITIVE_INFINITY;
    float max = Float.NEGATIVE_INFINITY;
    for (KV<Float, Float> kv : accumulators) {
      min = Math.min(kv.getKey(), min);
      max = Math.max(kv.getValue(), max);
    }
    return KV.of(min, max);
  }

  /**
   * Produces the final result.
   *
   * @param accumulator the merged (min, max) pair
   * @return the accumulator itself, unchanged
   */
  @Override
  public KV<Float, Float> extractOutput(KV<Float, Float> accumulator) {
    return accumulator;
  }
}
/**
 * Builds and starts the pipeline.
 *
 * <p>Steps, in order: read strings from the configured topic; parse each one as JSON and keep
 * the {@code meter_reading} of messages whose {@code ride_status} is {@code "dropoff"} and whose
 * reading is positive; place the readings in the global window with a repeating
 * "at least one element" trigger and accumulating panes; combine them with {@code MinMax};
 * and turn each result into a {@code TableRow} that is logged and emitted.
 *
 * @param args command-line arguments, parsed into {@code RollingMinMaxOptions}
 */
public static void main(String[] args) {
  RollingMinMaxOptions pipelineOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(RollingMinMaxOptions.class);
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  pipeline
      .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(pipelineOptions.getTopic()))
      .apply(
          "Get meter reading",
          ParDo.of(
              new DoFn<String, Float>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws ParseException {
                  JSONObject message = new JSONObject(c.element());
                  String status = message.getString("ride_status");
                  Float reading = message.getFloat("meter_reading");
                  // Drop everything except drop-off events with a positive reading.
                  if (!status.equals("dropoff") || !(reading > 0)) {
                    return;
                  }
                  c.output(reading);
                }
              }))
      .apply(
          // Single global window; the trigger is satisfied as soon as a pane holds at least
          // one element, and panes accumulate so each result covers all earlier elements.
          Window.<Float>into(new GlobalWindows())
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .withTimestampCombiner(TimestampCombiner.LATEST)
              .accumulatingFiredPanes())
      .apply(Combine.globally(new MinMax()))
      .apply(
          "Format",
          ParDo.of(
              new DoFn<KV<Float, Float>, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws ParseException {
                  KV<Float, Float> minMax = c.element();
                  TableRow row =
                      new TableRow()
                          .set("min", minMax.getKey())
                          .set("max", minMax.getValue())
                          .set("timestamp", c.timestamp().toString());
                  LOG.info(row.toString());
                  c.output(row);
                }
              }));

  pipeline.run();
}
如果您希望最小值/最大值每隔一段时间 X 重置一次,请将 GlobalWindows 改为该时长的 FixedWindows。
我正在尝试用 Apache Beam 实现流式处理,希望针对每条记录的时间戳,计算从起始到该时间戳为止所有 item_count 的 min()、max() 值。
例如:
Timestamp | item_count |
---|---|
2021-08-03 01:00:03.22333 UTC | 5 |
2021-08-03 01:00:03.256427 UTC | 4 |
2021-08-03 01:00:03.256497 UTC | 7 |
2021-08-03 01:00:03.256499 UTC | 2 |
输出:
Timestamp | Min | Max |
---|---|---|
2021-08-03 01:00:03.22333 UTC | 5 | 5 |
2021-08-03 01:00:03.256427 UTC | 4 | 5 |
2021-08-03 01:00:03.256497 UTC | 4 | 7 |
2021-08-03 01:00:03.256499 UTC | 2 | 7 |
我无法弄清楚如何用窗口化来实现我的用例,因为对我而言,窗口总是从第 1 行开始,到当前读到的最新一行结束。 对于如何处理这个问题,有什么建议吗?
谢谢
这种做法不会 100% 精确:流式处理中总会有一些延迟,元素也可能乱序到达,但应该足够用了。
/**
 * Pipeline options for the rolling min/max example.
 *
 * <p>Adds a single {@code topic} option on top of the standard {@code PipelineOptions}.
 */
public interface RollingMinMaxOptions extends PipelineOptions {
/**
 * Returns the topic to read from. When the option is not supplied, the value given in
 * {@code @Default.String} below is used.
 */
@Description("Topic to read from")
@Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
String getTopic();
/** Sets the topic to read from. */
void setTopic(String value);
}
/**
 * Combine function that tracks the minimum and maximum of the floats it is given.
 *
 * <p>Both the accumulator and the output are a {@code KV} whose key is the minimum seen so far
 * and whose value is the maximum seen so far. An accumulator that has seen no input is
 * {@code (+Infinity, -Infinity)}, which is the identity for min and max respectively, so the
 * result is correct for negative inputs as well as positive ones.
 */
public static class MinMax extends Combine.CombineFn<Float, KV<Float, Float>, KV<Float, Float>> { // Types: Input, Accum, Output

  /**
   * Creates the empty accumulator.
   *
   * @return {@code (+Infinity, -Infinity)}, so that the first input replaces both sides
   */
  @Override
  public KV<Float, Float> createAccumulator() {
    // The max side must start at -Infinity (not 0), otherwise an all-negative input
    // would wrongly report 0 as its maximum.
    return KV.of(Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY);
  }

  /**
   * Folds one input value into an accumulator.
   *
   * @param accumulator current (min, max) pair
   * @param input new value
   * @return a new (min, max) pair that also covers {@code input}
   */
  @Override
  public KV<Float, Float> addInput(KV<Float, Float> accumulator, Float input) {
    float min = Math.min(accumulator.getKey(), input);
    float max = Math.max(accumulator.getValue(), input);
    return KV.of(min, max);
  }

  /**
   * Merges several partial (min, max) pairs into one.
   *
   * @param accumulators partial results to merge; may be empty
   * @return the overall (min, max); {@code (+Infinity, -Infinity)} when there is nothing to merge
   */
  @Override
  public KV<Float, Float> mergeAccumulators(Iterable<KV<Float, Float>> accumulators) {
    // Start from the same identity values as createAccumulator().
    float min = Float.POSITIVE_INFINITY;
    float max = Float.NEGATIVE_INFINITY;
    for (KV<Float, Float> kv : accumulators) {
      min = Math.min(kv.getKey(), min);
      max = Math.max(kv.getValue(), max);
    }
    return KV.of(min, max);
  }

  /**
   * Produces the final result.
   *
   * @param accumulator the merged (min, max) pair
   * @return the accumulator itself, unchanged
   */
  @Override
  public KV<Float, Float> extractOutput(KV<Float, Float> accumulator) {
    return accumulator;
  }
}
/**
 * Builds and starts the pipeline.
 *
 * <p>Steps, in order: read strings from the configured topic; parse each one as JSON and keep
 * the {@code meter_reading} of messages whose {@code ride_status} is {@code "dropoff"} and whose
 * reading is positive; place the readings in the global window with a repeating
 * "at least one element" trigger and accumulating panes; combine them with {@code MinMax};
 * and turn each result into a {@code TableRow} that is logged and emitted.
 *
 * @param args command-line arguments, parsed into {@code RollingMinMaxOptions}
 */
public static void main(String[] args) {
  RollingMinMaxOptions pipelineOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(RollingMinMaxOptions.class);
  Pipeline pipeline = Pipeline.create(pipelineOptions);

  pipeline
      .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(pipelineOptions.getTopic()))
      .apply(
          "Get meter reading",
          ParDo.of(
              new DoFn<String, Float>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws ParseException {
                  JSONObject message = new JSONObject(c.element());
                  String status = message.getString("ride_status");
                  Float reading = message.getFloat("meter_reading");
                  // Drop everything except drop-off events with a positive reading.
                  if (!status.equals("dropoff") || !(reading > 0)) {
                    return;
                  }
                  c.output(reading);
                }
              }))
      .apply(
          // Single global window; the trigger is satisfied as soon as a pane holds at least
          // one element, and panes accumulate so each result covers all earlier elements.
          Window.<Float>into(new GlobalWindows())
              .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
              .withTimestampCombiner(TimestampCombiner.LATEST)
              .accumulatingFiredPanes())
      .apply(Combine.globally(new MinMax()))
      .apply(
          "Format",
          ParDo.of(
              new DoFn<KV<Float, Float>, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws ParseException {
                  KV<Float, Float> minMax = c.element();
                  TableRow row =
                      new TableRow()
                          .set("min", minMax.getKey())
                          .set("max", minMax.getValue())
                          .set("timestamp", c.timestamp().toString());
                  LOG.info(row.toString());
                  c.output(row);
                }
              }));

  pipeline.run();
}
如果您希望最小值/最大值每隔一段时间 X 重置一次,请将 GlobalWindows 改为该时长的 FixedWindows。