如何使用 Dataflow 计算每个键的最高唯一值结果?

How do I use Dataflow to calculate the top unique value results per key?

我对 Dataflow 及其编程模型还比较陌生,并且正在努力解决需要计算客户支出最高的前 10 周的问题。如果这个问题看起来很愚蠢,我深表歉意。

我拥有的数据包括我用作键的客户 ID 和包含时间戳和支出值的几百万条记录。

我创建了一个看起来像这样的提取方法(不包括日志记录和日期格式化程序)。它收到一个 BigQuery table 行,我从中提取客户 ID、支出和时间戳,从中我得到周数:

static class ExtractSpend extends DoFn<TableRow, KV<String, SpendByWeek>> {
    private static final long serialVersionUID = 0;

    @Override
    public void processElement(ProcessContext c) {
        String custId = (String) row.get("customerID");
        LocalDateTime date = LocalDateTime.parse((String) row.get("timestamp"), dateTimeFormatter);

        WeekFields weekFields = WeekFields.of(Locale.getDefault());
        int weekNumber = date.get(weekFields.weekOfWeekBasedYear());

        Double spend = (Double) row.get("spend");

        SpendByWeek spendByWeek = new SpendByWeek(weekNumber, spend.doubleValue());
        c.output(KV.of(custId, spendByWeek));

   }
}

但我不知道如何获取此输出并将其分组,以便我可以添加每个客户 ID 和每周的支出值,对它们进行排序并输出每个客户的 PCollection<String, List<Double>>以及他们的前 10 个每周支出值。

有人能帮我解决这个问题吗?

如果您只想使用分组来完成此操作,则需要先按客户 ID 和周进行分组以计算 Sum,然后将周移至值中并仅按客户 ID 重新分组计算 Top。您也可以使用 windowing 而不是将星期放在密钥中来执行此操作。有关执行此操作的一些详细信息,请参阅结尾。

一旦你这样做了,你就有了一个 PCollection<KV<String, SpendByWeek>> 每周针对给定的密钥发生一次。您可以通过定义实现 SerializableComparator<SpendByWeek> 并将其与 Top.perKey().

一起使用来确定每个给定用户 ID 的顶部 SpendByWeek

使用 Windows

计算每位用户每周花费

如顶部所述,您可以使用 windowing 来帮助计算每周支出。

  1. 编写一个类似于您的 ExtractSpend 的 DoFn,它接受一个 TableRow 并输出一个 KV,这是由客户 ID 键控的单独支出行,并使用 outputWithTimestamp.
  2. 输出输出
  3. 然后应用 windowing 转换,例如 FixedWindows,它将把事件分成指定大小的 windows。在您的情况下,您可能需要 FixedWindows.of(Duration.standardWeeks(1))CalendarWindows.weeks(...).
  4. 然后应用 Sum.doublesPerKey() 等转换。

此时,您将得到一个 PCollection,其中包含每周 window KV<String, Double>,其中每个条目都是该键在一周内的总支出。

  1. 然后你可以 运行 一个 DoFn 获取每小时 windows 并将该信息移动到值中(所以现在你有 KV<String, SpendAndWeek>
  2. 应用Window.into切换到GlobalWindows
  3. 然后如上所述应用 Top.perKey 操作。