Сохранить очередь Python в файл
Я использую класс Python Queue для управления списком задач, которые совместно используются несколькими рабочими потоками. Фактический код огромен, и я все еще делаю его полностью без ошибок. Время от времени рабочие потоки вылетают, и мне приходится перезапускать всю процедуру. В процессе я теряю все задачи, которые были поставлены в очередь. Есть ли способ сохранить очередь в файл, чтобы при каждом перезапуске процесса список задач предварительно загружался из этого файла?
На первый взгляд кажется, что когда я получаю или ставлю задачи в очередь, я должен одновременно читать и писать в файл. Однако это не дает мне функциональности queue.task_done() и может быть не самым оптимальным решением. Любые идеи очень приветствуются.
5 ответов
Есть несколько подходов к этому, в том числе pickle
модуль...
Но, на мой взгляд, было бы проще просто записать в файл, строку за строкой, каждый элемент очереди в столбцах, содержащих другие свойства, которые вы можете сохранить, например task_done
,
пример:
element1, True
element2, False
...
В Python очень легко читать файл, отформатированный так:
for line in file('path/file.ext'):
name, state = line.split(sep_char)
#and them insert into the queue...
Реализуйте механизм рукопожатия между работником и мастером.
У мастера есть список задач, прежде чем поместить их в очередь, протолкнуть список в файл. Затем вставьте задачу в очередь. Когда рабочий сделан, он отправляет обратно сообщение ACK. Только в этот момент откройте список задач и удалите соответствующий идентификатор.
Самый простой способ сделать это - использовать AMQP для очередей сообщений и позволить брокеру сообщений позаботиться о них. Я реализовал аналогичную систему, используя RabbitMQ в качестве посредника сообщений с устойчивыми постоянными очередями. Сообщения даже пережили сбой серверного программного обеспечения RabbitMQ, когда я использовал устаревшую версию сервера 1.72 на виртуальном сервере Linux с только 512 МБ ОЗУ и около миллиона сообщений в игре.
Я делаю так, что каждый тип работника потребляет сообщения из другой очереди. Если мне нужно более одного работника этого типа, то очередь сообщений автоматически переходит в циклический цикл, и если работник не может завершить обработку сообщения, он просто не подтверждает его, и он возвращается в очередь.
Я написал небольшой модуль shim с примерно 80 строками кода, чтобы сидеть перед kombu
, а потом переписал что использовать py-amqplib
, Если бы я знал о haigha
ранее я бы использовал это, так как это очень близко соответствует документу спецификаций AMQP.
Я не рекомендую Kombu, потому что он настолько сложен для отладки и отличается от стандарта AMQP странными способами. Посмотри на haigha
потому что, хотя документация представляет собой не более одного примера фрагмента кода на PyPi, она лучше документирована, чем kombu или amqplib, потому что вы можете использовать спецификации AMQP в качестве документации haigha.
Простой вариант, который я могу предложить, - это обернуть таблицу базы данных в класс и использовать ее в качестве своей очереди. Столбец с автоинкрементом будет творить чудеса (следующий элемент, который нужно удалить, это элемент с наименьшим идентификатором).
class dbQueue:
init():
# Pick some random id for this run (or set it to some thing you know).
put():
# Insert entry into table
get():
# The update .. select combo removes the need for a database that has transactions.
# If no entries bear your ID:
# Update the next entry that is not already marked with your ID.
# Select the entry that matches your ID and return it.
task_done():
# Delete the entry with your ID.
Это не будет иметь лучшей производительности в зависимости от того, как часто обновляется очередь, даже база данных sqlite в памяти не будет такой же быстрой, как структура связанного списка. С другой стороны, вы можете просматривать базу данных с помощью любого инструмента, который может получить доступ к базе данных, чтобы вы могли видеть, какой из них находится в процессе.