pySpark forEach 键上的函数
pySpark forEach function on a key
我似乎找不到很多关于 forEach 的文档。我有一个 key/value 配对的数据集。我想做类似的事情(伪代码):
forEach键,求和值
forEach 键,值的最大值
等等
This can be done e.g. with reduceByKey
rdd = sc.parallelize([("foo", 1), ("foo", 2), ("bar", 3)])
rdd.reduceByKey(lambda x, y : x + y).collect() # Sum for each key
# Gives [('foo', 3), ('bar', 3)]
x.reduceByKey(max).collect() # Max for each key
# Gives [('foo', 2), ('bar', 3)]
foreach(func) Run a function func on each element of the dataset. This
is usually done for side effects such as updating an accumulator
variable (see below) or interacting with external storage systems.
请注意突出显示的 "side effect"。 foreach 是对 RDD 的一个操作,它对 RDD 中的每个元素执行函数,但对驱动程序没有 return 任何东西。您可以将函数传递给它,例如 println 或增加累加器变量或保存到外部存储。
在您的用例中,您应该使用 reduceByKey。
我似乎找不到很多关于 forEach 的文档。我有一个 key/value 配对的数据集。我想做类似的事情(伪代码):
forEach键,求和值 forEach 键,值的最大值 等等
This can be done e.g. with reduceByKey
rdd = sc.parallelize([("foo", 1), ("foo", 2), ("bar", 3)])
rdd.reduceByKey(lambda x, y : x + y).collect() # Sum for each key
# Gives [('foo', 3), ('bar', 3)]
x.reduceByKey(max).collect() # Max for each key
# Gives [('foo', 2), ('bar', 3)]
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
请注意突出显示的 "side effect"。 foreach 是对 RDD 的一个操作,它对 RDD 中的每个元素执行函数,但对驱动程序没有 return 任何东西。您可以将函数传递给它,例如 println 或增加累加器变量或保存到外部存储。
在您的用例中,您应该使用 reduceByKey。