django, несколько баз данных (писатель, чтение-реплоика) и проблема с синхронизацией
Итак ... в ответ на вызов API я делаю:
i = CertainObject(paramA=1, paramB=2)
i.save()
теперь в моей базе данных писателей появилась новая запись.
Обработка может занять немного времени, и я не хочу откладывать ответ на вызывающий API, поэтому в следующей строке я передаю идентификатор объекта асинхронному заданию с использованием Celery:
run_async_job.delay(i.id)
сразу или через несколько секунд в зависимости от очереди
Есть ли закономерность, гарантирующая успех и отсутствие необходимости «спать» несколько секунд перед чтением или надеждой на удачу?
Спасибо.
4 ответа
Поскольку запись и немедленная загрузка - это высокий приоритет, почему бы не сохранить его в базе данных на основе памяти, такой как Memcache или Redis. Так что через некоторое время вы можете записать его в базу данных, используя периодическое задание в сельдерее, которое будет выполняться, скажем, каждую минуту или около того. Когда запись в БД завершится, ключи будут удалены из Redis / Memcache.
Вы можете хранить данные в БД на основе памяти в течение определенного времени, скажем, 1 час, когда данные нужны больше всего. Также вы можете создать сервисный метод, который будет проверять, находятся ли данные в памяти или нет.
Django Redis - отличный пакет для подключения к redis(если вы используете его в качестве брокера в Celery).
Я привожу пример на основе кеша Django:
# service method
from django.core.cache import cache
def get_object(obj_id, model_cls):
obj_dict = cache.get(obj_id, None) # checks if obj id is in cache, O(1) complexity
if obj_dict:
return model_cls(**obj_dict)
else:
return model_cls.objects.get(id=obj_id)
# celery job
@app.task
def store_objects():
logger.info("-"*25)
# you can use .bulk_create() to reduce DB hits and faster DB entries
for obj_id in cache.keys("foo_*"):
CertainObject.objects.create(**cache.get(obj_id))
cache.delete(obj_id)
logger.info("-"*25)
Самый простой способ - это использовать повторные попытки, упомянутые Грегом и Элрондом в их ответах. Если вы используете декораторы shared_task или @app.task, вы можете использовать следующий фрагмент кода.
@shared_task(bind=True)
def your_task(self, certain_object_id):
try:
certain_obj = CertainObject.objects.get(id=certain_object_id)
# Do your stuff
except CertainObject.DoesNotExist as e:
self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)
Я использовал экспоненциальный обратный отсчет между каждой попыткой. Вы можете изменить его в соответствии с вашими потребностями.
Вы можете найти документацию по настраиваемой задержке повтора здесь . По этой ссылке также есть еще один документ, объясняющий экспоненциальный откат.
Когда вы вызываете retry, он отправит новое сообщение, используя тот же идентификатор задачи, и позаботится о том, чтобы сообщение было доставлено в ту же очередь, что и исходная задача. Подробнее об этом можно прочитать в документации здесь.
Самое простое решение - поймать любую
DoesNotExist
ошибки, возникающие в начале задачи, затем запланируйте повторную попытку. Это можно сделать, преобразовав
run_async_job
в
<tcode id="4288253"></tcode>:
@app.task(bind=True)
def run_async_job(self, object_id):
try:
instance = CertainObject.objects.get(id=object_id)
except CertainObject.DoesNotExist:
return self.retry(object_id)
В этой статье довольно подробно рассказывается, как решить проблемы чтения после записи в реплицированных базах данных: https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read- из-реплик-58cc43973638.
Как и автор, я не знаю надежного универсального способа справиться с несогласованностью чтения после записи.
Основная стратегия, которую я использовал раньше, - это
expect_and_get(pk, max_attempts=10, delay_seconds=5)
метод, который пытается получить запись, и пытается ее
max_attempts
раз, задерживая
delay_seconds
секунд между попытками. Идея состоит в том, что он «ожидает» существования записи и поэтому рассматривает определенное количество сбоев как временные проблемы с базой данных. Это немного более надежно, чем просто спать в течение некоторого времени, поскольку он быстрее собирает записи и, надеюсь, намного реже задерживает выполнение задания.
Еще одна стратегия - отложить возвращение из специального
save_to_read
до тех пор, пока реплики чтения не получат значение, либо путем синхронной передачи нового значения репликам чтения, либо просто опрашивая их все, пока они не вернут запись. Этот способ кажется немного более хакерским, ИМО.
Для многих операций чтения вам, вероятно, не нужно беспокоиться о согласованности чтения после записи:
Если мы отображаем имя предприятия, частью которого является пользователь, на самом деле это не так уж и важно, если в невероятно редком случае, когда администратор меняет его, требуется минута, чтобы изменение распространялось на пользователей предприятия.