Spark 上的动态集代数
Dynamic Set Algebra on Spark
考虑以下问题。给定:
- 集合集
- 动态接收的布尔表达式
Return 结果集。
Spark 是否有任何有效的算法或库来解决这个普遍问题?
这里有一个 玩具 示例来从概念上说明问题:
val X = Set("A1", "A2", "A3", "A4")
val Y = Set("A2", "A4", "A5")
val collection = Set(X, Y)
val expression = "X and Y"
我正在寻找一种实现通用 solve_expression
的方法,以便在上面的示例中:
output = solve_expression(expression, collection)
结果:
Set("A2", "A5")
我正在处理包含数百万项的集合,以及以字符串形式出现的布尔表达式。重要的是表达式中的每个原子(例如上面的 "X" 和 "Y")都是集合。表达式和集合是 dynamic(操作不能硬编码,因为我们将它们作为输入接收并且我们事先不知道它们是什么)。
我对问题的表述很灵活。实际集合可以是 Set
类型,例如保存字符串(例如 "A1"、"A2")、编码为二进制向量或任何其他适合 Spark 的内容。
Spark 是否有任何库来解析 和解决 集合上的一般布尔表达式?
好的。假设您想在 Spark 中执行此操作。此外,由于这些是巨大的集合,我们假设它们不在内存中,它们每个都在一个文件中 - 文件中的每一行表示集合中的一个条目。
我们将用 RDD
s 表示集合 - Spark 存储数据的标准方式。
使用此解析器(改编自 here)
import scala.util.parsing.combinator.JavaTokenParsers
import org.apache.spark.rdd.RDD
case class Query[T](setMap: Map[String, RDD[T]]) extends JavaTokenParsers {
private lazy val expr: Parser[RDD[T]]
= term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) }
private lazy val term: Parser[RDD[T]]
= fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) }
private lazy val fact: Parser[RDD[T]]
= vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp })
private lazy val vari: Parser[RDD[T]]
= setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap
def apply(expression: String) = this.parseAll(expr, expression).get.distinct
}
将以上内容粘贴到 shell 后观察以下 spark-shell
交互(为简洁起见,我省略了一些回复):
> val x = sc.textFile("X.txt").cache \ contains "1\n2\n3\n4\n5"
> val y = sc.textFile("Y.txt").cache \ contains "3\n4\n5\n6\n7"
> val z = sc.textFile("Z.txt").cache \ contains "3\n9\n"
> val sets = Map("x" -> x, "y" -> y, "z" -> z)
> val query = Query[Int](sets)
现在,我可以用不同的表达式调用查询。请注意,我在这里使用 collect
来触发评估(因此我们可以看到集合中的内容),但如果集合非常大,您通常只需保持 RDD
不变(并保存它到磁盘)。
> query("a union b").collect
res: Array[Int] = Array("1", "2", "3", "4", "5", "6", "7")
> query("a inter b").collect
res: Array[Int] = Array("3", "4", "5")
> query("a inter b union ((a inter b) union a)").collect
res: Array[Int] = Array("1", "2", "3", "4", "5")
> query("c union a inter b").collect
res: Array[Int] = Array("3", "4", "5", "9", "10")
> query("(c union a) inter b").collect
res: Array[Int] = Array("3", "4", "5")
虽然懒得去实现,但是set difference应该是一行加法(很像union
和inter
)。我认为集合补集是个坏主意......它们并不总是有意义(什么是空集的补集,你如何表示它?)。
考虑以下问题。给定:
- 集合集
- 动态接收的布尔表达式
Return 结果集。
Spark 是否有任何有效的算法或库来解决这个普遍问题?
这里有一个 玩具 示例来从概念上说明问题:
val X = Set("A1", "A2", "A3", "A4")
val Y = Set("A2", "A4", "A5")
val collection = Set(X, Y)
val expression = "X and Y"
我正在寻找一种实现通用 solve_expression
的方法,以便在上面的示例中:
output = solve_expression(expression, collection)
结果:
Set("A2", "A5")
我正在处理包含数百万项的集合,以及以字符串形式出现的布尔表达式。重要的是表达式中的每个原子(例如上面的 "X" 和 "Y")都是集合。表达式和集合是 dynamic(操作不能硬编码,因为我们将它们作为输入接收并且我们事先不知道它们是什么)。
我对问题的表述很灵活。实际集合可以是 Set
类型,例如保存字符串(例如 "A1"、"A2")、编码为二进制向量或任何其他适合 Spark 的内容。
Spark 是否有任何库来解析 和解决 集合上的一般布尔表达式?
好的。假设您想在 Spark 中执行此操作。此外,由于这些是巨大的集合,我们假设它们不在内存中,它们每个都在一个文件中 - 文件中的每一行表示集合中的一个条目。
我们将用 RDD
s 表示集合 - Spark 存储数据的标准方式。
使用此解析器(改编自 here)
import scala.util.parsing.combinator.JavaTokenParsers
import org.apache.spark.rdd.RDD
case class Query[T](setMap: Map[String, RDD[T]]) extends JavaTokenParsers {
private lazy val expr: Parser[RDD[T]]
= term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) }
private lazy val term: Parser[RDD[T]]
= fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) }
private lazy val fact: Parser[RDD[T]]
= vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp })
private lazy val vari: Parser[RDD[T]]
= setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap
def apply(expression: String) = this.parseAll(expr, expression).get.distinct
}
将以上内容粘贴到 shell 后观察以下 spark-shell
交互(为简洁起见,我省略了一些回复):
> val x = sc.textFile("X.txt").cache \ contains "1\n2\n3\n4\n5"
> val y = sc.textFile("Y.txt").cache \ contains "3\n4\n5\n6\n7"
> val z = sc.textFile("Z.txt").cache \ contains "3\n9\n"
> val sets = Map("x" -> x, "y" -> y, "z" -> z)
> val query = Query[Int](sets)
现在,我可以用不同的表达式调用查询。请注意,我在这里使用 collect
来触发评估(因此我们可以看到集合中的内容),但如果集合非常大,您通常只需保持 RDD
不变(并保存它到磁盘)。
> query("a union b").collect
res: Array[Int] = Array("1", "2", "3", "4", "5", "6", "7")
> query("a inter b").collect
res: Array[Int] = Array("3", "4", "5")
> query("a inter b union ((a inter b) union a)").collect
res: Array[Int] = Array("1", "2", "3", "4", "5")
> query("c union a inter b").collect
res: Array[Int] = Array("3", "4", "5", "9", "10")
> query("(c union a) inter b").collect
res: Array[Int] = Array("3", "4", "5")
虽然懒得去实现,但是set difference应该是一行加法(很像union
和inter
)。我认为集合补集是个坏主意......它们并不总是有意义(什么是空集的补集,你如何表示它?)。