使用 DataFlow 的求和和平均聚合

Sum and Average Aggregation using DataFlow

我有以下类型的示例数据。

s.n., time, user, time_span, user_level
1, 2016-01-04T1:26:13, Hari, 8, admin
2, 2016-01-04T11:6:13, Gita, 2, admin
3, 2016-01-04T11:26:13, Gita, 0, user

现在我需要找到 average_time_span/useraverage_time_span/user_leveltotal_time_span/user

我能够找到上述每个值,但无法一次找到所有这些值。由于我是 DataFlow 的新手,请建议我使用合适的方法。

static class ExtractUserAndUserLevelFn extends DoFn<String, KV<String, Long>> {
        @Override
        public void processElement(ProcessContext c) {

            String[] words = c.element().split(",");

            if (words.length == 5) {
                Instant timestamp = Instant.parse(words[1].trim());                    
                KV<String, Long> userTime = KV.of(words[2].trim(), Long.valueOf(words[3].trim()));
                KV<String, Long> userLevelTime = KV.of(words[4].trim(), Long.valueOf(words[3].trim()));                    
                c.outputWithTimestamp(userTime, timestamp);
                c.outputWithTimestamp(userLevelTime, timestamp);

            }
        }
    }


public static void main(String[] args) {
    TestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(TestOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
            .apply(ParDo.of(new ExtractUserAndUserLevelFn()))
            .apply(Window.<KV<String, Long>>into(
                    FixedWindows.of(Duration.standardSeconds(options.getMyWindowSize()))))
            .apply(GroupByKey.<String, Long>create())
            .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
                public void processElement(ProcessContext c) {
                    String key = c.element().getKey();
                    Iterable<Long> docsWithThatUrl = c.element().getValue();
                    Long sum = 0L;
                    for (Long item : docsWithThatUrl)
                        sum += item;
                    KV<String, Long> userTime = KV.of(key, sum);
                    c.output(userTime);
                }
            }))
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()).
                    withNumShards(options.getShardsNumber()));

    p.run();
}

Mean and Sum 转换看起来很适合此用例。基本用法如下所示:

 PCollection<KV<String, Double>> meanPerKey =
     input.apply(Mean.<String, Integer>perKey());

 PCollection<KV<String, Integer>> sumPerKey = input
     .apply(Sum.<String>integersPerKey());

一种方法是首先将行解析为一个 PCollection,每行包含一条记录,然后从该集合创建两个 key-value 对的 PCollection。假设您定义了一个代表这样一行的记录:

static class Record implements Serializable {
  final String user;
  final String role;
  final long duration;
  // need a constructor here
}

现在,创建一个从输入行创建记录的 LineToRecordFn,这样您就可以:

PCollection<Record> records = p.apply(TextIO.Read.named("ReadLines")
                               .from(options.getInputFile()))
                               .apply(ParDo.of(new LineToRecordFn()));

如果需要,您可以在此处window。无论您 window 与否,您都可以创建自己的 keyed-by-role 和 keyed-by-user PCollections:

PCollection<KV<String,Long>> role_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() {
          @Override
          public KV<String,Long> apply(Record r) {
            return KV.of(r.role,r.duration);
          }
        }));

PCollection<KV<String,Long>> user_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() {
              @Override
              public KV<String,Long> apply(Record r) {
                return KV.of(r.user, r.duration);
              }
            }));

现在,您只需几行就可以得到均值和总和:

PCollection<KV<String,Double>> mean_by_user = user_duration.apply(
    Mean.<String,Long>perKey());
PCollection<KV<String,Double>> mean_by_role = role_duration.apply(
    Mean.<String,Long>perKey()); 
PCollection<KV<String,Long>> sum_by_role = role_duration.apply(
    Sum.<String>longsPerKey());

请注意,数据流在 运行 你的工作之前做了一些优化。因此,虽然看起来您正在对记录 PCollection 执行两次传递,但事实可能并非如此。