如何通过键区分火花rdd?

how to distinct the spark rdd by the key?

现在,我有一个RDD,RDD中的记录如下:

key1  value1
key1  value2
key2  value3
key3  value4
key3  value5

我想获取不同key的RDD记录,如下:

key1  value1
key2  value3
key3  value4

我可以只使用 spark-core API,而不聚合相同键的值。

有数据框和collect_set:

sqlContext.createDataFrame(rdd).toDF("k", "v")
  .groupBy("k")
  .agg(collect_set(col("v")))

你可以用 PairRDDFunctions.reduceByKey 做到这一点。假设你有一个 RDD[(K, V)]:

rdd.reduceByKey((a, b) => if (someCondition) a else b)

另一种选择。它在 PySpark 中,但我几乎可以肯定 Scala 中应该有类似的方法。

再次假设您有一个包含 (key, value) 元素的 RDD,那么

简短的回答是,

    rdd.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))

完整的工作代码示例是,

    from pyspark import SparkContext
    SparkContext._active_spark_context.stop()
    spConf = SparkConf()
    spConf.setAppName('unique_keys')
    sc = SparkContext(conf=spConf)

    sample_data = sc.parallelize([('k1','v1'),('k1','v2'),('k2','v3'),('k3','v4'),('k3','v5')],3)
    print('original rdd {}'.format(sorted(sample_data.collect(),key = lambda t: t[0])))
    print('original rdd has {} unique elements'.format(sample_data.distinct().count()))
    print('original rdd has {} unique keys'.format(sample_data.map(lambda t: t[0]).distinct().count()))

    sample_data = sample_data.groupByKey().mapValues(list).map(lambda t: (t[0],t[1][0]))
    print('rdd with unique keys {}'.format(sorted(sample_data.collect()),key = lambda t: t[0]))

输出,

original rdd [('k1', 'v1'), ('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4'), ('k3', 'v5')]
original rdd has 5 unique elements
original rdd has 3 unique keys
rdd with unique keys [('k1', 'v2'), ('k2', 'v3'), ('k3', 'v4')]