条目和列之间的 Pyspark 欧氏距离

Pyspark euclidean distance between entry and column

我正在使用 pyspark,想知道是否有任何聪明的方法可以在数组的一行条目和整列之间获得欧氏距离。比如有这样一个数据集

+--------------------+---+
|            features| id|
+--------------------+---+
|[0,1,2,3,4,5     ...|  0|
|[0,1,2,3,4,5     ...|  1|
|[1,2,3,6,7,8     ...|  2|

选择其中一列,即id==1,计算欧氏距离。在这种情况下,结果应该是 [0,0,sqrt(1+1+1+9+9+9)]。 任何人都可以弄清楚如何有效地做到这一点吗? 谢谢!

您可以 BucketedRandomProjectionLSH [1] 获取数据框之间距离的笛卡尔坐标。

from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(
    inputCol="features", outputCol="hashes", seed=12345, bucketLength=1.0
)
model = brp.fit(df)
model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")

您还可以使用 approxNearestNeighbors [2] 获取一行到一列的距离,但结果受限于 numNearestNeighbors,因此您可以给它整个数据帧的计数。

one_row = df.where(df.id == 1).first().features
model.approxNearestNeighbors(df2, one_row, df.count()).collect()

此外,请务必将数据转换为向量!

from pyspark.sql import functions as F

to_dense_vector = F.udf(Vectors.dense, VectorUDF())
df = df.withColumn('features', to_dense_vector('features'))

[1] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSH

[2] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSHModel.approxNearestNeighbors

如果您只需要找到数据框中特定行与其他每一行之间的欧氏距离,则可以过滤并收集该行并将其传递给 udf

但是,如果你需要计算所有对之间的距离,你需要使用连接。
按 id 重新分区数据帧,这将加快连接操作。无需计算完整的成对矩阵,只需计算上半部分或下半部分并复制即可。我根据这个逻辑给自己写了一个函数。

 df = df.repartition("id")
 df.cache()
 df.show()


 #metric = any callable function to calculate distance b/w two vectors
 def pairwise_metric(Y, metric, col_name="metric"):

     Y2 = Y.select(f.col("id").alias("id2"), 
                 f.col("features").alias("features2"))

     # join to create lower or upper half
     Y = Y.join(Y2, Y.id < Y2.id2, "inner")

     def sort_list(x):

         x = sorted(x, key=lambda y:y[0])
         x = list(map(lambda y:y[1], x))

         return(x)

     udf_diff = f.udf(lambda x,y: metric(x,y), t.FloatType())
     udf_sort = f.udf(sort_list, t.ArrayType(t.FloatType()))

     Yid = Y2.select("id2").distinct().select("id2", 
          f.col("id2").alias("id")).withColumn("dist", f.lit(0.0))

     Y = Y.withColumn("dist", udf_diff("features", 
              "features2")).drop("features","features2")

     # just swap the column names and take union to get the other half
     Y =Y.union(Y.select(f.col("id2").alias("id"),
          f.col("id").alias("id2"), "dist"))
     # union for the diagonal elements of distance matrix
     Y = Y.union(Yid)

     st1 = f.struct(["id2", "dist"]).alias("vals")
     # groupby , aggregate and sort
     Y = (Y.select("id",st1).groupBy("id").agg(f.collect_list("vals").
                             alias("vals")).withColumn("dist",udf_sort("vals")).drop("vals"))

     return(Y.select(f.col("id").alias("id1"), f.col("dist").alias(col_name)))

如果您希望欧几里得具有列的固定条目,只需执行此操作即可。

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance

fixed_entry = [0,3,2,7...] #for example, the entry against which you want distances
distance_udf = F.udf(lambda x: float(distance.euclidean(x, fixed_entry)), FloatType())
df = df.withColumn('distances', distance_udf(F.col('features')))

您的 df 将有一列距离。

这是一个使用 SQL 函数 power() 计算两个数据帧中匹配行之间的欧氏距离的实现

cols2Join = ['Key1','Key2']
colsFeature =['Feature1','Feature2','Feature3','Feature4']
columns = cols2Join + colsFeature

valuesA = [('key1value1','key2value1',111,22,33,.334),('key1value3','key2value3', 333,444,12,.445),('key1value5','key2value5',555,666,101,.99),('key1value7','key2value7',777,888,10,.019)]
table1 = spark.createDataFrame(valuesA,columns)
valuesB = [('key1value1','key2value1',22,33,3,.1),('key1value3','key2value3', 88,99,4,1.23),('key1value5','key2value5',4,44,1,.998),('key1value7','key2value7',9,99,1,.3)]
table2= spark.createDataFrame(valuesB,columns)

#Create the sql expression using list comprehension, we use sql function power to compute euclidean distance inline
beginExpr='power(('
InnerExpr = ['power((a.{}-b.{}),2)'.format(x,x) for x in colsFeature]
InnerExpr = '+'.join(str(e) for e in InnerExpr)
endExpr ='),0.5) AS EuclideanDistance'
distanceExpr = beginExpr + InnerExpr + endExpr
Expr =  cols2Join+  [distanceExpr]

#now just join the tables and use Select Expr to get Euclidean distance
outDF = table1.alias('a').join(table2.alias('b'),cols2Join,how="inner").selectExpr(Expr)

display(outDF)