ReduceByKey + Map + Seq 解释

ReduceByKey + Map + Seq explanation

我正在尝试弄清楚 reduceByKey 是如何运作的,但这种情况让我感到困惑,我根本无法理解它。

密码是:

 stream.foreachRDD((rdd: RDD[Record]) => {
      // convert string to PoJo and generate rows as tuple group
    val pairs = rdd
            .map(row => (row.timestamp(), jsonDecode(row.value())))
            .map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))
    val flatten = pairs
                .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
                .map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))

想象一下数据收入: ["oceania", 500], ["australia", 450] 等

flatten 变量中,我试图按市场类型或 JSON 中的第一种类型汇总数据。这是生成元组: * 第一个是计数器值,这个值为 1, * 第二个是从 Kafka 收到的速率, * 第三个是活动时间。例如 2017-05-12 16:00:00 * * 在地图中, * method f._1 是市场名称, * 我们将总费率除以总项目数 f._2._2 / f._2._1 * 如您所见,f._2._3 是平均事件时间

谁能帮我解释一下 f._2._3 是什么意思(我的意思是我知道它的临时变量,但里面有什么或可能有什么)以及如何通过除以 f._2._2 / f._2._1 来计算总速率,什么到底是除法?谢谢:)

对于每一行,您在 RDD pairs 中定义以下元素:

(marketType, (counter, rate, eventTime))

请注意,这是一个 Tuple2,其第二个元素是 Tuple3Tuple 是特例 class,其第 n 个元素(从 1 开始)被命名为 _n。例如,要访问元素 frate,您必须执行 f._2._2([=18= 的第二个元素],这是 Tuple2).

由于您的元素具有特殊含义,您可能需要考虑定义一个案例 class MyRow(counter: Int, rate: Int, time: Timestamp),以便在您编写类似 f._2._3(顺便说一句,eventTime的类型我不清楚,因为你只将它表示为String,但你对它进行了数值运算)。

现在看看您的代码真正尝试做什么:

reducing 函数需要两个 Tuple3(或 MyRow,如果你更改代码)并输出另一个(这里,你的 reducing 函数对计数器、比率求和,并使eventTime 上两个值之间的平均值)。

reduceByKey 只要找到两个具有相同键的元素就应用这个归约函数:因为归约函数的输出与其输入的类型相同,所以它可以应用于它,因为只要您的 RDD 上有其他具有相同键的值。

举个简单的例子,如果你有

(key1, (1, 200, 2017/04/04 12:00:00))
(key1, (1, 300, 2017/04/04 12:00:00))
(key1, (1, 500, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))

然后reduceByKey会输出

(key1, (3, 1000, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))

然后你的最后一个 map 将通过计算总费率来解决这个问题:

(key1, (333, 2017/04/04 12:00:00))
(key2, (500, 2017/04/04 12:00:00))

您可能已经注意到我在所有示例中始终使用相同的时间。那是因为你在这个字段上的归约函数会产生意想不到的结果,因为它不是 associative。尝试做与上述相同的练习,但使用不同的时间戳,您将看到 key1 的减少值将根据您应用减少的顺序而有所不同。

让我们看看这个:我们想用这个函数减少 4、8 和 16,所以我们可能想这样做

((4 + 8) / 2 + 16) / 2

(4 + (8 + 16) / 2) / 2

取决于我们是想从左边开始还是从右边开始(在实际情况中,有更多不同的可能性,它们会在 Spark 中发生,因为你并不总是知道你的值是如何分布的在群集上)。

计算上面的两种可能性,我们得到不同的值:118,所以你看这在现实生活中可能会导致更大的问题。

针对您的情况,一个简单的解决方案是也对所有时间戳求和(假设它们是 Long 值,甚至是 BigInteger,以避免溢出),并且仅在以具有实时平均值的值的数量结束。