将 Postgres 查询翻译为与 Snowflake 兼容

Translation of Postgres Query to be Snowflake compatible

我正在尝试将此 pl/pgSQL 函数转换为适用于 Snowflake。不幸的是,我刚开始使用 Snowflake,无法正确转换它。 Snowflake 计划在年底前支持 pgSQL 查询,不幸的是,目前还没有..

这里有一个快速介绍,让您了解我的 table 存储的内容以及该功能的作用。 我有三个 table。您可以在这个问题的底部找到所有 table 和示例数据的 DDL 语句。

  1. 事件 - 这是来源table。
  2. 持续时间 - 这是目标 table。
  3. Properties - 这是一个 table 用于参考,每当调用函数时都会更新。

每天一次,我将数据导入事件 table。我们感兴趣的事件是 device_types 1 和 2 (entry/exit) 的事件。然后我 运行 我的函数,它计算具有相同 card_nr 的事件之间的正确持续时间。之后,我将这些持续时间导入我的持续时间 table 并更新属性 table.

事件示例如下:

下面是调用函数后的持续时间示例:

我需要涵盖的最重要的事情是:

  1. 我应该过滤制造商始终是 'XX' 因为有像 YY 或 ZZ 这样的制造商
  2. 我应该只计算属性 events.event_time >= durationLimitDate 的持续时间
  3. 我需要确保 event_id_arrival 和 event_id_departure 不在目标 table(持续时间)中以避免重复!
  4. 一旦我将计算插入持续时间 table,我必须更新属性中的 durationLimitDate。知道 durationLimitDate = (Max(event_time) - durationLimitDays))

函数

CREATE OR REPLACE FUNCTION calculateduration() RETURNS void AS $function$

WITH cte AS (SELECT e.id, e.card_nr, e.event_time, e.ticket_type, e.manufacturer, e.carpark_id, e.device_type,
ROW_NUMBER() OVER (ORDER BY e.card_nr, e.carpark_id, e.event_time, e.device_type) AS rn
FROM events e
LEFT JOIN durations d ON d.event_id_arrival = e.id OR d.event_id_departure = e.id
WHERE e.event_time >= (SELECT PROP_VALUE::timestamp FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE')
AND e.device_type IN (1, 2)
AND event_type = 2
AND e.manufacturer LIKE 'XX%'
AND d.id IS NULL)

INSERT INTO durations (id, odb_created_at, event_id_arrival, event_id_departure,
event_time_arrival, event_time_departure,
card_nr, ticket_type, duration, manufacturer, carpark_id)

SELECT nextval('durations_id_seq'),
current_timestamp,
arrived_entry.id,
departed_entry.id,
arrived_entry.event_time,
departed_entry.event_time,
arrived_entry.card_nr,
arrived_entry.ticket_type,
date_part('epoch', departed_entry.event_time::timestamp - arrived_entry.event_time::timestamp),
arrived_entry.manufacturer,
arrived_entry.carpark_id
FROM (SELECT * FROM cte WHERE cte.device_type = 1) AS arrived_entry
INNER JOIN (SELECT * FROM cte WHERE cte.device_type = 2) AS departed_entry ON arrived_entry.card_nr = departed_entry.card_nr
AND arrived_entry.carpark_id = departed_entry.carpark_id
AND arrived_entry.rn + 1 = departed_entry.rn;


UPDATE properties
SET PROP_VALUE = (SELECT (MAX(event_time) - ((SELECT PROP_VALUE FROM properties WHERE prop_key = 'DURATION.LIMIT.DAYS') ||' day')::interval) FROM events WHERE event_time >= (SELECT PROP_VALUE::timestamp FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE'))
WHERE PROP_KEY ='DURATION.LIMIT.DATE';

$function$

LANGUAGE sql;

DDL 脚本

-- events
        CREATE TABLE IF NOT EXISTS events (
        id bigint NOT NULL autoincrement start 1 increment 1 PRIMARY KEY,
        odb_created_at timestamp without time zone NOT NULL,
        event_time timestamp without time zone NOT NULL,
        device_type integer NOT NULL,
        event_type integer NOT NULL,
        ticket_type integer NOT NULL,
        card_nr character varying(100),
        count integer DEFAULT 1 NOT NULL,
        manufacturer character varying(200),
        carpark_id bigint
    ); 

     -- durations
    CREATE TABLE IF NOT EXISTS durations (
        id bigint NOT NULL autoincrement start 1 increment 1 PRIMARY KEY,
        odb_created_at timestamp without time zone NOT NULL,
        event_id_arrival bigint,
        event_id_departure bigint,
        event_time_arrival timestamp without time zone,
        event_time_departure timestamp without time zone,
        card_nr character varying(100),
        ticket_type integer,
        duration integer,
        manufacturer character varying(200),
        carpark_id bigint
    );

    --properties
    create or replace TABLE PROPERTIES (
        PROP_KEY VARCHAR(80) NOT NULL,
        PROP_VALUE VARCHAR(250),
        primary key (PROP_KEY)
    );

样本数据:

INSERT INTO properties (prop_key,prop_value) VALUES
     ('DURATION.LIMIT.DAYS','30'),
     ('DURATION.LIMIT.DATE','2021-08-01 00:00:00.00');

    INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160996, '2021-10-02 04:28:26.338', '2021-10-01 09:14:41.32', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160790, '2021-10-02 04:28:26.248', '2021-10-01 09:31:10.94', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188146489, '2021-10-02 04:26:55.069', '2021-10-01 10:03:01.57', 1, 2, 500, '01479804030429500089598', 1, 'XX', 1563);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188146069, '2021-10-02 04:26:54.852', '2021-10-01 11:49:58.45', 2, 2, 500, '01479804030429500089598', 1, 'XX', 1563);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188161161, '2021-10-02 04:28:26.372', '2021-10-01 18:44:33.62', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160950, '2021-10-02 04:28:26.329', '2021-10-01 18:45:51.903', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188161227, '2021-10-02 04:28:26.374', '2021-10-01 23:21:18.58', 1, 2, 11, '04139733030897300003136', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188160974, '2021-10-02 04:28:26.334', '2021-10-01 23:24:03.29', 2, 2, 11, '04139733030897300003136', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188239864, '2021-10-03 04:24:43.345', '2021-10-02 06:49:55.97', 1, 2, 11, '01719400030897300061410', 1, 'XX', 1852);
INSERT INTO public.events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id) VALUES(188239649, '2021-10-03 04:24:43.308', '2021-10-02 07:02:08.72', 2, 2, 11, '01719400030897300061410', 1, 'XX', 1852);

谢谢!

独特测试

      CREATE TABLE IF NOT EXISTS test (
        id bigint NOT NULL AUTOINCREMENT PRIMARY KEY -- Check the syntax!
        , odb_created_at timestamp without time zone NOT NULL
        , event_time timestamp without time zone NOT NULL
        , device_type integer NOT NULL
        , event_type integer NOT NULL
        , ticket_type integer NOT NULL
        , card_nr character varying(100)
        , count integer DEFAULT 1 NOT NULL
        , manufacturer character varying(200)
        , carpark_id bigint
        , UNIQUE (card_nr, event_time) -- NATURAL KEY
    );
   
   
   INSERT INTO EUILOGS_DEV.ILOGS.TEST (ODB_CREATED_AT, EVENT_TIME, DEVICE_TYPE, EVENT_TYPE, TICKET_TYPE, CARD_NR, COUNT, MANUFACTURER, CARPARK_ID) 
   VALUES(current_timestamp(), '2021-01-01 15:00:00.000', 1, 1, 1, 'CARD1', 1, 'MAN1', 1);
  INSERT INTO EUILOGS_DEV.ILOGS.TEST (ODB_CREATED_AT, EVENT_TIME, DEVICE_TYPE, EVENT_TYPE, TICKET_TYPE, CARD_NR, COUNT, MANUFACTURER, CARPARK_ID) 
   VALUES(current_timestamp(), '2021-01-01 15:00:00.000', 1, 1, 1, 'CARD1', 1, 'MAN1', 1);

您需要使用序列来生成自动编号: https://docs.snowflake.com/en/user-guide/querying-sequences.html

所以先创建一个序列来使用它:

create or replace sequence seq1;

然后使用下面的函数:

create or replace procedure calculateduration() 
RETURNS string
LANGUAGE JAVASCRIPT 
AS $$
var query1 = 
`
  INSERT INTO durations (id, odb_created_at, event_id_arrival, event_id_departure,
  event_time_arrival, event_time_departure,
  card_nr, ticket_type, duration, manufacturer, carpark_id)

  WITH cte AS (
      SELECT e.id, e.card_nr, e.event_time, e.ticket_type, e.manufacturer, e.carpark_id, e.device_type,
      ROW_NUMBER() OVER (ORDER BY e.card_nr, e.carpark_id, e.event_time, e.device_type) AS rn
      FROM events e
      LEFT JOIN durations d ON d.event_id_arrival = e.id OR d.event_id_departure = e.id
      WHERE e.event_time >= (SELECT PROP_VALUE::timestamp FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE')
      AND e.device_type IN (1, 2)
      AND event_type = 2
      AND e.manufacturer LIKE 'XX%'
      AND d.id IS NULL
  )
  SELECT 
    seq1.nextval,
    current_timestamp(),
    arrived_entry.id,
    departed_entry.id,
    arrived_entry.event_time,
    departed_entry.event_time,
    arrived_entry.card_nr,
    arrived_entry.ticket_type,
    timestampdiff(second, arrived_entry.event_time, departed_entry.event_time),
    arrived_entry.manufacturer,
    arrived_entry.carpark_id
  FROM (SELECT * FROM cte WHERE cte.device_type = 1) AS arrived_entry
  INNER JOIN (SELECT * FROM cte WHERE cte.device_type = 2) AS departed_entry 
    ON arrived_entry.card_nr = departed_entry.card_nr
  AND arrived_entry.carpark_id = departed_entry.carpark_id
  AND arrived_entry.rn + 1 = departed_entry.rn
`;

snowflake.execute({ sqlText: query1 });

var query2 = "SELECT PROP_VALUE FROM properties WHERE prop_key = 'DURATION.LIMIT.DAYS'";
var stmt = snowflake.createStatement({ sqlText: query2 });
var resultSet = stmt.execute(); 
resultSet.next();
var prop_value = resultSet.getColumnValue(1); 

var query3 =
`
  UPDATE properties
  SET PROP_VALUE = (
    SELECT dateadd(day, -1 * ${prop_value}, MAX(event_time)) FROM events 
    WHERE event_time >= (
      SELECT PROP_VALUE::timestamp FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE'
    )
  )
  WHERE PROP_KEY ='DURATION.LIMIT.DATE';
`

stmt = snowflake.createStatement({ sqlText: query3 });
stmt.execute();

return 'true';
$$;

然后调用程序:

call calculateduration();

代码很简单,没有太多验证和检查,但它应该可以完成您需要的工作。

(这是一个普通的 sql 重写)

  • 不应分配 serials/IDENTITY(除了填充初始 table)
  • 为避免插入重复项,您需要一个自然键
  • (短 table 别名更容易阅读)

\i tmp.sql

-- DDL SCRIPTS

-- events
        CREATE TABLE IF NOT EXISTS events (
        id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (start 1 increment 1) PRIMARY KEY -- Check the syntax!
        , odb_created_at timestamp without time zone NOT NULL
        , event_time timestamp without time zone NOT NULL
        , device_type integer NOT NULL
        , event_type integer NOT NULL
        , ticket_type integer NOT NULL
        , card_nr character varying(100)
        , count integer DEFAULT 1 NOT NULL
        , manufacturer character varying(200)
        , carpark_id bigint
        , UNIQUE (card_nr, event_time) -- NATURAL KEY
    );

     -- durations
    CREATE TABLE IF NOT EXISTS durations (
        id bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY (start 1 increment 1) PRIMARY KEY -- Check the syntax!
        , odb_created_at timestamp without time zone NOT NULL
        , event_id_arrival bigint
        , event_id_departure bigint
        , event_time_arrival timestamp without time zone
        , event_time_departure timestamp without time zone
        , card_nr character varying(100)
        , ticket_type integer
        , duration integer
        , manufacturer character varying(200)
        , carpark_id bigint
        , UNIQUE (card_nr,event_id_arrival) -- NATURAL KEY
        , UNIQUE (card_nr,event_id_departure) -- (alternate) NATURAL KEY
    );

    --properties
    DROP TABLE IF EXISTS properties ;
    CREATE TABLE IF NOT EXISTS properties (
        prop_key VARCHAR(80) NOT NULL
        , prop_value VARCHAR(250)
        , primary key (prop_key)
    );
-- Sample data:

INSERT INTO properties (prop_key,prop_value) VALUES
     ('DURATION.LIMIT.DAYS','30'),
     ('DURATION.LIMIT.DATE','2021-08-01 00:00:00.00');

    INSERT INTO events (id, odb_created_at, event_time, device_type, event_type, ticket_type, card_nr, count, manufacturer, carpark_id)
    VALUES(188160996, '2021-10-02 04:28:26.338', '2021-10-01 09:14:41.32', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852)
        , (188160790, '2021-10-02 04:28:26.248', '2021-10-01 09:31:10.94', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852)
        , (188146489, '2021-10-02 04:26:55.069', '2021-10-01 10:03:01.57', 1, 2, 500, '01479804030429500089598', 1, 'XX', 1563)
        , (188146069, '2021-10-02 04:26:54.852', '2021-10-01 11:49:58.45', 2, 2, 500, '01479804030429500089598', 1, 'XX', 1563)
        , (188161161, '2021-10-02 04:28:26.372', '2021-10-01 18:44:33.62', 1, 2, 11, '03998988030897300007782', 1, 'XX', 1852)
        , (188160950, '2021-10-02 04:28:26.329', '2021-10-01 18:45:51.903', 2, 2, 11, '03998988030897300007782', 1, 'XX', 1852)
        , (188161227, '2021-10-02 04:28:26.374', '2021-10-01 23:21:18.58', 1, 2, 11, '04139733030897300003136', 1, 'XX', 1852)
        , (188160974, '2021-10-02 04:28:26.334', '2021-10-01 23:24:03.29', 2, 2, 11, '04139733030897300003136', 1, 'XX', 1852)
        , (188239864, '2021-10-03 04:24:43.345', '2021-10-02 06:49:55.97', 1, 2, 11, '01719400030897300061410', 1, 'XX', 1852)
        , (188239649, '2021-10-03 04:24:43.308', '2021-10-02 07:02:08.72', 2, 2, 11, '01719400030897300061410', 1, 'XX', 1852)
        ;
        -- Rewritten funtion body
-- EXPLAIN ANALYZE
WITH omg AS (
        SELECT e.id, e.card_nr, e.event_time, e.ticket_type, e.manufacturer, e.carpark_id, e.device_type
        , row_number() OVER (ORDER BY e.card_nr, e.carpark_id, e.event_time, e.device_type) AS rn
        FROM events e
        WHERE e.event_time >= (SELECT prop_value::timestamp FROM properties WHERE prop_key = 'DURATION.LIMIT.DATE')
        AND e.device_type IN (1, 2)
        AND e.event_type = 2
        AND e.manufacturer LIKE 'XX%'
        AND NOT EXISTS (
                SELECT *
                FROM durations d WHERE d.event_id_arrival = e.id OR d.event_id_departure = e.id
                )
        )

INSERT INTO durations (odb_created_at
        , event_id_arrival, event_id_departure
        , event_time_arrival, event_time_departure
        , card_nr, ticket_type
        , duration, manufacturer, carpark_id)

SELECT -- nextval('durations_id_seq'),
        current_timestamp
        , t0.id, t1.id
        , t0.event_time, t1.event_time
        , t0.card_nr
        , t0.ticket_type
        , date_part('epoch', t1.event_time::timestamp - t0.event_time::timestamp)
        , t0.manufacturer
        , t0.carpark_id
FROM (SELECT * FROM omg WHERE device_type = 1) AS t0
JOIN (SELECT * FROM omg WHERE device_type = 2) AS t1
        ON t0.card_nr = t1.card_nr
        AND t0.carpark_id = t1.carpark_id
        AND t0.rn + 1 = t1.rn -- this is questionable
        -- Avoid duplicate insertions
WHERE NOT EXISTS (
        SELECT * FROM durations nx
        WHERE nx.card_nr = t0.card_nr
        AND nx.event_time_arrival = t0.event_time
        )
        ;
        -- This is beyond repair, IMHO
UPDATE properties
SET prop_value = (
        SELECT (MAX(event_time) - ((SELECT prop_value FROM properties
        WHERE prop_key = 'DURATION.LIMIT.DAYS') ||' day')::interval)
        FROM events WHERE event_time >= (SELECT prop_value::timestamp FROM properties
        WHERE prop_key = 'DURATION.LIMIT.DATE')
        )
WHERE prop_key ='DURATION.LIMIT.DATE';

select * from durations;
select * from properties;