PostgreSQL创建函数按间隔批量导出符号化数据
PostgreSQL create function to export symbolised data in batches by interval
我想将记录从数据库 table 导出到 CSV 文件,基于时间间隔批量导出。我通常会使用 python 并在 for 循环中为此类事物创建不同的查询,然后执行它们。例如,我通常会创建一个查询,例如:
-- Timestamp literals must be quoted; the upper bound is the next midnight (half-open range) so the last second of the day is not dropped.
-- NOTE(review): the SELECT list reads "logtime" while the WHERE clause reads "log_time"; confirm the real column name.
COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= '2016-01-01 00:00:00' AND log_time < '2016-01-02 00:00:00') TO '/tmp/data_2016-01-01.csv' DELIMITER ',' CSV;
我会循环几天执行上面的查询并相应地编辑间隔,以便逐日导出记录。例如,for 循环中的下一个查询将是:
-- Timestamp literals must be quoted; the upper bound is the next midnight (half-open range) so the last second of the day is not dropped.
-- NOTE(review): the SELECT list reads "logtime" while the WHERE clause reads "log_time"; confirm the real column name.
COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= '2016-01-02 00:00:00' AND log_time < '2016-01-03 00:00:00') TO '/tmp/data_2016-01-02.csv' DELIMITER ',' CSV;
需要说明的是,查询中的 'log_time:' || logtime, 'firstname:' || firstname 这种写法,是后续对导出数据进行数据挖掘所必需的。
另请注意,文件名因包含相关日期而异(/tmp/data_2016-01-01.csv、/tmp/data_2016-01-02.csv 等)。
到目前为止我想出的函数查询是这样的:
-- Exports the rows of a table to one CSV file per time window.
--   $1  start of the first window (inclusive)
--   $2  last permitted window start; the loop stops once the next start is past it
--   $3  length of each window
--   $4  name of the table to export; the query filters on its log_time column
-- Files are written on the database server as /tmp/data_<YYYY-MM-DD>.csv,
-- where the date is the start of the window.
CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(timestamp without time zone, timestamp without time zone, interval, text)
RETURNS void AS
$func$
DECLARE
    starttime timestamp without time zone := $1;
    endtime timestamp without time zone := $2;
    interval_length interval := $3;
    tablename text := $4;
    file_id text := to_char(starttime, 'YYYY-MM-DD');
BEGIN
    LOOP
        -- COPY is a utility statement: it cannot be run through PERFORM and
        -- does not see PL/pgSQL variables, so the statement text is built
        -- with format() (%I quotes the identifier, %L quotes literals) and
        -- run with EXECUTE.
        EXECUTE format(
            'COPY (SELECT * FROM %I WHERE log_time >= %L AND log_time < %L) TO %L DELIMITER '','' CSV',
            tablename, starttime, starttime + interval_length,
            '/tmp/data_' || file_id || '.csv');
        starttime := starttime + interval_length;
        file_id := to_char(starttime, 'YYYY-MM-DD');
        EXIT WHEN starttime > endtime;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
但这个函数缺少字段的符号化,只是简单地使用了 select *。我需要某种方法,自动为 table 中的所有字段(不仅仅是上面列出的两个)生成 select 列表,使输出形如 fieldname:fieldvalue。
现在我对创建这些函数一无所知,但我想我理解了上面的内容,尽管可能有错误。
我愿意接受任何可以简化流程的方法(不仅仅是函数),这样我就不需要在 python 代码中循环遍历日期列表,而是可以直接在数据库端完成按时间区间的处理。
要实现这个目的,您需要使用 dynamic SQL 和 SECURITY DEFINER 标志。COPY 语句没有执行计划,因此不能在其中直接使用任何变量,所以必须使用 dynamic SQL。COPY 读写服务器端文件需要超级用户权限,应该非常小心地使用,所以需要 SECURITY DEFINER 标志(此函数的所有者(创建者)必须是具有超级用户权限的用户):
-- Export the rows of a table to CSV files on the database server, one file
-- per time window.
--
-- Parameters:
--   starttime        start of the first window (inclusive)
--   endtime          end of the export range (exclusive)
--   interval_length  length of each window; must be positive
--   tablename        unquoted name of the table to export; the generated
--                    query filters on its log_time column
--
-- Each window starting at t is written to /tmp/data_<t>.csv. <t> is the date
-- (YYYY-MM-DD) for windows of one day or longer, and also carries the time of
-- day for shorter windows so that files of the same day do not overwrite each
-- other. The last window is cut at endtime.
--
-- Raises an exception when the range is non-empty and interval_length is zero
-- or negative (the loop would never terminate).
-- STRICT: the function returns NULL without doing anything when any argument
-- is NULL.
-- SECURITY DEFINER: the COPY runs with the privileges of the function owner.
-- NOTE(review): consider pinning search_path on this function, as recommended
-- for SECURITY DEFINER functions.
CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(starttime timestamp without time zone,
                                                               endtime timestamp without time zone,
                                                               interval_length interval,
                                                               tablename text)
RETURNS void AS
$func$
DECLARE
    ctime timestamp without time zone = starttime;
    window_end timestamp without time zone;
    name_fmt text = 'YYYY-MM-DD';
    dsql text;
    expr text = '*';
BEGIN
    -- A zero or negative step never reaches endtime.
    IF starttime < endtime AND interval_length <= interval '0' THEN
        RAISE EXCEPTION 'interval_length must be positive, got %', interval_length;
    END IF;

    -- Several windows can start on the same day only when the step is shorter
    -- than a day; in that case the date alone is not a unique file name.
    IF interval_length < interval '1 day' THEN
        name_fmt := 'YYYY-MM-DD_HH24MISS_US';
    END IF;

    -- To export "column:value" pairs instead of the raw values, build the
    -- select list from the table's columns:
    -- expr := expr_list(colums_to_array(tablename));

    WHILE ctime < endtime
    LOOP
        -- Do not export rows at or after endtime in the last window.
        window_end := LEAST(ctime + interval_length, endtime);
        -- %s inserts the select list verbatim, %I quotes the table name as an
        -- identifier, %L quotes the bounds and the file path as literals.
        dsql := format(
            $_$COPY (SELECT %s FROM %I WHERE log_time >= %L AND log_time < %L) TO %L DELIMITER ',' CSV$_$,
            expr, tablename, ctime, window_end,
            '/tmp/data_' || to_char(ctime, name_fmt) || '.csv');
        RAISE NOTICE 'Executing query: %', dsql;
        EXECUTE dsql;
        ctime := ctime + interval_length;
    END LOOP;
    RETURN;
END
$func$ LANGUAGE plpgsql SECURITY DEFINER STRICT;
你可以用 SELECT 来调用这个函数:
postgres=# select temporal_interval_export_for_mining(current_timestamp::timestamp without time zone, (current_timestamp + interval '10days')::timestamp without time zone, '1day'::interval, 'foo'::text);
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-17 07:51:48.189734' AND log_time < '2016-01-18 07:51:48.189734') TO '/tmp/data_2016-01-17.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-18 07:51:48.189734' AND log_time < '2016-01-19 07:51:48.189734') TO '/tmp/data_2016-01-18.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-19 07:51:48.189734' AND log_time < '2016-01-20 07:51:48.189734') TO '/tmp/data_2016-01-19.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-20 07:51:48.189734' AND log_time < '2016-01-21 07:51:48.189734') TO '/tmp/data_2016-01-20.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-21 07:51:48.189734' AND log_time < '2016-01-22 07:51:48.189734') TO '/tmp/data_2016-01-21.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-22 07:51:48.189734' AND log_time < '2016-01-23 07:51:48.189734') TO '/tmp/data_2016-01-22.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-23 07:51:48.189734' AND log_time < '2016-01-24 07:51:48.189734') TO '/tmp/data_2016-01-23.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-24 07:51:48.189734' AND log_time < '2016-01-25 07:51:48.189734') TO '/tmp/data_2016-01-24.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-25 07:51:48.189734' AND log_time < '2016-01-26 07:51:48.189734') TO '/tmp/data_2016-01-25.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-26 07:51:48.189734' AND log_time < '2016-01-27 07:51:48.189734') TO '/tmp/data_2016-01-26.csv' DELIMITER ',' CSV
temporal_interval_export_for_mining
-------------------------------------
(1 row)
如何生成表达式列表?这取决于你的 Postgres 版本有多旧。我假设是 9.1 或更高版本。
-- Build a SELECT list that prefixes every column value with its name.
--
-- For each name in colnames the list contains the expression
--   '<name>:' || <name>
-- with the label quoted as a literal and the column quoted as an identifier.
-- The expressions are joined with ', '.
--
-- Returns '*' when colnames is NULL, is empty, or contains only NULL entries,
-- so the result can always be placed directly after SELECT.
--
-- NOTE: || yields NULL when either operand is NULL, so a row whose column
-- value is NULL produces NULL for that expression, not the bare label.
CREATE OR REPLACE FUNCTION public.expr_list(colnames text[])
 RETURNS text
 LANGUAGE plpgsql AS $function$
DECLARE
    colname text;
    expressions text[] := '{}';
BEGIN
    -- FOREACH rejects a NULL array, so only iterate when one was given.
    IF colnames IS NOT NULL THEN
        FOREACH colname IN ARRAY colnames
        LOOP
            -- format() cannot render NULL with %I; skip such entries.
            CONTINUE WHEN colname IS NULL;
            expressions := expressions || format('%L || %I', colname || ':', colname);
        END LOOP;
    END IF;

    -- array_length() is NULL for an empty array: nothing usable was collected.
    IF array_length(expressions, 1) IS NULL THEN
        RETURN '*';
    END IF;

    RETURN array_to_string(expressions, ', ');
END;
$function$;
postgres=# select expr_list(ARRAY['name','surname']);
expr_list
----------------------------------------
'name:' || name, 'surname:' || surname
(1 row)
postgres=# select expr_list(ARRAY(SELECT column_name::text FROM information_schema.columns WHERE table_name = 'pg_class'));
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
'relname:' || relname, 'relnamespace:' || relnamespace, 'reltype:' || reltype, 'reloftype:' || reloftype, 'relowner:' || relowner, 'relam:' || relam, 'relfilenode:' || relfilenode, 'reltablespace:' || reltablespace, 'relpages:
(1 row)
可以通过自定义SQL函数简化调用:
-- Return the column names of the table whose name is given as $1, in the
-- order in which the columns are defined (ordinal_position).
-- Returns an empty array when no visible table has that name.
-- NOTE(review): the lookup matches on the table name only; if tables of the
-- same name exist in several schemas, their columns are all returned. Add a
-- table_schema filter if that can happen.
CREATE OR REPLACE FUNCTION colums_to_array(text)
RETURNS text[] AS $$
    SELECT ARRAY(SELECT column_name::text
                 FROM information_schema.columns
                 WHERE table_name = $1::name
                 ORDER BY ordinal_position)
$$ LANGUAGE sql STABLE;
postgres=# SELECT colums_to_array('foo');
colums_to_array
------------------
{log_time,xx,yy}
(1 行)
我想将记录从数据库 table 导出到 CSV 文件,基于时间间隔批量导出。我通常会使用 python 并在 for 循环中为此类事物创建不同的查询,然后执行它们。例如,我通常会创建一个查询,例如:
-- Timestamp literals must be quoted; the upper bound is the next midnight (half-open range) so the last second of the day is not dropped.
-- NOTE(review): the SELECT list reads "logtime" while the WHERE clause reads "log_time"; confirm the real column name.
COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= '2016-01-01 00:00:00' AND log_time < '2016-01-02 00:00:00') TO '/tmp/data_2016-01-01.csv' DELIMITER ',' CSV;
我会循环几天执行上面的查询并相应地编辑间隔,以便逐日导出记录。例如,for 循环中的下一个查询将是:
-- Timestamp literals must be quoted; the upper bound is the next midnight (half-open range) so the last second of the day is not dropped.
-- NOTE(review): the SELECT list reads "logtime" while the WHERE clause reads "log_time"; confirm the real column name.
COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= '2016-01-02 00:00:00' AND log_time < '2016-01-03 00:00:00') TO '/tmp/data_2016-01-02.csv' DELIMITER ',' CSV;
需要说明的是,查询中的 'log_time:' || logtime, 'firstname:' || firstname 这种写法,是后续对导出数据进行数据挖掘所必需的。
另请注意,文件名因包含相关日期而异(/tmp/data_2016-01-01.csv、/tmp/data_2016-01-02.csv 等)。
到目前为止我想出的函数查询是这样的:
-- Exports the rows of a table to one CSV file per time window.
--   $1  start of the first window (inclusive)
--   $2  last permitted window start; the loop stops once the next start is past it
--   $3  length of each window
--   $4  name of the table to export; the query filters on its log_time column
-- Files are written on the database server as /tmp/data_<YYYY-MM-DD>.csv,
-- where the date is the start of the window.
CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(timestamp without time zone, timestamp without time zone, interval, text)
RETURNS void AS
$func$
DECLARE
    starttime timestamp without time zone := $1;
    endtime timestamp without time zone := $2;
    interval_length interval := $3;
    tablename text := $4;
    file_id text := to_char(starttime, 'YYYY-MM-DD');
BEGIN
    LOOP
        -- COPY is a utility statement: it cannot be run through PERFORM and
        -- does not see PL/pgSQL variables, so the statement text is built
        -- with format() (%I quotes the identifier, %L quotes literals) and
        -- run with EXECUTE.
        EXECUTE format(
            'COPY (SELECT * FROM %I WHERE log_time >= %L AND log_time < %L) TO %L DELIMITER '','' CSV',
            tablename, starttime, starttime + interval_length,
            '/tmp/data_' || file_id || '.csv');
        starttime := starttime + interval_length;
        file_id := to_char(starttime, 'YYYY-MM-DD');
        EXIT WHEN starttime > endtime;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
但这个函数缺少字段的符号化,只是简单地使用了 select *。我需要某种方法,自动为 table 中的所有字段(不仅仅是上面列出的两个)生成 select 列表,使输出形如 fieldname:fieldvalue。
现在我对创建这些函数一无所知,但我想我理解了上面的内容,尽管可能有错误。
我愿意接受任何可以简化流程的方法(不仅仅是函数),这样我就不需要在 python 代码中循环遍历日期列表,而是可以直接在数据库端完成按时间区间的处理。
要实现这个目的,您需要使用 dynamic SQL 和 SECURITY DEFINER 标志。COPY 语句没有执行计划,因此不能在其中直接使用任何变量,所以必须使用 dynamic SQL。COPY 读写服务器端文件需要超级用户权限,应该非常小心地使用,所以需要 SECURITY DEFINER 标志(此函数的所有者(创建者)必须是具有超级用户权限的用户):
-- Export the rows of a table to CSV files on the database server, one file
-- per time window.
--
-- Parameters:
--   starttime        start of the first window (inclusive)
--   endtime          end of the export range (exclusive)
--   interval_length  length of each window; must be positive
--   tablename        unquoted name of the table to export; the generated
--                    query filters on its log_time column
--
-- Each window starting at t is written to /tmp/data_<t>.csv. <t> is the date
-- (YYYY-MM-DD) for windows of one day or longer, and also carries the time of
-- day for shorter windows so that files of the same day do not overwrite each
-- other. The last window is cut at endtime.
--
-- Raises an exception when the range is non-empty and interval_length is zero
-- or negative (the loop would never terminate).
-- STRICT: the function returns NULL without doing anything when any argument
-- is NULL.
-- SECURITY DEFINER: the COPY runs with the privileges of the function owner.
-- NOTE(review): consider pinning search_path on this function, as recommended
-- for SECURITY DEFINER functions.
CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(starttime timestamp without time zone,
                                                               endtime timestamp without time zone,
                                                               interval_length interval,
                                                               tablename text)
RETURNS void AS
$func$
DECLARE
    ctime timestamp without time zone = starttime;
    window_end timestamp without time zone;
    name_fmt text = 'YYYY-MM-DD';
    dsql text;
    expr text = '*';
BEGIN
    -- A zero or negative step never reaches endtime.
    IF starttime < endtime AND interval_length <= interval '0' THEN
        RAISE EXCEPTION 'interval_length must be positive, got %', interval_length;
    END IF;

    -- Several windows can start on the same day only when the step is shorter
    -- than a day; in that case the date alone is not a unique file name.
    IF interval_length < interval '1 day' THEN
        name_fmt := 'YYYY-MM-DD_HH24MISS_US';
    END IF;

    -- To export "column:value" pairs instead of the raw values, build the
    -- select list from the table's columns:
    -- expr := expr_list(colums_to_array(tablename));

    WHILE ctime < endtime
    LOOP
        -- Do not export rows at or after endtime in the last window.
        window_end := LEAST(ctime + interval_length, endtime);
        -- %s inserts the select list verbatim, %I quotes the table name as an
        -- identifier, %L quotes the bounds and the file path as literals.
        dsql := format(
            $_$COPY (SELECT %s FROM %I WHERE log_time >= %L AND log_time < %L) TO %L DELIMITER ',' CSV$_$,
            expr, tablename, ctime, window_end,
            '/tmp/data_' || to_char(ctime, name_fmt) || '.csv');
        RAISE NOTICE 'Executing query: %', dsql;
        EXECUTE dsql;
        ctime := ctime + interval_length;
    END LOOP;
    RETURN;
END
$func$ LANGUAGE plpgsql SECURITY DEFINER STRICT;
你可以用 SELECT 来调用这个函数:
postgres=# select temporal_interval_export_for_mining(current_timestamp::timestamp without time zone, (current_timestamp + interval '10days')::timestamp without time zone, '1day'::interval, 'foo'::text);
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-17 07:51:48.189734' AND log_time < '2016-01-18 07:51:48.189734') TO '/tmp/data_2016-01-17.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-18 07:51:48.189734' AND log_time < '2016-01-19 07:51:48.189734') TO '/tmp/data_2016-01-18.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-19 07:51:48.189734' AND log_time < '2016-01-20 07:51:48.189734') TO '/tmp/data_2016-01-19.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-20 07:51:48.189734' AND log_time < '2016-01-21 07:51:48.189734') TO '/tmp/data_2016-01-20.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-21 07:51:48.189734' AND log_time < '2016-01-22 07:51:48.189734') TO '/tmp/data_2016-01-21.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-22 07:51:48.189734' AND log_time < '2016-01-23 07:51:48.189734') TO '/tmp/data_2016-01-22.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-23 07:51:48.189734' AND log_time < '2016-01-24 07:51:48.189734') TO '/tmp/data_2016-01-23.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-24 07:51:48.189734' AND log_time < '2016-01-25 07:51:48.189734') TO '/tmp/data_2016-01-24.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-25 07:51:48.189734' AND log_time < '2016-01-26 07:51:48.189734') TO '/tmp/data_2016-01-25.csv' DELIMITER ',' CSV
NOTICE: Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-26 07:51:48.189734' AND log_time < '2016-01-27 07:51:48.189734') TO '/tmp/data_2016-01-26.csv' DELIMITER ',' CSV
temporal_interval_export_for_mining
-------------------------------------
(1 row)
如何生成表达式列表?这取决于你的 Postgres 版本有多旧。我假设是 9.1 或更高版本。
-- Build a SELECT list that prefixes every column value with its name.
--
-- For each name in colnames the list contains the expression
--   '<name>:' || <name>
-- with the label quoted as a literal and the column quoted as an identifier.
-- The expressions are joined with ', '.
--
-- Returns '*' when colnames is NULL, is empty, or contains only NULL entries,
-- so the result can always be placed directly after SELECT.
--
-- NOTE: || yields NULL when either operand is NULL, so a row whose column
-- value is NULL produces NULL for that expression, not the bare label.
CREATE OR REPLACE FUNCTION public.expr_list(colnames text[])
 RETURNS text
 LANGUAGE plpgsql AS $function$
DECLARE
    colname text;
    expressions text[] := '{}';
BEGIN
    -- FOREACH rejects a NULL array, so only iterate when one was given.
    IF colnames IS NOT NULL THEN
        FOREACH colname IN ARRAY colnames
        LOOP
            -- format() cannot render NULL with %I; skip such entries.
            CONTINUE WHEN colname IS NULL;
            expressions := expressions || format('%L || %I', colname || ':', colname);
        END LOOP;
    END IF;

    -- array_length() is NULL for an empty array: nothing usable was collected.
    IF array_length(expressions, 1) IS NULL THEN
        RETURN '*';
    END IF;

    RETURN array_to_string(expressions, ', ');
END;
$function$;
postgres=# select expr_list(ARRAY['name','surname']);
expr_list
----------------------------------------
'name:' || name, 'surname:' || surname
(1 row)
postgres=# select expr_list(ARRAY(SELECT column_name::text FROM information_schema.columns WHERE table_name = 'pg_class'));
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
'relname:' || relname, 'relnamespace:' || relnamespace, 'reltype:' || reltype, 'reloftype:' || reloftype, 'relowner:' || relowner, 'relam:' || relam, 'relfilenode:' || relfilenode, 'reltablespace:' || reltablespace, 'relpages:
(1 row)
可以通过自定义SQL函数简化调用:
-- Return the column names of the table whose name is given as $1, in the
-- order in which the columns are defined (ordinal_position).
-- Returns an empty array when no visible table has that name.
-- NOTE(review): the lookup matches on the table name only; if tables of the
-- same name exist in several schemas, their columns are all returned. Add a
-- table_schema filter if that can happen.
CREATE OR REPLACE FUNCTION colums_to_array(text)
RETURNS text[] AS $$
    SELECT ARRAY(SELECT column_name::text
                 FROM information_schema.columns
                 WHERE table_name = $1::name
                 ORDER BY ordinal_position)
$$ LANGUAGE sql STABLE;
postgres=# SELECT colums_to_array('foo');
colums_to_array
------------------
{log_time,xx,yy}
(1 行)