如何将此 plpythonu 存储过程插入到数据库中?
How do I make this plpythonu stored procedure insert to database?
我正在尝试使用 plpythonu 存储过程从标准输入中读取行,并将这些行中的数据插入到 PostgreSQL 数据库中。
当我调用 Python 3 下的过程时,它会运行(为读取的每一行消耗一个序列值),
但在数据库中不存储任何数据。
当我从 psql 调用相同的过程时,它工作正常,在 db.
中插入一行
例如:
操作:运行 SELECT sl_insert_day('2017-01-02', '05:15');
来自 psql 用户 jazcap53
结果:插入日期 day_id 1.
操作:运行 python3 src/load/load_mcv.py < input.txt
在命令行
结果:未插入任何内容,但消耗了 2 个序列 day_id。
操作:运行 SELECT sl_insert_day('2017-01-03', '06:15');
来自 psql 用户 jazcap53
结果:日期插入 day_id 4.
文件:input.txt:
DAY, 2017-01-05, 06:00
DAY, 2017-01-06, 07:00
输出:
('sl_insert_day() succeeded',)
('sl_insert_day() succeeded',)
我是 运行 Fedora 25、Python 3.6.0 和 PostgreSQL 9.5.6。
非常感谢任何可以帮助我的人!
下面是重现此行为的 MCV 示例。 我想我的问题出在第 8 步或第 6 步中 -- 为了完整起见,还包含了其他步骤。
用于创建 MCV 的步骤:
步骤 1) 创建数据库:
在 psql 作为用户 postgres,
创建数据库 sl_test_mcv;
步骤 2) 数据库初始化:
文件:db/database_mcv.ini
[postgresql]
host=localhost
database=sl_test_mcv
user=jazcap53
password=*****
步骤 3) 运行 数据库配置:
文件:db/config_mcv.py
from configparser import ConfigParser
def config(filename='db/database_mcv.ini', section='postgresql'):
parser = ConfigParser()
parser.read(filename)
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {} not found in the {} file'.format(section, filename))
return db
步骤 4) 创建 table:
文件:db/create_tables_mcv.sql
DROP TABLE IF EXISTS sl_day CASCADE;
CREATE TABLE sl_day (
day_id SERIAL UNIQUE,
start_date date NOT NULL,
start_time time NOT NULL,
PRIMARY KEY (day_id)
);
第 5 步)创建语言:
CREATE LANGUAGE plpythonu;
步骤 6) 创建程序:
文件:db/create_procedures_mcv.sql
DROP FUNCTION sl_insert_day(date, time without time zone);
CREATE FUNCTION sl_insert_day(new_start_date date,
new_start_time time without time zone) RETURNS text AS $$
from plpy import spiexceptions
try:
plan = plpy.prepare("INSERT INTO sl_day (start_date, start_time) \
VALUES(, )", ["date", "time without time zone"])
plpy.execute(plan, [new_start_date, new_start_time])
except plpy.SPIError, e:
return "error: SQLSTATE %s" % (e.sqlstate,)
else:
return "sl_insert_day() succeeded"
$$ LANGUAGE plpythonu;
步骤 7) 授予权限:
文件:db/grant_privileges_mcv.sql
GRANT SELECT, UPDATE, INSERT, DELETE ON sl_day TO jazcap53;
GRANT USAGE ON sl_day_day_id_seq TO jazcap53;
步骤 8) 运行 程序为 python3 src/load/load_mcv.py < input.txt:
文件:src/load/load_mcv.py
import sys
import psycopg2
from spreadsheet_etl.db.config_mcv import config
def conn_exec():
conn = None
try:
params = config()
conn = psycopg2.connect(**params)
cur = conn.cursor()
last_serial_val = 0
while True:
my_line = sys.stdin.readline()
if not my_line:
break
line_list = my_line.rstrip().split(', ')
if line_list[0] == 'DAY':
cur.execute('SELECT sl_insert_day(\'{}\', \'{}\')'.
format(line_list[1], line_list[2]))
print(cur.fetchone())
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == '__main__':
conn_exec()
在cur.close()
之后做conn.commit()
我正在尝试使用 plpythonu 存储过程从标准输入中读取行,并将这些行中的数据插入到 PostgreSQL 数据库中。
当我调用 Python 3 下的过程时,它会运行(为读取的每一行消耗一个序列值), 但在数据库中不存储任何数据。 当我从 psql 调用相同的过程时,它工作正常,在 db.
中插入一行例如:
操作:运行 SELECT sl_insert_day('2017-01-02', '05:15');
来自 psql 用户 jazcap53
结果:插入日期 day_id 1.
操作:运行 python3 src/load/load_mcv.py < input.txt
在命令行
结果:未插入任何内容,但消耗了 2 个序列 day_id。
操作:运行 SELECT sl_insert_day('2017-01-03', '06:15');
来自 psql 用户 jazcap53
结果:日期插入 day_id 4.
文件:input.txt:
DAY, 2017-01-05, 06:00
DAY, 2017-01-06, 07:00
输出:
('sl_insert_day() succeeded',)
('sl_insert_day() succeeded',)
我是 运行 Fedora 25、Python 3.6.0 和 PostgreSQL 9.5.6。
非常感谢任何可以帮助我的人!
下面是重现此行为的 MCV 示例。 我想我的问题出在第 8 步或第 6 步中 -- 为了完整起见,还包含了其他步骤。
用于创建 MCV 的步骤:
步骤 1) 创建数据库:
在 psql 作为用户 postgres,
创建数据库 sl_test_mcv;
步骤 2) 数据库初始化:
文件:db/database_mcv.ini
[postgresql]
host=localhost
database=sl_test_mcv
user=jazcap53
password=*****
步骤 3) 运行 数据库配置:
文件:db/config_mcv.py
from configparser import ConfigParser
def config(filename='db/database_mcv.ini', section='postgresql'):
parser = ConfigParser()
parser.read(filename)
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {} not found in the {} file'.format(section, filename))
return db
步骤 4) 创建 table:
文件:db/create_tables_mcv.sql
DROP TABLE IF EXISTS sl_day CASCADE;
CREATE TABLE sl_day (
day_id SERIAL UNIQUE,
start_date date NOT NULL,
start_time time NOT NULL,
PRIMARY KEY (day_id)
);
第 5 步)创建语言:
CREATE LANGUAGE plpythonu;
步骤 6) 创建程序:
文件:db/create_procedures_mcv.sql
DROP FUNCTION sl_insert_day(date, time without time zone);
CREATE FUNCTION sl_insert_day(new_start_date date,
new_start_time time without time zone) RETURNS text AS $$
from plpy import spiexceptions
try:
plan = plpy.prepare("INSERT INTO sl_day (start_date, start_time) \
VALUES(, )", ["date", "time without time zone"])
plpy.execute(plan, [new_start_date, new_start_time])
except plpy.SPIError, e:
return "error: SQLSTATE %s" % (e.sqlstate,)
else:
return "sl_insert_day() succeeded"
$$ LANGUAGE plpythonu;
步骤 7) 授予权限:
文件:db/grant_privileges_mcv.sql
GRANT SELECT, UPDATE, INSERT, DELETE ON sl_day TO jazcap53;
GRANT USAGE ON sl_day_day_id_seq TO jazcap53;
步骤 8) 运行 程序为 python3 src/load/load_mcv.py < input.txt:
文件:src/load/load_mcv.py
import sys
import psycopg2
from spreadsheet_etl.db.config_mcv import config
def conn_exec():
conn = None
try:
params = config()
conn = psycopg2.connect(**params)
cur = conn.cursor()
last_serial_val = 0
while True:
my_line = sys.stdin.readline()
if not my_line:
break
line_list = my_line.rstrip().split(', ')
if line_list[0] == 'DAY':
cur.execute('SELECT sl_insert_day(\'{}\', \'{}\')'.
format(line_list[1], line_list[2]))
print(cur.fetchone())
cur.close()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
finally:
if conn is not None:
conn.close()
if __name__ == '__main__':
conn_exec()
在cur.close()
conn.commit()