PySpark 中带有向量的列之间的欧氏距离或余弦相似度

Euclidean distance or cosine similarity between columns with vectors in PySpark

我有以下形式的 Spark 数据框:

> df1
+---------------+----------------+
|        vector1|         vector2|  
+---------------+----------------+
|[[0.9,0.5,0.2]]| [[0.1,0.3,0.2]]|
|[[0.8,0.7,0.1]]| [[0.8,0.4,0.2]]|
|[[0.9,0.2,0.8]]| [[0.3,0.1,0.8]]|
+---------------+----------------+

> df1.printSchema()
root
 |-- vector1: array (nullable = true)
 |    |-- element: vector (containsNull = true)
 |-- vector2: array (nullable = true)
 |    |-- element: vector (containsNull = true)

我需要计算 vector1 和 vector2 两列之间的欧氏距离或余弦相似度。
我如何使用 PySpark 执行此操作?

当列为数组类型时:

# Column expression for the Euclidean distance between the arrays in
# 'vector1' and 'vector2': zip the arrays position by position, square each
# element-wise difference, add the squares up starting from 0.0, and finish
# by taking the square root of the total.
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        # Each zipped element is a struct with one field per source column.
        lambda pair: (pair['vector1'] - pair['vector2']) ** 2,
    ),
    F.lit(0.0),
    lambda total, squared: total + squared,
    lambda total: total ** 0.5,
)

完整测试:

from pyspark.sql import functions as F

# Sample data: every cell is a plain Python list of floats.
df1 = spark.createDataFrame(
    data=[
        ([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
        ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
        ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8]),
    ],
    schema=['vector1', 'vector2'],
)

# Euclidean distance as a column expression: zip the two arrays position by
# position, square each difference, sum the squares starting from 0.0, and
# take the square root of the sum.
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        # Each zipped element is a struct with one field per source column.
        lambda pair: (pair['vector1'] - pair['vector2']) ** 2,
    ),
    F.lit(0.0),
    lambda total, squared: total + squared,
    lambda total: total ** 0.5,
)

# Attach the distance as a new column next to the two input columns.
df2 = df1.withColumn('euclidean_distance', distance)

# truncate=0 prints the full cell contents instead of shortening them.
df2.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1        |vector2        |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
# +---------------+---------------+-------------------+

如果列是 vector 类型,我会先将其转换为数组:

# For both columns: take the first element of the outer array (element_at
# is 1-based), convert that ML vector to a plain array, and keep the
# original column name.
df2 = df1.select(
    *[
        vector_to_array(F.element_at(name, 1)).alias(name)
        for name in ('vector1', 'vector2')
    ]
)

完整测试:

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array

# Sample data: every cell is a one-element list wrapping a dense ML vector.
df1 = spark.createDataFrame(
    data=[
        ([Vectors.dense(0.9, 0.5, 0.2)], [Vectors.dense(0.1, 0.3, 0.2)]),
        ([Vectors.dense(0.8, 0.7, 0.1)], [Vectors.dense(0.8, 0.4, 0.2)]),
        ([Vectors.dense(0.9, 0.2, 0.8)], [Vectors.dense(0.3, 0.1, 0.8)]),
    ],
    schema=['vector1', 'vector2'],
)

# For both columns: take the first element of the outer array (element_at
# is 1-based), convert that ML vector to a plain array, and keep the
# original column name.
df2 = df1.select(
    *[
        vector_to_array(F.element_at(name, 1)).alias(name)
        for name in ('vector1', 'vector2')
    ]
)

# Euclidean distance as a column expression: zip the two arrays position by
# position, square each difference, sum the squares starting from 0.0, and
# take the square root of the sum.
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        # Each zipped element is a struct with one field per source column.
        lambda pair: (pair['vector1'] - pair['vector2']) ** 2,
    ),
    F.lit(0.0),
    lambda total, squared: total + squared,
    lambda total: total ** 0.5,
)

# Attach the distance as a new column next to the converted arrays.
df3 = df2.withColumn('euclidean_distance', distance)

# truncate=0 prints the full cell contents instead of shortening them.
df3.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1        |vector2        |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822  |
# +---------------+---------------+-------------------+

让我们尝试 pandas UDF。它是向量化执行的,因此速度更快。

df

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import col, flatten, lit
from sklearn.metrics.pairwise import paired_distances

# Sample data: every cell is a list containing one inner list of floats.
df = spark.createDataFrame(
    [([[0.9, 0.5, 0.2]], [[0.1, 0.3, 0.2]]),
     ([[0.8, 0.7, 0.1]], [[0.8, 0.4, 0.2]]),
     ([[0.9, 0.2, 0.8]], [[0.3, 0.1, 0.8]])],
    ('vector1', 'vector2'),
)

# Flatten the nested arrays so each column holds one flat array per row.
df1 = df.select(*[flatten(col(x)).alias(x) for x in df.columns])

# Output schema for mapInPandas: the input columns plus the new distance
# column 'v'. The literal's value is never used; it only gives the extra
# column its type in the derived schema.
sch = df1.withColumn('v', lit(90.087654623)).schema


def Eucl(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Yield each pandas batch with an added row-wise distance column 'v'.

    For every batch, 'v' holds the paired distance between the row's
    'vector1' and 'vector2' values, computed with scikit-learn's
    ``paired_distances`` using its default metric (Euclidean).
    """
    for pdf in iterator:
        yield pdf.assign(
            v=paired_distances(
                pdf['vector1'].to_list(),
                pdf['vector2'].to_list(),
            )
        )


# Run the function over the DataFrame batch by batch and print the result.
df1.mapInPandas(Eucl, schema=sch).show()

结果

+---------------+---------------+-------------------+
|        vector1|        vector2|                  v|
+---------------+---------------+-------------------+
|[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]| 0.8246211251235323|
|[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
|[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|  0.608276253029822|
+---------------+---------------+-------------------+