Jaccard 在没有笛卡尔的 Spark 和 Scala 的帮助下 RDD 的相似性?
Jaccard Similarity of an RDD with the help of Spark and Scala without Cartesian?
我正在研究成对的 RDD。我的目标是计算 jaccard 相似度
根据我的 RDD 的 jaccard 相似度阈值 value.Structure 在 rdd 值集之间对它们进行聚类是:
val a= [Key,Set(String)] //Pair RDD
For example:-
India,[Country,Place,....]
USA,[Country,State,..]
Berlin,[City,Popluatedplace,..]
找到jaccard相似度后,我会将相似的实体聚为一类。在上面的示例中,印度和美国将根据某个阈值聚类到一个集群中,而柏林将在另一个集群中。
所以我取了rdd的笛卡尔积a
val filterOnjoin = a.cartesian(a).filter(f =>
(!f._1._1.toString().contentEquals(f._2._1.toString())))
//Cartesianproduct of rdd a and filtering rows with same key at both
//the position.
//e.g. ((India,Set[Country,Place,....]),(USA,Set[Country,State,..]))
并借助 jaccard 相似度比较这组值。
val Jsim = filterOnjoin.map(f => (f._1._1, (f._2._1,
Similarity.sim(f._1._2, f._2._2)))) //calculating jaccard similarity.
//(India,USA,0.8)
代码 运行 适用于较小的数据集。随着数据集大小的增加,笛卡尔积花费了太多时间。对于 100 MB 的数据(rdd 的大小 "a"),它进行的数据洗牌读取了大约 25 GB。对于 3.5 GB 数据,以 TB 为单位。
我浏览了各种链接。像 spark 调整方法和一些堆栈溢出。但是大多数 post 都写成广播较小的 RDD。但是这里两个 rdd 的大小是一样的,而且很大。
我关注的链接:-
Spark: produce RDD[(X, X)] of all possible combinations from RDD[X] of-all-possible-combinations-from-rddx
我是 Spark 和 Scala 的新手。我无法想到这里是瓶颈的笛卡尔积。没有笛卡尔积是否可以解决这个问题。
由于 Cartesian product 是对 rdd 的昂贵操作,我试图通过使用 Spark MLib 中存在的 HashingTF 和 MinHashLSH 库来寻找 jaccard 相似度来解决上述问题。在问题中提到的rdd "a"中查找Jaccard相似度的步骤:
将rdd转换为dataframe
import sparkSession.implicits._
val dfA = a.toDF("id", "values")
借助 HashingTF 创建特征向量
val hashingTF = new HashingTF()
.setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
特征变换
val featurizedData = hashingTF.transform(dfA) //Feature Transformation
正在创建 minHash table。 table的数值越多,越准确
结果会是这样,但是沟通成本和 运行 时间都很高。
val mh = new MinHashLSH()
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
近似相似性连接采用两个数据集和数据集中大约 returns 对行,其距离小于用户定义的阈值。近似相似连接既支持连接两个不同的数据集,也支持自连接。自连接会产生一些重复对。
val model = mh.fit(featurizedData)
//Approximately joining featurizedData with Jaccard distance smaller
//than 0.45
val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData,
0.45)
因为在spark中,我们必须在我们的代码中进行手动优化,例如设置分区数,设置持久级别等。我也配置了这些参数。
- 将存储级别从 persist() 更改为 persist(StorageLevel.MEMORY_AND_DISK),
它帮助我消除了 OOM 错误。
- 同样在做join操作的时候,根据rdd重新分区了数据
尺寸。在 16.6 GB 的数据集上,在做简单的连接操作时,我使用了 200
划分。增加到600,也解决了我的OOM问题
PS:常量参数setNumFeatures(1048576)和setNumHashTables(3)是在16.6数据集上实验时配置的。您可以根据您的数据集增加或减少这些值。分区的数量也取决于您的数据集大小。通过这些优化,我得到了我想要的结果。
有用的链接:-
[https://spark.apache.org/docs/2.2.0/ml-features.html#locality-sensitive-hashing]
[https://eng.uber.com/lsh/]
[https://data-flair.training/blogs/limitations-of-apache-spark/]
我正在研究成对的 RDD。我的目标是计算 jaccard 相似度 根据我的 RDD 的 jaccard 相似度阈值 value.Structure 在 rdd 值集之间对它们进行聚类是:
val a= [Key,Set(String)] //Pair RDD
For example:-
India,[Country,Place,....]
USA,[Country,State,..]
Berlin,[City,Popluatedplace,..]
找到jaccard相似度后,我会将相似的实体聚为一类。在上面的示例中,印度和美国将根据某个阈值聚类到一个集群中,而柏林将在另一个集群中。
所以我取了rdd的笛卡尔积a
val filterOnjoin = a.cartesian(a).filter(f =>
(!f._1._1.toString().contentEquals(f._2._1.toString())))
//Cartesianproduct of rdd a and filtering rows with same key at both
//the position.
//e.g. ((India,Set[Country,Place,....]),(USA,Set[Country,State,..]))
并借助 jaccard 相似度比较这组值。
val Jsim = filterOnjoin.map(f => (f._1._1, (f._2._1,
Similarity.sim(f._1._2, f._2._2)))) //calculating jaccard similarity.
//(India,USA,0.8)
代码 运行 适用于较小的数据集。随着数据集大小的增加,笛卡尔积花费了太多时间。对于 100 MB 的数据(rdd 的大小 "a"),它进行的数据洗牌读取了大约 25 GB。对于 3.5 GB 数据,以 TB 为单位。
我浏览了各种链接。像 spark 调整方法和一些堆栈溢出。但是大多数 post 都写成广播较小的 RDD。但是这里两个 rdd 的大小是一样的,而且很大。
我关注的链接:-
Spark: produce RDD[(X, X)] of all possible combinations from RDD[X] of-all-possible-combinations-from-rddx
我是 Spark 和 Scala 的新手。我无法想到这里是瓶颈的笛卡尔积。没有笛卡尔积是否可以解决这个问题。
由于 Cartesian product 是对 rdd 的昂贵操作,我试图通过使用 Spark MLib 中存在的 HashingTF 和 MinHashLSH 库来寻找 jaccard 相似度来解决上述问题。在问题中提到的rdd "a"中查找Jaccard相似度的步骤:
将rdd转换为dataframe
import sparkSession.implicits._ val dfA = a.toDF("id", "values")
借助 HashingTF 创建特征向量
val hashingTF = new HashingTF() .setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
特征变换
val featurizedData = hashingTF.transform(dfA) //Feature Transformation
正在创建 minHash table。 table的数值越多,越准确 结果会是这样,但是沟通成本和 运行 时间都很高。
val mh = new MinHashLSH() .setNumHashTables(3) .setInputCol("features") .setOutputCol("hashes")
近似相似性连接采用两个数据集和数据集中大约 returns 对行,其距离小于用户定义的阈值。近似相似连接既支持连接两个不同的数据集,也支持自连接。自连接会产生一些重复对。
val model = mh.fit(featurizedData) //Approximately joining featurizedData with Jaccard distance smaller //than 0.45 val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData, 0.45)
因为在spark中,我们必须在我们的代码中进行手动优化,例如设置分区数,设置持久级别等。我也配置了这些参数。
- 将存储级别从 persist() 更改为 persist(StorageLevel.MEMORY_AND_DISK), 它帮助我消除了 OOM 错误。
- 同样在做join操作的时候,根据rdd重新分区了数据 尺寸。在 16.6 GB 的数据集上,在做简单的连接操作时,我使用了 200 划分。增加到600,也解决了我的OOM问题
PS:常量参数setNumFeatures(1048576)和setNumHashTables(3)是在16.6数据集上实验时配置的。您可以根据您的数据集增加或减少这些值。分区的数量也取决于您的数据集大小。通过这些优化,我得到了我想要的结果。
有用的链接:-
[https://spark.apache.org/docs/2.2.0/ml-features.html#locality-sensitive-hashing]
[https://eng.uber.com/lsh/]
[https://data-flair.training/blogs/limitations-of-apache-spark/]