应用 GroupBy,然后在 Google 数据流中应用 Count
Applying GroupBy and then apply Count in Google Dataflow
我的 Google 云存储中有以下内容
Advertiser | Event
__________________
100 | Click
101 | Impression
100 | Impression
100 | Impression
101 | Impression
我的管道输出应该是这样的
Advertiser | Clicks | Impressions
100 | 1 | 2
101 | 0 | 2
首先我使用了groupByKey,输出是这样的
100 Click, Impression, Impression
101 Impression, Impression
现在可以用KV来计算值了吗?
目前我只是使用比较字符串来计算点击次数和展示次数。
这里可以使用计数转换吗?
或者我们在这里使用任何其他转换吗?
还是我的方法是唯一的方法?
谢谢,
山姆
我假设您的输入可以作为 PCollection<KV<Long, EventType>> input
使用,其中 Long
是广告商 ID,EventType
是 enum { CLICK, IMPRESSION, possibly something else }
。
我还假设您希望输出是一个 PCollection>,其中 AdvertiserStats 是一个 class,字段为 "numClicks"、"numImpressions".
在这种情况下,实现您想要的效果的一种方法是使用 Combine - input.apply(Combine.<Long, AdvertiserStats>perKey(new ComputeAdvertiserStatsFn()))
,其中 ComputeAdvertiserStatsFn
的定义如下:
public class ComputeAdvertiserStatsFn
extends CombineFn<EventType, AdvertiserStats, AdvertiserStats> {
public AdvertiserStats createAccumulator() { return new AdvertiserStats(); }
public void addInput(AdvertiserStats stats, EventType input) {
switch (input) {
case CLICK: stats.numClicks++; break;
case IMPRESSION: stats.numImpressions++; break;
default: (depending on your application?)
}
}
public AdvertiserStats mergeAccumulators(Iterable<AdvertiserStats> stats) {
AdvertiserStats merged = createAccumulator();
for (AdvertiserStats item : stats) {
merged.numClicks += item.numClicks;
merged.numImpressions += item.numImpressions;
}
return merged;
}
public AdvertiserStats extractOutput(AdvertiserStats stats) { return stats; }
}
这应该执行得很好,因为大部分分组和计数都将在本地进行。
目前,据我所知,没有 PTransform
可以为您完成 ComputeAdvertiserStatsFn
的工作。我认为理想的界面应该类似于 input.apply(Combine.perKey(Count.perElement()))
,但它不适用于当前定义的方式。
我的 Google 云存储中有以下内容
Advertiser | Event
__________________
100 | Click
101 | Impression
100 | Impression
100 | Impression
101 | Impression
我的管道输出应该是这样的
Advertiser | Clicks | Impressions
100 | 1 | 2
101 | 0 | 2
首先我使用了groupByKey,输出是这样的
100 Click, Impression, Impression
101 Impression, Impression
现在可以用KV来计算值了吗?
目前我只是使用比较字符串来计算点击次数和展示次数。
这里可以使用计数转换吗?
或者我们在这里使用任何其他转换吗?
还是我的方法是唯一的方法?
谢谢, 山姆
我假设您的输入可以作为 PCollection<KV<Long, EventType>> input
使用,其中 Long
是广告商 ID,EventType
是 enum { CLICK, IMPRESSION, possibly something else }
。
我还假设您希望输出是一个 PCollection>,其中 AdvertiserStats 是一个 class,字段为 "numClicks"、"numImpressions".
在这种情况下,实现您想要的效果的一种方法是使用 Combine - input.apply(Combine.<Long, AdvertiserStats>perKey(new ComputeAdvertiserStatsFn()))
,其中 ComputeAdvertiserStatsFn
的定义如下:
public class ComputeAdvertiserStatsFn
extends CombineFn<EventType, AdvertiserStats, AdvertiserStats> {
public AdvertiserStats createAccumulator() { return new AdvertiserStats(); }
public void addInput(AdvertiserStats stats, EventType input) {
switch (input) {
case CLICK: stats.numClicks++; break;
case IMPRESSION: stats.numImpressions++; break;
default: (depending on your application?)
}
}
public AdvertiserStats mergeAccumulators(Iterable<AdvertiserStats> stats) {
AdvertiserStats merged = createAccumulator();
for (AdvertiserStats item : stats) {
merged.numClicks += item.numClicks;
merged.numImpressions += item.numImpressions;
}
return merged;
}
public AdvertiserStats extractOutput(AdvertiserStats stats) { return stats; }
}
这应该执行得很好,因为大部分分组和计数都将在本地进行。
目前,据我所知,没有 PTransform
可以为您完成 ComputeAdvertiserStatsFn
的工作。我认为理想的界面应该类似于 input.apply(Combine.perKey(Count.perElement()))
,但它不适用于当前定义的方式。