sqlalchemy sqlite работает в многопоточной среде

Я сталкиваюсь с проблемой после настройки sqlalchemy ORM с sqlite в многопоточной среде. Вот что я пытаюсь достичь. Я пытаюсь подписаться на тему mqtt с помощью библиотеки Pyho mqtt Python. Класс mqttsubscribe работает в отдельном потоке. Класс MqttSubscribe вызывает функцию, которую я зарегистрировал при инициализации класса. Всякий раз, когда есть сообщение, я пытаюсь записать в базу данных с использованием models.py. Вот пример кода для models.py.

import os
import datetime
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.pool import StaticPool, NullPool
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, event, Column, Integer, String, DateTime, Boolean, ForeignKey, and_, desc

Base = declarative_base()
Session = sessionmaker()
session = ''

# Sqlite doesnt support foreign key by default
# Below code is logic is taken from
# https://stackru.com/questions/31794195/
# how-to-correctly-add-foreign-key-constraints-to-sqlite-db-using-sqlalchemy
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(connection, connection_record):
    cursor = connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

def init_db_connection(engine, echo=True):
    global session
    engine = create_engine(engine, echo=echo)
    Session.configure(bind=engine, connect_args={'check_same_thread':False},
        poolclass=NullPool)
    session = Session()
    Base.metadata.create_all(engine)
    return session

class Person(Base):
    __tablename__ = 'Person'

    id = Column(Integer, primary_key=True)
    first_name = Column(String(50))
    last_name = Column(String(50))
    created_on = Column(DateTime, nullable=False, default=datetime.datetime.utcnow())
    updated_on = Column(DateTime, nullable=False, default=datetime.datetime.utcnow(),\
        onupdate=datetime.datetime.utcnow())

    def __init__(self, first_name, last_name):
        self.last_name = last_name
        self.first_name = first_name


    @classmethod
    def create(cls, first_name, last_name):
        result = cls(first_name, last_name)
        session.add(result)
        session.commit()
        return result

    def __repr__(self):
        return "<Person (name=%s last_name=%s)>" % (self.first_name, self.last_name)

Я пытаюсь инициализировать db_connnection с помощью этого метода в другой библиотеке путем импорта model.py и init_db_connection

init_db_connection('sqlite:////tmp/foo.db', echo=False)
current_time = datetime.datetime.utcnow()
time.sleep(1)
print('Current_time: {}'.format(current_time))
person = Person.create('Andy','Boss')
print('Person name: {}'.format(person.first_name))

Функция обратного вызова может записывать в базу данных из другого потока. Я столкнулся с проблемой при чтении базы данных. Я нажимаю на эту ошибку

self = <sqlalchemy.orm.session.SessionTransaction object at 0x756550b0>, prepared_ok = False, rollback_ok = False, deactive_ok = False
closed_msg = 'This transaction is closed'

    def _assert_active(self, prepared_ok=False,
                       rollback_ok=False,
                       deactive_ok=False,
                       closed_msg="This transaction is closed"):
        if self._state is COMMITTED:
            raise sa_exc.InvalidRequestError(
                "This session is in 'committed' state; no further "
                "SQL can be emitted within this transaction."
            )
        elif self._state is PREPARED:
            if not prepared_ok:
                raise sa_exc.InvalidRequestError(
>                   "This session is in 'prepared' state; no further "
                    "SQL can be emitted within this transaction."
E                   sqlalchemy.exc.InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.

sanity/lib/python3.5/site-packages/sqlalchemy/orm/session.py:264: InvalidRequestError

Должен ли я использовать scoped_session вместо сессии, чтобы преодолеть эту проблему? Я не знаю, является ли sqlite подходящей базой данных для такой среды.

0 ответов

Да, вы должны использовать scoped_session, но это не единственный способ.

Ваш исходный код использует один и тот же объект сеанса в разных потоках. Это неправильно Объект сеанса не является потокобезопасным.

Существует два способа использования сеансов кросс-потоков.

  1. Создайте новый сеанс, когда вам нужно поговорить с базой данных, а не просто использовать глобальный. Создание менеджера контекста сделает этот метод более удобным.

  2. Используйте сессию с областью действия как глобальный объект.

Недавно я написал заметку, чтобы проанализировать преимущества и недостатки этих двух моделей использования. Я думаю, что это было бы полезно, и я приведу приведенную ниже аннотацию на случай, если вам понадобится д-р.

  1. Использование диспетчера контекста для открытия нового сеанса, когда вам нужно поговорить с базой данных, что, безусловно, безопасно и гибко, но не так удобно;

  2. Использование scoped_session в качестве глобального объекта, что удобно, но не так безопасно и гибко

Другие вопросы по тегам