如何通过键区分火花rdd?
how to distinct the spark rdd by the key?
现在,我有一个RDD,RDD中的记录如下:
key1 value1
key1 value2
key2 value3
key3 value4
key3 value5
我想获取不同key的RDD记录,如下:
key1 value1
key2 value3
key3 value4
我可以只使用 spark-core API,而不聚合相同键的值。
有数据框和collect_set
:
sqlContext.createDataFrame(rdd).toDF("k", "v")
.groupBy("k")
.agg(collect_set(col("v")))
你可以用 PairRDDFunctions.reduceByKey
做到这一点。假设你有一个 RDD[(K, V)]
:
rdd.reduceByKey((a, b) => if (someCondition) a else b)
另一种选择。它在 PySpark
中,但我几乎可以肯定 Scala 中应该有类似的方法。
再次假设您有一个包含 (key, value) 元素的 RDD,那么
简短的回答是,
rdd.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
完整的工作代码示例是,
from pyspark import SparkContext
SparkContext._active_spark_context.stop()
spConf = SparkConf()
spConf.setAppName('unique_keys')
sc = SparkContext(conf=spConf)
sample_data = sc.parallelize([('k1','v1'),('k1','v2'),('k2','v3'),('k3','v4'),('k3','v5')],3)
print('original rdd {}'.format(sorted(sample_data.collect(),key = lambda t: t[0])))
print('original rdd has {} unique elements'.format(sample_data.distinct().count()))
print('original rdd has {} unique keys'.format(sample_data.map(lambda t: t[0]).distinct().count()))
sample_data = sample_data.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
print('rdd with unique keys {}'.format(sorted(sample_data.collect()),key = lambda t: t[0]))
输出,
original rdd [('k1', 'v1'), ('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4'), ('k3', 'v5')]
original rdd has 5 unique elements
original rdd has 3 unique keys
rdd with unique keys [('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4')]
现在,我有一个RDD,RDD中的记录如下:
key1 value1
key1 value2
key2 value3
key3 value4
key3 value5
我想获取不同key的RDD记录,如下:
key1 value1
key2 value3
key3 value4
我可以只使用 spark-core API,而不聚合相同键的值。
有数据框和collect_set
:
sqlContext.createDataFrame(rdd).toDF("k", "v")
.groupBy("k")
.agg(collect_set(col("v")))
你可以用 PairRDDFunctions.reduceByKey
做到这一点。假设你有一个 RDD[(K, V)]
:
rdd.reduceByKey((a, b) => if (someCondition) a else b)
另一种选择。它在 PySpark
中,但我几乎可以肯定 Scala 中应该有类似的方法。
再次假设您有一个包含 (key, value) 元素的 RDD,那么
简短的回答是,
rdd.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
完整的工作代码示例是,
from pyspark import SparkContext
SparkContext._active_spark_context.stop()
spConf = SparkConf()
spConf.setAppName('unique_keys')
sc = SparkContext(conf=spConf)
sample_data = sc.parallelize([('k1','v1'),('k1','v2'),('k2','v3'),('k3','v4'),('k3','v5')],3)
print('original rdd {}'.format(sorted(sample_data.collect(),key = lambda t: t[0])))
print('original rdd has {} unique elements'.format(sample_data.distinct().count()))
print('original rdd has {} unique keys'.format(sample_data.map(lambda t: t[0]).distinct().count()))
sample_data = sample_data.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
print('rdd with unique keys {}'.format(sorted(sample_data.collect()),key = lambda t: t[0]))
输出,
original rdd [('k1', 'v1'), ('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4'), ('k3', 'v5')]
original rdd has 5 unique elements
original rdd has 3 unique keys
rdd with unique keys [('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4')]