在 map() 或任何其他解决方案中使用 sc.parallelize ?

Using sc.parallelize inside map() or any other solution?

我有以下问题:我需要根据 A 列中的每个 ID 在 B 列中找到值的所有组合,并且 return 结果作为 DataFrame

在下面的输入 DataFrame 示例中

        A     B       
0       5    10       
1       1    20      
2       1    15       
3       3    50       
4       5    14       
5       1    30       
6       1    15       
7       3    33       

我需要获得以下输出数据帧(用于 GraphX\GraphFrame)

        src dist      A
0       10   14       5
1       50   33       3
2       20   15       1
3       30   15       1
4       20   30       1

到目前为止我想到的一个解决方案是:

df_result = df.drop_duplicates().\
               map(lambda (A,B):(A,[B])).\
               reduceByKey(lambda p, q: p + q).\
               map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))

print df_result.take(3)

输出:[(1, [(20,15),(30,20),(30,15)]),(5,[(10,14)]),(3,[(50 ,33)])]

我被卡住了:(如何return它到我需要的数据框?一个想法是使用并行化:

import spark_sc

edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))

对于 spark_sc 我还有其他名称为 spark_sc.py

的文件
def init():
    global sc
    global sqlContext

    sc = SparkContext(conf=conf,
                  appName="blablabla",
                  pyFiles=['my_file_with_code.py'])

    sqlContext = SQLContext(sc)

但是我的代码失败了:

AttributeError: 'module' object has no attribute 'sc'

如果我使用 spark_sc.sc() 而不是 map() 就可以了。

知道我在最后一步错过了什么吗?是否可以使用 parallelize()?或者我需要完全不同的解决方案? 谢谢!

您肯定需要另一种解决方案,它可以很简单:

from pyspark.sql.functions import greatest, least, col

df.alias("x").join(df.alias("y"), ["A"]).select(
    least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()

其中:

df.alias("x").join(df.alias("y"), ["A"])

通过 A

将 table 与自身连接
least("x.B", "y.B").alias("src")

greatest("x.B", "y.B")

选择具有较低 id 的值作为源,较高的 id 作为目标。最后:

where(col("src") != col("dst"))

删除自循环。

一般来说,不可能从动作或转换中使用 SparkContext(并不是说在您的情况下这样做没有任何意义)。