PostgreSQL创建函数按间隔批量导出符号化数据

PostgreSQL create function to export symbolised data in batches by interval

我想将记录从数据库 table 导出到 CSV 文件,基于时间间隔批量导出。我通常会使用 python 并在 for 循环中为此类事物创建不同的查询,然后执行它们。例如,我通常会创建一个查询,例如:

COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= 2016-01-01 00:00:00 AND log_time < 2016-01-01 23:59:59) TO '/tmp/data_2016-01-01.csv' DELIMITER ',' CSV

我会循环几天执行上面的查询并相应地编辑间隔,以便逐日导出记录。例如,for 循环中的下一个查询将是:

COPY (SELECT 'log_time:' || logtime, 'firstname:' || firstname FROM tablename WHERE log_time >= 2016-01-02 00:00:00 AND log_time < 2016-01-02 23:59:59) TO '/tmp/data_2016-01-02.csv' DELIMITER ',' CSV

只是为了澄清查询中声明 'log_time:' || logtime, 'first name:' 是对导出数据进行数据挖掘所必需的部分。

另请注意,文件名因包含相关日期而异(/tmp/data_2016-01-01.csv、/tmp/data_2016-01-02.csv 等)。

到目前为止我想出的函数查询是这样的:

CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(timestamp without time zone, timestamp without time zone, interval, text) 
 RETURNS void AS 
$func$
DECLARE
    starttime timestamp without time zone := ;
    endtime timestamp without time zone := ;
    interval_length interval := ;
    tablename := ;
    file_id = starttime
BEGIN

LOOP
    PERFORM COPY (SELECT * FROM tablename WHERE log_time >= starttime AND log_time < starttime + interval) TO ‘/tmp/data_’ + file_id + ‘.csv’ DELIMITER ',' CSV;
    starttime := starttime + interval;
    file_id   := starttime;
    EXIT WHEN starttime > endtime;
END LOOP;

END
$func$ LANGUAGE plpgsql;

但这缺少字段的符号化,而是有一个 select *。我需要一些方法来自动检索 table(不仅仅是上面列出的两个)中所有字段的 select,符号如 fieldname:fieldvalue.

现在我对创建这些函数一无所知,但我想我理解了上面的内容,尽管可能有错误。

我愿意接受任何可以简化流程的方法(不仅仅是函数),这样我就不需要在我的 python 代码中循环遍历日期列表,而是可以执行通过数据库进行区间处理。

为了您的目的,您需要使用 dynamic SQL and SECURITY DEFINER 标志。语句 COPY 没有执行计划,然后禁止在其中使用任何变量 - dynamic SQL 是必需的。 COPY 访问 IO 需要超级用户权限,应该非常小心地使用 - 所以你需要 SECURITY DEFINER 标志(此函数的所有者(创建者)必须是具有超级用户权限的用户):

CREATE OR REPLACE FUNCTION temporal_interval_export_for_mining(starttime timestamp without time zone,
                                                               endtime timestamp without time zone,
                                                               interval_length interval,
                                                               tablename text) 
RETURNS void AS 
$func$
DECLARE
  ctime timestamp without time zone = starttime;
  dsql text;
  expr text = '*';
BEGIN
  -- expr := expr_list(columns_to_array(tablename));
  WHILE ctime < endtime
  LOOP
    dsql := format(
$_$COPY (SELECT %s FROM %I WHERE log_time >= %L AND log_time < %L) TO %L DELIMITER ',' CSV$_$,
                   expr, tablename, ctime, ctime + interval_length,
                   '/tmp/data_' || to_char(ctime, 'YYYY-MM-DD') || '.csv');
    RAISE NOTICE 'Executing query: %', dsql;
    EXECUTE dsql;
    ctime := ctime + interval_length;
  END LOOP; 
  RETURN;
END
$func$ LANGUAGE plpgsql SECURITY DEFINER STRICT;

你可以用 SELECT:

来调用这个函数
postgres=# select temporal_interval_export_for_mining(current_timestamp::timestamp without time zone, (current_timestamp + interval '10days')::timestamp without time zone, '1day'::interval, 'foo'::text);
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-17 07:51:48.189734' AND log_time < '2016-01-18 07:51:48.189734') TO '/tmp/data_2016-01-17.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-18 07:51:48.189734' AND log_time < '2016-01-19 07:51:48.189734') TO '/tmp/data_2016-01-18.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-19 07:51:48.189734' AND log_time < '2016-01-20 07:51:48.189734') TO '/tmp/data_2016-01-19.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-20 07:51:48.189734' AND log_time < '2016-01-21 07:51:48.189734') TO '/tmp/data_2016-01-20.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-21 07:51:48.189734' AND log_time < '2016-01-22 07:51:48.189734') TO '/tmp/data_2016-01-21.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-22 07:51:48.189734' AND log_time < '2016-01-23 07:51:48.189734') TO '/tmp/data_2016-01-22.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-23 07:51:48.189734' AND log_time < '2016-01-24 07:51:48.189734') TO '/tmp/data_2016-01-23.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-24 07:51:48.189734' AND log_time < '2016-01-25 07:51:48.189734') TO '/tmp/data_2016-01-24.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-25 07:51:48.189734' AND log_time < '2016-01-26 07:51:48.189734') TO '/tmp/data_2016-01-25.csv' DELIMITER ',' CSV
NOTICE:  Executing query: COPY (SELECT * FROM foo WHERE log_time >= '2016-01-26 07:51:48.189734' AND log_time < '2016-01-27 07:51:48.189734') TO '/tmp/data_2016-01-26.csv' DELIMITER ',' CSV
 temporal_interval_export_for_mining 
-------------------------------------

(1 row)

如何生成名单?这取决于你的 Postgres 有多旧。我期望 9.1 或更高版本。

CREATE OR REPLACE FUNCTION public.expr_list(colnames text[])
RETURNS text
LANGUAGE plpgsql AS $function$
DECLARE
  colname text;
  result text;
  expressions text[];
BEGIN
   IF colnames IS NOT NULL THEN
      expressions := '{}';
      FOREACH colname IN ARRAY colnames
      LOOP
        expressions := expressions || format('%L || %I', colname || ':', colname); 
      END LOOP;
      result := array_to_string(expressions, ', ');
    ELSE
      result := '*';
    END IF;
    RETURN result;
  END;
  $function$;

 postgres=# select expr_list(ARRAY['name','surname']);
               expr_list                
----------------------------------------
 'name:' || name, 'surname:' || surname
(1 row)


postgres=# select expr_list(ARRAY(SELECT column_name::text FROM information_schema.columns WHERE table_name = 'pg_class'));

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 'relname:' || relname, 'relnamespace:' || relnamespace, 'reltype:' || reltype, 'reloftype:' || reloftype, 'relowner:' || relowner, 'relam:' || relam, 'relfilenode:' || relfilenode, 'reltablespace:' || reltablespace, 'relpages:
(1 row)

可以通过自定义SQL函数简化调用:

CREATE OR REPLACE FUNCTION colums_to_array(text)
RETURNS text[] AS $$
    SELECT ARRAY(SELECT column_name::text
                    FROM information_schema.columns
                   WHERE table_name = ::name)
$$ LANGUAGE sql;

postgres=# SELECT colums_to_array('foo');
 colums_to_array  
------------------
 {log_time,xx,yy}

(1 行)