Как я могу ускорить мой алгоритм Aho-Corasick?
Я пытаюсь решить проблему на HackerRank; "Определение здоровья ДНК". После некоторых обсуждений я решил, что алгоритм Ахо-Корасика будет лучшим выбором. Проблема заключается в поиске строки для различных последовательностей со связанным значением. Задача состоит в том, чтобы взять подраздел этих пар значений последовательности из заданного списка и найти значение, связанное с входной строкой. Это должно быть выполнено 44850 раз со списком из 100000 пар значений последовательности. Я реализовал алгоритм, и хотя он намного быстрее, чем моя первая попытка, он все еще недостаточно быстр, чтобы пройти этот тестовый пример. Вот моя реализация:
Построение дерева:
def createValueTrie(gs: Array[(String, Int)]): TrieNodeWithVal = {
def recurse(genes: Array[(String, Int)]): Map[Char, TrieNodeWithVal] = {
genes
.groupBy(_._1.head)
.map(x => (x._1, x._2.map(y => (y._1.tail, y._2))))
.map{
case (c, arr: Array[(String, Int)]) => {
val value = arr.filter(_._1.length == 0).foldLeft(0)(_ + _._2)
val filtered = arr.filter(_._1.length > 0)
val recursed = recurse(filtered)
(c, new TrieNodeWithVal(arr.exists(_._1.length == 0), recursed, value))
}
}
}
new TrieNodeWithVal(false, recurse(gs), 0)
}
Поиск по дереву:
def findValueMatches(trie: TrieNodeWithVal, sequence: String): Iterator[(String, Long)] = {
sequence.scanRight("")(_ + _).dropRight(1).iterator.flatMap(s => {
Iterator.iterate[(Iterator[Char], Option[TrieNodeWithVal])]((s.iterator, Some(trie))) {
case (it: Iterator[Char], Some(node)) => if (it.hasNext) (it, node(it.next())) else (it, None)
case (it: Iterator[Char], None) => (it, None)
}.takeWhile {
case (_, Some(_)) => true
case _ => false
}.map {
case (_, Some(node)) => node
}.zipWithIndex.withFilter {
case (node, _) => node isWord
}.map {
case (node, i) => (s.slice(0, i), node.value)
}
})
}
Классы узлов Trie:
class TrieNode(isAWord: Boolean, childs: Map[Char, TrieNode]) {
val isWord = isAWord
val children: Map[Char, TrieNode] = childs
def apply(c: Char): Option[TrieNode] = children.get(c)
override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
}
class TrieNodeWithVal(isAWord: Boolean, childs: Map[Char, TrieNodeWithVal], valu: Long) extends TrieNode(isAWord, childs) {
val value = valu
override val children: Map[Char, TrieNodeWithVal] = childs
override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper + "[" + x._2.value + "]" else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
override def apply(c: Char): Option[TrieNodeWithVal] = children.get(c)
}
Я знаю, что для случаев сбоев можно сделать еще кое-что, но несколько человек в дискуссии сказали, что это будет медленнее, так как дерево должно быть перестроено для каждого запроса. Есть ли более эффективные коллекции, которые я должен использовать для решения этой проблемы? Как я могу ускорить это, поддерживая чисто функциональный стиль?
3 ответа
Есть различные изменения, некоторые могут повлиять на производительность, а другие просто косметические.
В recurse
Вы можете объединить два map
звонки и использование partition
чтобы уменьшить количество тестов массива:
def recurse(genes: Array[(String, Int)]): Map[Char, TrieNodeWithVal] = {
genes
.groupBy(_._1.head)
.map { x =>
val c = x._1
val arr = x._2.map(y => (y._1.tail, y._2))
val (filtered, nonFiltered) = arr.partition(_._1.nonEmpty)
val value = nonFiltered.foldLeft(0)(_ + _._2)
val recursed = recurse(filtered)
(c, new TrieNodeWithVal(nonFiltered.nonEmpty, recursed, value))
}
}
Вы можете упростить findValueMatches
используя условия на case
операторы и комбинирующие некоторые операции:
def findValueMatches(trie: TrieNodeWithVal, sequence: String): Iterator[(String, Long)] = {
sequence.scanRight("")(_ + _).dropRight(1).iterator.flatMap(s => {
Iterator.iterate[(Iterator[Char], Option[TrieNodeWithVal])]((s.iterator, Some(trie))) {
case (it: Iterator[Char], Some(node)) if it.hasNext => (it, node(it.next()))
case (it: Iterator[Char], _) => (it, None)
}.takeWhile {
_._2.nonEmpty
}.zipWithIndex.collect {
case ((_, Some(node)), i) if node.isWord =>
(s.slice(0, i), node.value)
}
})
}
Наконец, вы можете упростить свои занятия с помощью val
параметры
class TrieNode(val isWord: Boolean, val children: Map[Char, TrieNode]) {
def apply(c: Char): Option[TrieNode] = children.get(c)
override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
}
class TrieNodeWithVal(isAWord: Boolean, childs: Map[Char, TrieNodeWithVal], val value: Long) extends TrieNode(isAWord, childs) {
override val children: Map[Char, TrieNodeWithVal] = childs
override def toString(): String = "(" + children.map(x => (if (x._2.isWord) x._1.toUpper + "[" + x._2.value + "]" else x._1) + ": " + x._2.toString()).mkString(", ") + ")"
override def apply(c: Char): Option[TrieNodeWithVal] = children.get(c)
}
Все это скомпилировано, но не проверено, поэтому извиняюсь, если я случайно изменил алгоритм.
Я не ускорил алгоритм, но решил, что если я присваиваю каждому узлу индекс из исходного списка последовательностей и значений, то вместо того, чтобы каждый раз перестраивать попытки, я могу просто использовать один и подсчитывать только те узлы, которые иметь индекс в диапазоне. Это улучшило время с 8 минут до 11 секунд!
Вы можете попробовать алгоритм с троичным. Моя реализация PHP: https://github.com/Tetramatrix/phpahocorasick.