How can I speed up my Aho-Corasick Algorithm?
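I'm trying to solve a problem on HackerRank, "Determining DNA Health." After looking at some of the discussions, I decided that the Aho-Corasick algorithm would be the best choice. The problem involves searching a string for various sequences, each with an associated value. The task is to take a subsection of these sequence-value pairs from a given list and find the value associated with an input string. This has to be done 44850 times with a list of 100000 sequence-value pairs. I've implemented the algorithm, and while it is a lot faster than my first attempt, it is still not fast enough to pass this test case. Here is my implementation:
Building the trie: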
def createValueTrie(gs: Array[(String, Int)]): TrieNodeWithVal = {
  def recurse(genes: Array[(String, Int)]): Map[Char, TrieNodeWithVal] = {
    genes
      .groupBy(_._1.head)
      .map(x => (x._1, x._2.map(y => (y._1.tail, y._2))))
      .map{
        case (c, arr: Array[(String, Int)]) => {
          val value = arr.filter(_._1.length == 0).foldLeft(0)(_ + _._2)
          val filtered = arr.filter(_._1.length > 0)
          val recursed = recurse(filtered)
          (c, new TrieNodeWithVal(arr.exists(_._1.length == 0), recursed, value))
        }
      }
  }
  new TrieNodeWithVal(false, recurse(gs), 0)
}
Searching the trie:
def findValueMatches(trie: TrieNodeWithVal, sequence: String): Iterator[(String, Long)] = {
  sequence.scanRight("")(_ + _).dropRight(1).iterator.flatMap(s => {
    Iterator.iterate[(Iterator[Char], Option[TrieNodeWithVal])]((s.iterator, Some(trie))) {
      case (it: Iterator[Char], Some(node)) => if (it.hasNext) (it, node(it.next())) else (it, None)
      case (it: Iterator[Char], None) => (it, None)
    }.takeWhile {
      case (_, Some(_)) => true
      case _ => false
    }.map {
      case (_, Some(node)) => node
    }.zipWithIndex.withFilter {
      // Dot notation avoids the postfix-operator syntax, which newer compilers reject without an import.
      case (node, _) => node.isWord
    }.map {
      case (node, i) => (s.slice(0, i), node.value)
    }
  })
}
Trie node classes:
class TrieNode(isAWord: Boolean, childs: Map[Char, TrieNode]) {
  val isWord = isAWord
  val children: Map[Char, TrieNode] = childs
  def apply(c: Char): Option[TrieNode] = children.get(c)
  override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
}
class TrieNodeWithVal(isAWord: Boolean, childs: Map[Char, TrieNodeWithVal], valu: Long) extends TrieNode(isAWord, childs) {
  val value = valu
  override val children: Map[Char, TrieNodeWithVal] = childs
  override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper + "[" + x._2.value + "]" else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
  override def apply(c: Char): Option[TrieNodeWithVal] = children.get(c)
}
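I know there is more edge building that could be done here for the failure cases, but several people in the discussion said it would be slower, since the trie needs to be rebuilt for each query. Are there more efficient collections I should be using for a problem like this? How can I speed this up while keeping a purely functional style?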
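There are various changes, some that may affect performance and some that are just cosmetic.
In recurse you can combine the two map calls and use partition to reduce the number of times you test the array: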
def recurse(genes: Array[(String, Int)]): Map[Char, TrieNodeWithVal] = {
  genes
    .groupBy(_._1.head)
    .map { x =>
      val c = x._1
      val arr = x._2.map(y => (y._1.tail, y._2))
      val (filtered, nonFiltered) = arr.partition(_._1.nonEmpty)
      val value = nonFiltered.foldLeft(0)(_ + _._2)
      val recursed = recurse(filtered)
      (c, new TrieNodeWithVal(nonFiltered.nonEmpty, recursed, value))
    }
}
You can simplify findValueMatches by using conditions on the case statements and combining some operations:
def findValueMatches(trie: TrieNodeWithVal, sequence: String): Iterator[(String, Long)] = {
  sequence.scanRight("")(_ + _).dropRight(1).iterator.flatMap(s => {
    Iterator.iterate[(Iterator[Char], Option[TrieNodeWithVal])]((s.iterator, Some(trie))) {
      case (it: Iterator[Char], Some(node)) if it.hasNext => (it, node(it.next()))
      case (it: Iterator[Char], _) => (it, None)
    }.takeWhile {
      _._2.nonEmpty
    }.zipWithIndex.collect {
      case ((_, Some(node)), i) if node.isWord =>
        (s.slice(0, i), node.value)
    }
  })
}
Finally, you can simplify the classes by using val parameters:
class TrieNode(val isWord: Boolean, val children: Map[Char, TrieNode]) {
  def apply(c: Char): Option[TrieNode] = children.get(c)
  override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
}
class TrieNodeWithVal(isAWord: Boolean, childs: Map[Char, TrieNodeWithVal], val value: Long) extends TrieNode(isAWord, childs) {
  override val children: Map[Char, TrieNodeWithVal] = childs
  override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper + "[" + x._2.value + "]" else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
  override def apply(c: Char): Option[TrieNodeWithVal] = children.get(c)
}
All of this has been compiled but not tested, so apologies if I have inadvertently changed the algorithm.
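I didn't speed up the algorithm itself, but I realized that if I gave each node an index from the original list of sequences and values, I wouldn't have to rebuild the tries each time. I could build just one trie and only count the nodes whose index falls within the range. This cut the time from 8 minutes to 11 seconds!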
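For anyone wanting to see the shape of that idea, here is a minimal sketch, not the actual code behind the timings above. It assumes each node keeps every (index, value) pair for the genes ending there, since the same gene can appear more than once in the list. The names IndexedTrieSketch, Node, build and health are made up for illustration.

```scala
object IndexedTrieSketch {
  // Hypothetical node type: `hits` holds (geneIndex, value) for every gene ending at this node.
  final case class Node(children: Map[Char, Node], hits: Vector[(Int, Long)])

  // Build ONE trie for the whole gene list; a gene's position in `genes` is its index.
  def build(genes: Seq[(String, Long)]): Node = {
    def go(items: Seq[(String, Int, Long)]): Node = {
      // Genes whose remaining suffix is empty end at this node.
      val (ended, rest) = items.partition(_._1.isEmpty)
      val children = rest.groupBy(_._1.head).map { case (c, group) =>
        c -> go(group.map { case (s, i, v) => (s.tail, i, v) })
      }
      Node(children, ended.map { case (_, i, v) => (i, v) }.toVector)
    }
    go(genes.zipWithIndex.map { case ((s, v), i) => (s, i, v) })
  }

  // Sum the values of all genes with index in [first, last] that occur in `dna`.
  def health(trie: Node, dna: String, first: Int, last: Int): Long =
    dna.indices.iterator.map { start =>
      // Walk down from the root along dna(start), dna(start + 1), ... until no child matches.
      Iterator
        .iterate((Option(trie), start)) { case (node, pos) =>
          (node.filter(_ => pos < dna.length).flatMap(_.children.get(dna(pos))), pos + 1)
        }
        .map(_._1)
        .takeWhile(_.nonEmpty)
        .collect { case Some(n) => n }
        .flatMap(_.hits)
        .collect { case (i, v) if i >= first && i <= last => v }
        .sum
    }.sum
}
```

The trie is built once with IndexedTrieSketch.build(genes), and each query then calls health(trie, dna, first, last) against that same trie. The range check is a plain linear filter over a node's hits here; sorting them by index would allow something faster, but that goes beyond what I described above.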
You could try the algorithm with a ternary tree. My PHP implementation: https://github.com/Tetramatrix/phpahocorasick.
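To give a rough idea in Scala, here is a minimal sketch of an immutable ternary search tree, which is one reading of "ternary tree". It is not a port of the linked PHP code, and the names TernaryTreeSketch, Tst, Branch, insert and prefixValues are hypothetical. Each node stores one character and three subtrees (smaller, equal, larger), so no per-node Map is needed.

```scala
object TernaryTreeSketch {
  sealed trait Tst
  case object Leaf extends Tst
  // One character per node; `mid` continues the current key, `lo`/`hi` hold alternatives.
  final case class Branch(ch: Char, lo: Tst, mid: Tst, hi: Tst, value: Long, isWord: Boolean) extends Tst

  // Insert `key` (assumed non-empty), adding `v` to any value already stored for it.
  def insert(t: Tst, key: String, v: Long, i: Int = 0): Tst = t match {
    case Leaf => insert(Branch(key(i), Leaf, Leaf, Leaf, 0L, isWord = false), key, v, i)
    case b: Branch =>
      if (key(i) < b.ch) b.copy(lo = insert(b.lo, key, v, i))
      else if (key(i) > b.ch) b.copy(hi = insert(b.hi, key, v, i))
      else if (i == key.length - 1) b.copy(value = b.value + v, isWord = true)
      else b.copy(mid = insert(b.mid, key, v, i + 1))
  }

  // Values of every stored key that is a prefix of text.substring(from), longest match first.
  @annotation.tailrec
  def prefixValues(t: Tst, text: String, from: Int, acc: List[Long] = Nil): List[Long] = t match {
    case b: Branch if from < text.length =>
      if (text(from) < b.ch) prefixValues(b.lo, text, from, acc)
      else if (text(from) > b.ch) prefixValues(b.hi, text, from, acc)
      else prefixValues(b.mid, text, from + 1, if (b.isWord) b.value :: acc else acc)
    case _ => acc
  }
}
```

Calling prefixValues once per start position of the input string gives the same matches as walking the Map-based trie. Whether it is actually faster for this problem is something I have not measured.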