Как использовать методы экземпляра в качестве заданий для APscheduler AND в сочетании с постоянным хранилищем данных [ошибка несвязанного метода]
[Примечание: использование Python 2.7 и среды ноутбука Jupyter]
Я хотел бы иметь возможность планировать методы экземпляра как задания, использующие APscheduler, и сохранять эти задания в постоянной БД [в данном случае mongodb].
Однако при попытке сделать это я столкнулся со следующей ошибкой: unbound method use_variable() must be called with Job instance as first argument (got NoneType instance instead)
До этого я успешно: (а) планирование методов экземпляров как заданий (б) сохранение заданий в mongodb
Однако я не могу заставить этих двоих работать вместе.
Что работает:
Базовый пример (а) планирования методов экземпляра как задания...
from apscheduler.schedulers.background import BackgroundScheduler
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
scheduler = BackgroundScheduler()
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
Работает как положено, печатая "тест" каждые 5 секунд.
Что ломается:
Однако, как только я добавлю магазин вакансий...
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(self):
print(self.variable)
job=Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.add_job(job.use_variable, trigger='interval', seconds=5)
scheduler.start()
Задание успешно выполняется в первый раз, но при всех последующих вызовах возникают ошибки из-за "несвязанного метода". Я предполагаю, что экземпляр задания не извлекается из хранилища заданий, поэтому в метод use_variable не передается 'self'...
Из документов...
Из документов: In case of a bound method, passing the unbound version (YourClass.method_name) as the target function to add_job() with the class instance as the first argument (so it gets passed as the self argument)
Таким образом, я попытался:scheduler.add_job(Job.use_variable, args=[job] trigger='interval', seconds=5)
Не повезло тоже.
Текущее решение
В настоящее время я использую следующий обходной путь, однако он довольно хакерский, и я хотел бы найти более элегантное решение!
def proxy(job):
job.use_variable()
scheduler.add_job(proxy, args=[job], trigger='interval', seconds=5)
Обновление с планировщиком блокировки, как предложено Алексом
Минимальный пример кода:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
class Job:
def __init__(self, config):
self.variable = config['variable']
def use_variable(job):
print(job.variable)
job = Job({'variable': 'test'})
jobstores = {'default': MongoDBJobStore(
database='apscheduler', collection='jobs')}
scheduler = BlockingScheduler(jobstores=jobstores)
def my_listener(event):
if (event.exception):
print('Exception: {}'.format(event.exception))
scheduler.add_listener(my_listener)
scheduler.add_job(job.use_variable, trigger='interval',seconds=5)
// Also tried:
// scheduler.add_job(Job.use_variable, args=[job] trigger='interval',seconds=5)
scheduler.start()
Воспроизводит ошибку. Первый вызов успешно печатает "тест", но последующие вызовы сталкиваются с несвязанной ошибкой [по крайней мере, в среде ноутбука Jupyter]...
1 ответ
Я обнаружил ошибку в APScheduler 3.6.0, которая проявляется только в Python 2.7. Он пытается сделать правильные вещи в отношении планирования методов, но на py2.7 он делает это неправильно. Я исправлю это в следующем выпуске. В то же время, вы можете продолжать использовать ваш обходной путь.