Как мне сделать эту хранимую процедуру plpythonu вставкой в ​​базу данных?

Я пытаюсь прочитать строки из stdin и вставить данные из этих строк в базу данных PostgreSQL, используя хранимую процедуру plpythonu.

Когда я вызываю процедуру в Python 3, она запускается (использует последовательное значение для каждой прочитанной строки), но не сохраняет никаких данных в БД. Когда я вызываю ту же процедуру из psql, она работает нормально, вставляя одну строку в БД.

Например:

Действие: запустить SELECT sl_insert_day('2017-01-02', '05:15'); из psql как пользователь jazcap53
Результат: день вставлен с day_id 1.

Действие: запустить python3 src/load/load_mcv.py < input.txt в командной строке
Результат: ничего не вставлено, но используются 2 последовательных идентификатора дня.

Действие: запустить SELECT sl_insert_day('2017-01-03', '06:15'); из psql как пользователь jazcap53
Результат: день вставлен с day_id 4.

файл: input.txt:

DAY, 2017-01-05, 06:00
DAY, 2017-01-06, 07:00

Выход:

('sl_insert_day() succeeded',)  
('sl_insert_day() succeeded',)  

Я использую Fedora 25, Python 3.6.0 и PostgreSQL 9.5.6.

Большое спасибо всем, кто может мне помочь с этим!


Ниже приведен пример MCV, который воспроизводит это поведение. Я ожидаю, что моя проблема в Шаге 8 или Шаге 6 - другие Шаги включены для полноты.

Шаги, используемые для создания MCV:

Шаг 1) Создайте базу данных:

В PSQL как пользователь postgres,
CREATE DATABASE sl_test_mcv;

Шаг 2) База данных init:

файл: db/database_mcv.ini

[postgresql]
host=localhost
database=sl_test_mcv
user=jazcap53
password=*****

Шаг 3) Запустите настройку базы данных:

файл: db/config_mcv.py

from configparser import ConfigParser

def config(filename='db/database_mcv.ini', section='postgresql'):
    parser = ConfigParser()
    parser.read(filename)
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {} not found in the {} file'.format(section, filename))
    return db

Шаг 4) Создать таблицу:

файл: db/create_tables_mcv.sql

DROP TABLE IF EXISTS sl_day CASCADE;

CREATE TABLE sl_day (
    day_id SERIAL UNIQUE,
    start_date date NOT NULL,
    start_time time NOT NULL,
    PRIMARY KEY (day_id)
);

Шаг 5) Создайте язык:

CREATE LANGUAGE plpythonu;

Шаг 6) Создайте процедуру:

файл: db/create_procedures_mcv.sql

DROP FUNCTION sl_insert_day(date, time without time zone);

CREATE FUNCTION sl_insert_day(new_start_date date, 
    new_start_time time without time zone) RETURNS text AS $$
from plpy import spiexceptions
try:
    plan = plpy.prepare("INSERT INTO sl_day (start_date, start_time) \
            VALUES($1, $2)", ["date", "time without time zone"])
    plpy.execute(plan, [new_start_date, new_start_time])
except plpy.SPIError, e:
    return "error: SQLSTATE %s" % (e.sqlstate,)
else:
    return "sl_insert_day() succeeded"
$$ LANGUAGE plpythonu;

Шаг 7) Предоставьте привилегии:

файл: db/grant_privileges_mcv.sql

GRANT SELECT, UPDATE, INSERT, DELETE ON sl_day TO jazcap53;  
GRANT USAGE ON sl_day_day_id_seq TO jazcap53;

Шаг 8) Запустите процедуру как python3 src/load/load_mcv.py

файл: src/load/load_mcv.py

import sys 
import psycopg2
from spreadsheet_etl.db.config_mcv import config

def conn_exec():
    conn = None
    try:
        params = config()
        conn = psycopg2.connect(**params)
        cur = conn.cursor()
        last_serial_val = 0 
        while True:
            my_line = sys.stdin.readline()
            if not my_line:
                break
            line_list = my_line.rstrip().split(', ')
            if line_list[0] == 'DAY':
                cur.execute('SELECT sl_insert_day(\'{}\', \'{}\')'.
                            format(line_list[1], line_list[2]))
                print(cur.fetchone())
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    conn_exec()

1 ответ

Решение

Делать conn.commit() после cur.close()

Другие вопросы по тегам