Euclidean distance or cosine similarity between columns with vectors in PySpark
I have a Spark dataframe of the following form:
> df1
+---------------+----------------+
| vector1| vector2|
+---------------+----------------+
|[[0.9,0.5,0.2]]| [[0.1,0.3,0.2]]|
|[[0.8,0.7,0.1]]| [[0.8,0.4,0.2]]|
|[[0.9,0.2,0.8]]| [[0.3,0.1,0.8]]|
+---------------+----------------+
> df1.printSchema()
root
 |-- vector1: array (nullable = true)
 |    |-- element: vector (containsNull = true)
 |-- vector2: array (nullable = true)
 |    |-- element: vector (containsNull = true)
I need to compute the Euclidean distance or cosine similarity between the vector1 and vector2 columns. How can I do this with PySpark?
● When the columns are of array type:
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2   # squared element-wise differences
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,   # sum of the squared differences
    lambda x: x**.5           # square root of the sum
)
Full test:
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
    [([0.9, 0.5, 0.2], [0.1, 0.3, 0.2]),
     ([0.8, 0.7, 0.1], [0.8, 0.4, 0.2]),
     ([0.9, 0.2, 0.8], [0.3, 0.1, 0.8])],
    ['vector1', 'vector2']
)
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)
df2 = df1.withColumn('euclidean_distance', distance)
df2.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1 |vector2 |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822 |
# +---------------+---------------+-------------------+
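The question also asks for cosine similarity. Under the same assumption of plain array columns (and Spark 3.1+ for zip_with/aggregate), a minimal sketch is the dot product divided by the product of the two norms:

dot = F.aggregate(
    F.zip_with('vector1', 'vector2', lambda a, b: a * b),   # element-wise products
    F.lit(0.0),
    lambda acc, x: acc + x
)
norm1 = F.aggregate(
    F.transform('vector1', lambda a: a * a),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda s: s**.5
)
norm2 = F.aggregate(
    F.transform('vector2', lambda b: b * b),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda s: s**.5
)
df2 = df1.withColumn('cosine_similarity', dot / (norm1 * norm2))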
● If the columns are of vector type, I would first convert them to arrays:
df2 = df1.select(
    vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
    vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
)
Full test:
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array
df1 = spark.createDataFrame(
    [([Vectors.dense(0.9, 0.5, 0.2)], [Vectors.dense(0.1, 0.3, 0.2)]),
     ([Vectors.dense(0.8, 0.7, 0.1)], [Vectors.dense(0.8, 0.4, 0.2)]),
     ([Vectors.dense(0.9, 0.2, 0.8)], [Vectors.dense(0.3, 0.1, 0.8)])],
    ['vector1', 'vector2']
)
df2 = df1.select(
    vector_to_array(F.element_at('vector1', 1)).alias('vector1'),
    vector_to_array(F.element_at('vector2', 1)).alias('vector2'),
)
distance = F.aggregate(
    F.transform(
        F.arrays_zip('vector1', 'vector2'),
        lambda x: (x['vector1'] - x['vector2'])**2
    ),
    F.lit(0.0),
    lambda acc, x: acc + x,
    lambda x: x**.5
)
df3 = df2.withColumn('euclidean_distance', distance)
df3.show(truncate=0)
# +---------------+---------------+-------------------+
# |vector1 |vector2 |euclidean_distance |
# +---------------+---------------+-------------------+
# |[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]|0.8246211251235323 |
# |[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
# |[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]|0.608276253029822 |
# +---------------+---------------+-------------------+
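As an aside: the element_at(…, 1) step is only there because each cell is an array wrapping a single vector. If the columns were plain vector (VectorUDT) columns instead (an assumed schema, not the one in the question), vector_to_array could be applied directly:

df2 = df1.select(
    vector_to_array(F.col('vector1')).alias('vector1'),
    vector_to_array(F.col('vector2')).alias('vector2'),
)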
Let's try a pandas UDF; since it is vectorized, it should be faster.
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import col, flatten, lit
from sklearn.metrics.pairwise import paired_distances

df = spark.createDataFrame([([[0.9, 0.5, 0.2]], [[0.1, 0.3, 0.2]]),
                            ([[0.8, 0.7, 0.1]], [[0.8, 0.4, 0.2]]),
                            ([[0.9, 0.2, 0.8]], [[0.3, 0.1, 0.8]])],
                           ('vector1', 'vector2'))

# flatten the nested arrays into plain arrays of doubles
df1 = df.select(*[flatten(col(x)).alias(x) for x in df.columns])

# output schema: the input columns plus a new double column 'v' for the distance
sch = df1.withColumn('v', lit(90.087654623)).schema

# compute row-wise Euclidean distances with scikit-learn inside a pandas UDF
def Eucl(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf.assign(v=paired_distances(pdf['vector1'].to_list(), pdf['vector2'].to_list()))

df1.mapInPandas(Eucl, schema=sch).show()
Result:
+---------------+---------------+-------------------+
| vector1| vector2| v|
+---------------+---------------+-------------------+
|[0.9, 0.5, 0.2]|[0.1, 0.3, 0.2]| 0.8246211251235323|
|[0.8, 0.7, 0.1]|[0.8, 0.4, 0.2]|0.31622776601683783|
|[0.9, 0.2, 0.8]|[0.3, 0.1, 0.8]| 0.608276253029822|
+---------------+---------------+-------------------+
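For cosine similarity with the same mapInPandas pattern, one option is scikit-learn's paired_cosine_distances, which returns a cosine distance; similarity is 1 minus that. A sketch reusing the df1 and sch defined above:

from sklearn.metrics.pairwise import paired_cosine_distances

def Cosine(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        # paired_cosine_distances returns a distance; similarity = 1 - distance
        yield pdf.assign(
            v=1 - paired_cosine_distances(pdf['vector1'].to_list(), pdf['vector2'].to_list())
        )

df1.mapInPandas(Cosine, schema=sch).show()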