在 map() 或任何其他解决方案中使用 sc.parallelize ?
Using sc.parallelize inside map() or any other solution?
我有以下问题:我需要根据 A 列中的每个 ID 在 B 列中找到值的所有组合,并且 return 结果作为 DataFrame
在下面的输入 DataFrame 示例中
A B
0 5 10
1 1 20
2 1 15
3 3 50
4 5 14
5 1 30
6 1 15
7 3 33
我需要获得以下输出数据帧(用于 GraphX\GraphFrame)
src dist A
0 10 14 5
1 50 33 3
2 20 15 1
3 30 15 1
4 20 30 1
到目前为止我想到的一个解决方案是:
df_result = df.drop_duplicates().\
map(lambda (A,B):(A,[B])).\
reduceByKey(lambda p, q: p + q).\
map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)
输出:[(1, [(20,15),(30,20),(30,15)]),(5,[(10,14)]),(3,[(50 ,33)])]
我被卡住了:(如何return它到我需要的数据框?一个想法是使用并行化:
import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))
对于 spark_sc
我还有其他名称为 spark_sc.py
的文件
def init():
global sc
global sqlContext
sc = SparkContext(conf=conf,
appName="blablabla",
pyFiles=['my_file_with_code.py'])
sqlContext = SQLContext(sc)
但是我的代码失败了:
AttributeError: 'module' object has no attribute 'sc'
如果我使用 spark_sc.sc()
而不是 map()
就可以了。
知道我在最后一步错过了什么吗?是否可以使用 parallelize()
?或者我需要完全不同的解决方案?
谢谢!
您肯定需要另一种解决方案,它可以很简单:
from pyspark.sql.functions import greatest, least, col
df.alias("x").join(df.alias("y"), ["A"]).select(
least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()
其中:
df.alias("x").join(df.alias("y"), ["A"])
通过 A
、
将 table 与自身连接
least("x.B", "y.B").alias("src")
和
greatest("x.B", "y.B")
选择具有较低 id
的值作为源,较高的 id 作为目标。最后:
where(col("src") != col("dst"))
删除自循环。
一般来说,不可能从动作或转换中使用 SparkContext
(并不是说在您的情况下这样做没有任何意义)。
我有以下问题:我需要根据 A 列中的每个 ID 在 B 列中找到值的所有组合,并且 return 结果作为 DataFrame
在下面的输入 DataFrame 示例中
A B
0 5 10
1 1 20
2 1 15
3 3 50
4 5 14
5 1 30
6 1 15
7 3 33
我需要获得以下输出数据帧(用于 GraphX\GraphFrame)
src dist A
0 10 14 5
1 50 33 3
2 20 15 1
3 30 15 1
4 20 30 1
到目前为止我想到的一个解决方案是:
df_result = df.drop_duplicates().\
map(lambda (A,B):(A,[B])).\
reduceByKey(lambda p, q: p + q).\
map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)
输出:[(1, [(20,15),(30,20),(30,15)]),(5,[(10,14)]),(3,[(50 ,33)])]
我被卡住了:(如何return它到我需要的数据框?一个想法是使用并行化:
import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))
对于 spark_sc
我还有其他名称为 spark_sc.py
def init():
global sc
global sqlContext
sc = SparkContext(conf=conf,
appName="blablabla",
pyFiles=['my_file_with_code.py'])
sqlContext = SQLContext(sc)
但是我的代码失败了:
AttributeError: 'module' object has no attribute 'sc'
如果我使用 spark_sc.sc()
而不是 map()
就可以了。
知道我在最后一步错过了什么吗?是否可以使用 parallelize()
?或者我需要完全不同的解决方案?
谢谢!
您肯定需要另一种解决方案,它可以很简单:
from pyspark.sql.functions import greatest, least, col
df.alias("x").join(df.alias("y"), ["A"]).select(
least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()
其中:
df.alias("x").join(df.alias("y"), ["A"])
通过 A
、
least("x.B", "y.B").alias("src")
和
greatest("x.B", "y.B")
选择具有较低 id
的值作为源,较高的 id 作为目标。最后:
where(col("src") != col("dst"))
删除自循环。
一般来说,不可能从动作或转换中使用 SparkContext
(并不是说在您的情况下这样做没有任何意义)。