应用 GroupBy,然后在 Google 数据流中应用 Count

Applying GroupBy and then apply Count in Google Dataflow

我的 Google 云存储中有以下内容

Advertiser | Event
__________________
100 | Click

101 | Impression

100 | Impression

100 | Impression

101 | Impression

我的管道输出应该是这样的

Advertiser | Clicks | Impressions

100 | 1 | 2

101 | 0 | 2

首先我使用了groupByKey,输出是这样的

100 Click, Impression, Impression

101 Impression, Impression

现在可以用KV来计算值了吗?

目前我只是使用比较字符串来计算点击次数和展示次数。

这里可以使用计数转换吗?

或者我们在这里使用任何其他转换吗?

还是我的方法是唯一的方法?

谢谢, 山姆

我假设您的输入可以作为 PCollection<KV<Long, EventType>> input 使用,其中 Long 是广告商 ID,EventTypeenum { CLICK, IMPRESSION, possibly something else }

我还假设您希望输出是一个 PCollection>,其中 AdvertiserStats 是一个 class,字段为 "numClicks"、"numImpressions".

在这种情况下,实现您想要的效果的一种方法是使用 Combine - input.apply(Combine.<Long, AdvertiserStats>perKey(new ComputeAdvertiserStatsFn())),其中 ComputeAdvertiserStatsFn 的定义如下:

public class ComputeAdvertiserStatsFn
    extends CombineFn<EventType, AdvertiserStats, AdvertiserStats> {
  public AdvertiserStats createAccumulator() { return new AdvertiserStats(); }
  public void addInput(AdvertiserStats stats, EventType input) {
    switch (input) {
    case CLICK: stats.numClicks++; break;
    case IMPRESSION: stats.numImpressions++; break;
    default: (depending on your application?)
    }
  }
  public AdvertiserStats mergeAccumulators(Iterable<AdvertiserStats> stats) {
    AdvertiserStats merged = createAccumulator();
    for (AdvertiserStats item : stats) {
      merged.numClicks += item.numClicks;
      merged.numImpressions += item.numImpressions;
    }
    return merged;
  }
  public AdvertiserStats extractOutput(AdvertiserStats stats) { return stats; }
}

这应该执行得很好,因为大部分分组和计数都将在本地进行。

目前,据我所知,没有 PTransform 可以为您完成 ComputeAdvertiserStatsFn 的工作。我认为理想的界面应该类似于 input.apply(Combine.perKey(Count.perElement())),但它不适用于当前定义的方式。