Apache Beam - Aggregate data from the beginning up to each logged timestamp

I'm trying to use Apache Beam for stream processing, and I want to compute the min() and max() of the item values up to each registered timestamp.

For example, given this input:

    Timestamp                       item_count
    2021-08-03 01:00:03.22333 UTC   5
    2021-08-03 01:00:03.256427 UTC  4
    2021-08-03 01:00:03.256497 UTC  7
    2021-08-03 01:00:03.256499 UTC  2

the expected output is:

    Timestamp                       Min  Max
    2021-08-03 01:00:03.22333 UTC   5    5
    2021-08-03 01:00:03.256427 UTC  4    5
    2021-08-03 01:00:03.256497 UTC  4    7
    2021-08-03 01:00:03.256499 UTC  2    7

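I can't figure out how to fit my use case into windowing, because for me the frame starts at row 1 and ends at every new row I read. Any suggestions on how to approach this?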
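In other words, each output row is the min/max of every row from the first one up to the current one. Outside Beam, that cumulative frame is just a running fold. A minimal plain-Java sketch over the sample data (hard-coded here and assumed to be already sorted by timestamp; the class and method names are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative, Beam-free sketch of the cumulative min/max described above.
class RunningMinMaxDemo {

    // Returns one {min, max} pair per input row: the min/max of all rows seen so far.
    static List<int[]> runningMinMax(int[] counts) {
        List<int[]> out = new ArrayList<>();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
            out.add(new int[] {min, max});
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample rows from the question, assumed to be in timestamp order.
        String[] timestamps = {
            "2021-08-03 01:00:03.22333 UTC",
            "2021-08-03 01:00:03.256427 UTC",
            "2021-08-03 01:00:03.256497 UTC",
            "2021-08-03 01:00:03.256499 UTC"
        };
        int[] counts = {5, 4, 7, 2};
        List<int[]> result = runningMinMax(counts);
        for (int i = 0; i < counts.length; i++) {
            System.out.println(timestamps[i] + "  " + result.get(i)[0] + "  " + result.get(i)[1]);
        }
    }
}
```

Each step only needs the previous (min, max) pair, which is why this fits a single global window whose accumulating panes re-emit the combined result, rather than bounded windows.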

Thanks

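This won't be 100% exact, because there is always some latency and elements can arrive out of order, but it should be good enough. The idea is to put everything into the global window, fire a trigger after every element, and accumulate fired panes, so that each firing of the global Combine emits the min/max of everything seen so far. The example below reads the public taxi-rides Pub/Sub topic and tracks the running min/max of meter_reading for drop-offs; swap in your own source and field.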

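    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.json.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RollingMinMax {
        private static final Logger LOG = LoggerFactory.getLogger(RollingMinMax.class);

        public interface RollingMinMaxOptions extends PipelineOptions {
            @Description("Topic to read from")
            @Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
            String getTopic();

            void setTopic(String value);
        }

        // Types: Input = Float, Accumulator = KV<min, max>, Output = KV<min, max>
        public static class MinMax extends Combine.CombineFn<Float, KV<Float, Float>, KV<Float, Float>> {
            @Override
            public KV<Float, Float> createAccumulator() {
                // Neutral start: +inf for the min and -inf for the max, so the first
                // real input replaces both bounds (0f for the max would be wrong if
                // every input were negative).
                return KV.of(Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY);
            }

            @Override
            public KV<Float, Float> addInput(KV<Float, Float> accumulator, Float input) {
                float min = Math.min(accumulator.getKey(), input);
                float max = Math.max(accumulator.getValue(), input);
                return KV.of(min, max);
            }

            @Override
            public KV<Float, Float> mergeAccumulators(Iterable<KV<Float, Float>> accumulators) {
                // Partial results computed on different bundles/workers are merged here.
                float min = Float.POSITIVE_INFINITY;
                float max = Float.NEGATIVE_INFINITY;
                for (KV<Float, Float> kv : accumulators) {
                    min = Math.min(min, kv.getKey());
                    max = Math.max(max, kv.getValue());
                }
                return KV.of(min, max);
            }

            @Override
            public KV<Float, Float> extractOutput(KV<Float, Float> accumulator) {
                return accumulator;
            }
        }

        public static void main(String[] args) {
            RollingMinMaxOptions options =
                    PipelineOptionsFactory.fromArgs(args).withValidation().as(RollingMinMaxOptions.class);

            Pipeline p = Pipeline.create(options);

            p
                    .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
                    // Keep the meter reading of every completed (drop-off) ride.
                    .apply("Get meter reading", ParDo.of(new DoFn<String, Float>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            JSONObject json = new JSONObject(c.element());

                            String rideStatus = json.getString("ride_status");
                            float meterReading = json.getFloat("meter_reading");

                            if (rideStatus.equals("dropoff") && meterReading > 0) {
                                c.output(meterReading);
                            }
                        }
                    }))
                    // One global window that fires after every element and keeps
                    // accumulating, so each pane covers everything seen so far.
                    .apply(Window.<Float>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                            .withTimestampCombiner(TimestampCombiner.LATEST)
                            .accumulatingFiredPanes())
                    .apply(Combine.globally(new MinMax()))
                    .apply("Format", ParDo.of(new DoFn<KV<Float, Float>, TableRow>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            TableRow row = new TableRow();

                            row.set("min", c.element().getKey());
                            row.set("max", c.element().getValue());
                            // Event-time timestamp of this output pane (see TimestampCombiner.LATEST above).
                            row.set("timestamp", c.timestamp().toString());

                            LOG.info(row.toString());
                            c.output(row);
                        }
                    }));

            p.run();
        }
    }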
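A runner may combine different bundles separately and then call mergeAccumulators, so the accumulator's starting value has to be neutral for both the min and the max. The following plain-Java mirror of MinMax (no Beam dependency; float[] stands in for KV<Float, Float>, and the class name is only illustrative) checks that a split-then-merge run matches a single sequential pass:

```java
import java.util.Arrays;

// Illustrative, Beam-free mirror of the MinMax CombineFn: acc[0] = min, acc[1] = max.
class MinMaxContractDemo {

    static float[] createAccumulator() {
        // Neutral element: any real input replaces both bounds.
        return new float[] {Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY};
    }

    static float[] addInput(float[] acc, float input) {
        return new float[] {Math.min(acc[0], input), Math.max(acc[1], input)};
    }

    static float[] mergeAccumulators(Iterable<float[]> accs) {
        float[] merged = createAccumulator();
        for (float[] acc : accs) {
            merged = new float[] {Math.min(merged[0], acc[0]), Math.max(merged[1], acc[1])};
        }
        return merged;
    }

    // Folds a whole "bundle" of inputs into one accumulator.
    static float[] combine(float[] inputs) {
        float[] acc = createAccumulator();
        for (float x : inputs) {
            acc = addInput(acc, x);
        }
        return acc;
    }

    public static void main(String[] args) {
        float[] all = {-3f, 12.5f, -7.25f, 4f};
        // One sequential pass over every element.
        float[] sequential = combine(all);
        // Two bundles combined independently, then merged.
        float[] merged = mergeAccumulators(Arrays.asList(
                combine(Arrays.copyOfRange(all, 0, 2)),
                combine(Arrays.copyOfRange(all, 2, 4))));
        System.out.println(Arrays.toString(sequential) + " " + Arrays.toString(merged));
    }
}
```

With a starting max of 0f instead, combine(new float[] {-3f, -1f}) would report a max of 0; in the pipeline above that only matters if non-positive readings get through the filter.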

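If you want the min/max to reset every X amount of time, use FixedWindows of that size instead of GlobalWindows. Because the window is then no longer global, you will also need to add .withAllowedLateness(...) to the Window transform and use Combine.globally(new MinMax()).withoutDefaults().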
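To make the reset behaviour concrete, here is a plain-Java sketch (no Beam; the class and method names are illustrative) of what per-element firings with accumulating panes would report under fixed windows aligned to the epoch, which is how FixedWindows.of(size) lays out windows when no offset is given. It assumes elements arrive in timestamp order and that each firing covers exactly one element; a real runner may batch several elements into one pane.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative, Beam-free sketch of a running min/max that resets per fixed window.
class FixedWindowMinMaxDemo {

    // Start of the epoch-aligned fixed window that contains timestampMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    }

    // For elements sorted by timestamp, returns {windowStart, min, max} per element,
    // where min/max only cover elements of the same window seen so far.
    static List<long[]> runningMinMaxPerWindow(long[] timestamps, long[] values, long sizeMillis) {
        List<long[]> out = new ArrayList<>();
        boolean first = true;
        long currentWindow = 0;
        long min = 0;
        long max = 0;
        for (int i = 0; i < timestamps.length; i++) {
            long w = windowStart(timestamps[i], sizeMillis);
            if (first || w != currentWindow) {
                // New window: the aggregate starts over.
                first = false;
                currentWindow = w;
                min = values[i];
                max = values[i];
            } else {
                min = Math.min(min, values[i]);
                max = Math.max(max, values[i]);
            }
            out.add(new long[] {w, min, max});
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 400L, 900L, 1_100L, 1_500L};
        long[] values = {5, 4, 7, 2, 9};
        for (long[] row : runningMinMaxPerWindow(timestamps, values, 1_000L)) {
            System.out.println("window " + row[0] + ": min=" + row[1] + " max=" + row[2]);
        }
    }
}
```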