Работник не может получить новую вставку данных в mysql в задании из фляги сельдерея

У меня есть три сервера, на которых запущена моя программа, и на каждом сервере есть восемь рабочих из сельдерея, чтобы получить задачу от Redis. То есть задача сельдерея каждого сервера может быть выполнена другим сервером.

На каждом сервере:зафиксировать изменения и вызвать задачу

    ...
    try:
        db.session.commit()
    except Exception as e:
        current_app.logger.error(str(e))
        db.session.rollback()
        if not ci_existed:  # only add
            self.delete(ci.ci_id)
        return abort(500, "add CI error")
    his_manager = CIAttributeHistoryManger()
    his_manager.add(ci.ci_id, histories)
    ci_cache.apply_async([ci.ci_id], queue="async")
    # add bj ci
    add_ci_bj.apply_async([ci_type.type_name, None, ci.ci_id], queue="async")
    return ci.ci_id

функция задачи

@celery.task(name="xxxxxxx", queue="async")
def add_ci_bj(ci_type, first_id, second_id):
    param, status = lib.ci.CIManager().get_relations(first_id, second_id, is_async=True)
    ...

функция в задаче

def get_relations(self, first_id, second_id, is_async=False):
    start = time.clock()
    try:
        second = self.get_ci_by_id(second_id, need_children=False)
    except Exception as e:
    return None, "get ci by id error: first %s, second %s, e %s, is_async:%s" % \
           (first_id, second_id, e, is_async)
    ...

Я вставляю данные и фиксирую в MySQL и вызываю задачу add_ci_bj, но я не могу получить данные по second_idЯ не могу понять, кто-нибудь может помочь?

1 ответ

Получив ответ, закройте сеанс перед вызовом на другой сервер. Такие как:

@celery.task(name="xxxxxxx", queue="async")
def add_ci_bj(ci_type, first_id, second_id):
    db.session.Close()
    param, status = lib.ci.CIManager().get_relations(first_id, second_id, is_async=True)
    ...
Другие вопросы по тегам