如何使用 Dataflow 计算每个键的最高唯一值结果?
How do I use Dataflow to calculate the top unique value results per key?
我对 Dataflow 及其编程模型还比较陌生,并且正在努力解决需要计算客户支出最高的前 10 周的问题。如果这个问题看起来很愚蠢,我深表歉意。
我拥有的数据包括我用作键的客户 ID 和包含时间戳和支出值的几百万条记录。
我创建了一个看起来像这样的提取方法(不包括日志记录和日期格式化程序)。它收到一个 BigQuery table 行,我从中提取客户 ID、支出和时间戳,从中我得到周数:
static class ExtractSpend extends DoFn<TableRow, KV<String, SpendByWeek>> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
String custId = (String) row.get("customerID");
LocalDateTime date = LocalDateTime.parse((String) row.get("timestamp"), dateTimeFormatter);
WeekFields weekFields = WeekFields.of(Locale.getDefault());
int weekNumber = date.get(weekFields.weekOfWeekBasedYear());
Double spend = (Double) row.get("spend");
SpendByWeek spendByWeek = new SpendByWeek(weekNumber, spend.doubleValue());
c.output(KV.of(custId, spendByWeek));
}
}
但我不知道如何获取此输出并将其分组,以便我可以添加每个客户 ID 和每周的支出值,对它们进行排序并输出每个客户的 PCollection<String, List<Double>>
以及他们的前 10 个每周支出值。
有人能帮我解决这个问题吗?
如果您只想使用分组来完成此操作,则需要先按客户 ID 和周进行分组以计算 Sum
,然后将周移至值中并仅按客户 ID 重新分组计算 Top
。您也可以使用 windowing 而不是将星期放在密钥中来执行此操作。有关执行此操作的一些详细信息,请参阅结尾。
一旦你这样做了,你就有了一个 PCollection<KV<String, SpendByWeek>>
每周针对给定的密钥发生一次。您可以通过定义实现 Serializable
的 Comparator<SpendByWeek>
并将其与 Top.perKey()
.
一起使用来确定每个给定用户 ID 的顶部 SpendByWeek
使用 Windows
计算每位用户每周花费
如顶部所述,您可以使用 windowing 来帮助计算每周支出。
- 编写一个类似于您的 ExtractSpend 的 DoFn,它接受一个 TableRow 并输出一个 KV,这是由客户 ID 键控的单独支出行,并使用
outputWithTimestamp
. 输出输出
- 然后应用 windowing 转换,例如
FixedWindows
,它将把事件分成指定大小的 windows。在您的情况下,您可能需要 FixedWindows.of(Duration.standardWeeks(1))
或 CalendarWindows.weeks(...)
.
- 然后应用
Sum.doublesPerKey()
等转换。
此时,您将得到一个 PCollection
,其中包含每周 window KV<String, Double>
,其中每个条目都是该键在一周内的总支出。
- 然后你可以 运行 一个
DoFn
获取每小时 windows 并将该信息移动到值中(所以现在你有 KV<String, SpendAndWeek>
)
- 应用Window.into切换到
GlobalWindows
- 然后如上所述应用
Top.perKey
操作。
我对 Dataflow 及其编程模型还比较陌生,并且正在努力解决需要计算客户支出最高的前 10 周的问题。如果这个问题看起来很愚蠢,我深表歉意。
我拥有的数据包括我用作键的客户 ID 和包含时间戳和支出值的几百万条记录。
我创建了一个看起来像这样的提取方法(不包括日志记录和日期格式化程序)。它收到一个 BigQuery table 行,我从中提取客户 ID、支出和时间戳,从中我得到周数:
static class ExtractSpend extends DoFn<TableRow, KV<String, SpendByWeek>> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
String custId = (String) row.get("customerID");
LocalDateTime date = LocalDateTime.parse((String) row.get("timestamp"), dateTimeFormatter);
WeekFields weekFields = WeekFields.of(Locale.getDefault());
int weekNumber = date.get(weekFields.weekOfWeekBasedYear());
Double spend = (Double) row.get("spend");
SpendByWeek spendByWeek = new SpendByWeek(weekNumber, spend.doubleValue());
c.output(KV.of(custId, spendByWeek));
}
}
但我不知道如何获取此输出并将其分组,以便我可以添加每个客户 ID 和每周的支出值,对它们进行排序并输出每个客户的 PCollection<String, List<Double>>
以及他们的前 10 个每周支出值。
有人能帮我解决这个问题吗?
如果您只想使用分组来完成此操作,则需要先按客户 ID 和周进行分组以计算 Sum
,然后将周移至值中并仅按客户 ID 重新分组计算 Top
。您也可以使用 windowing 而不是将星期放在密钥中来执行此操作。有关执行此操作的一些详细信息,请参阅结尾。
一旦你这样做了,你就有了一个 PCollection<KV<String, SpendByWeek>>
每周针对给定的密钥发生一次。您可以通过定义实现 Serializable
的 Comparator<SpendByWeek>
并将其与 Top.perKey()
.
SpendByWeek
使用 Windows
计算每位用户每周花费如顶部所述,您可以使用 windowing 来帮助计算每周支出。
- 编写一个类似于您的 ExtractSpend 的 DoFn,它接受一个 TableRow 并输出一个 KV,这是由客户 ID 键控的单独支出行,并使用
outputWithTimestamp
. 输出输出
- 然后应用 windowing 转换,例如
FixedWindows
,它将把事件分成指定大小的 windows。在您的情况下,您可能需要FixedWindows.of(Duration.standardWeeks(1))
或CalendarWindows.weeks(...)
. - 然后应用
Sum.doublesPerKey()
等转换。
此时,您将得到一个 PCollection
,其中包含每周 window KV<String, Double>
,其中每个条目都是该键在一周内的总支出。
- 然后你可以 运行 一个
DoFn
获取每小时 windows 并将该信息移动到值中(所以现在你有KV<String, SpendAndWeek>
) - 应用Window.into切换到
GlobalWindows
- 然后如上所述应用
Top.perKey
操作。