Как реализовать asyncpg транзакции декоратор?

Я попытался реализовать оболочку транзакции asyncpg, как показано ниже:

def asyncpg_tx(autocommit=True, ignore_err=False):
'''
:param autocommit:
:param ignore_err:
:return:
'''
async def decorator(func):
    try:
        @functools.wraps(func)
        async def func_ex(*args, **kwargs):
            async with pool.acquire() as con:
                tx = con.transaction()
                await tx.start()
                logger.debug("tx %s started", tx)
                try:
                    return await func(con, *args, **kwargs)
                except Exception as e:
                    logger.exception(e)
                    await tx.rollback()
                    logger.info("tx %s rollbacked", tx)
                    raise_with_traceback(e)
                finally:
                    logger.debug("tx %s is released", tx)
                    await tx.commit()

        return await func_ex()
    except Exception as e:
            logger.exception(e)

return decorator

это работает, если функция быть украшенной не имеет параметров:

@asyncpg_tx()

async def load_latest_group_stats (con):

now = datetime.datetime.now()
slot = int(2 * (now.hour + now.minute / 60))  # range(0~47)

cur = await con.cursor('select * from table where $1=?', slot)
rows = await cur.fetch(5)
return rows

однако, это не работает, если у fucn есть params:

@asyncpg_tx()

async def save_group_stats (con, stats): sql = '...' result = await con.execute (sql, stats) print (результат)

сообщение об ошибке: TypeError: save_group_stats() отсутствует 1 обязательный позиционный аргумент: 'stats'

Вызывающий скрипт:

    result = asyncio.get_event_loop().run_until_complete(load_latest_group_stats)
print(result) # this will prints records, all ok here

stats = ... # assignment here


#async def save():
    #return await save_group_stats(stats.dict())

result = asyncio.get_event_loop().run_until_complete(save_group_stats)

если я раскомментирую #async def save и вызову run_until_complete(save()), все равно не получится, появится сообщение об ошибке:

  File "C:/workspace/parrots/dal/postgres/group_stats_dao.py", line 55, in save
return await save_group_stats(stats.dict())

TypeError: объект "сопрограммы" не может быть вызван sys:1: RuntimeWarning: сопрограмма "asyncpg_tx..decorator" никогда не ожидалась

Вопрос, как правильно реализовать обертку?

0 ответов

Другие вопросы по тегам