Параллельные вставки в суффикс-дереве

Некоторое время назад я опубликовал вопрос о сохранении / получении суффиксного дерева с диска. Это, наконец, работает нормально, но сейчас строительство идет очень медленно, и я не хочу сейчас связываться с алгоритмом Укконена (линейное построение).

Итак, я хотел сделать параллельные вставки, чтобы ускорить процесс, не делая дерево безопасным для потоков.

Дерево суффиксов хранит слова по своему начальному символу (посмотрите на изображение, опубликованное в моем предыдущем вопросе), таким образом, слово "банан" находится в дочернем элементе "B" корневого узла, а Apple - в дочернем элементе "A" и т. Д., Таким образом, вставка слова, начинающегося с "B", никогда не будет мешать вставке, начинающейся с "A". Моя идея состоит в том, чтобы иметь поток для каждого начального символа вставляемого набора слов: поток, вставляющий "А", другой поток, вставляющий "В", и т. Д.

Так что я думал о Executer класс, который просто добавляет слова в очередь слов каждогоProcess (сначала создайте его, если он не существует).

class Executer:
    #...
    def concurrent_insertion(word):
        k = word[0]
        processes.get(k, Process()).add(word)
    # ...

И класс Process это тот, который делает вставки. каждый Process экземпляр является независимым потоком, с Queue содержащие слова, которые еще нужно вставить.

В этом Process класс - это то, где у меня возникают проблемы, я думаю, это должно наследоваться от threading.ThreadПотому что каждый экземпляр должен быть потоком, но как мне сохранить его, пока не будет завершена вся обработка текста? Я имею в виду, он должен вставлять слова из его Queue слов, но когда Queue пустой поток не должен умирать, просто ждите, пока больше слов не заполнит Queue, "проснуться" и продолжить вставку.

1 ответ

Решение

Потоки не будут умирать, пока они не выйдут, так что вы можете сохранить их с помощью while True петля.

Обычная картина выглядит так:

q = Queue.Queue()             # word insertion queue
terminate = object()          # sentinel value to tell a thread to terminate

def worker(q):
    while True:
         word = q.get()       # block until next word is available
         if word is terminate:
             break
         insert_word(word)

После запуска рабочих и отправки слов в очередь основной поток должен дождаться завершения всей работы, а затем завершить работу рабочих.

for word in wordlist:
    q.put(word)
for i in range(numthreads):
    q.put(terminate)          # terminate all the worker threads
for t in threadlist:
    t.join()                  # wait for them all to finish

Альтернативный способ ожидания всей работы - использовать q.task_done а также q.join, Пример их использования показан в нижней части страницы в документах для модуля очереди.

Другие вопросы по тегам