如何将复杂类型数组的成员与 SELECT 进行比较?

How to Compare Member of Array of Complex Type to a SELECT?

给出这样的 table:

╔══════════════════════════════════════╤════════╤═══════════╗
║ message_id                           │ type   │ json_data ║
╠══════════════════════════════════════╪════════╪═══════════╣
║ 2c8c86b7-4867-494a-88bf-1e6b17dd121f │ type-a │ {}        ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 767d6fbf-84cf-4baa-9e33-46b15f0c8594 │ type-b │ {}        ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 298dcedb-b51d-4623-89f8-fabec44663c8 │ type-c │ {}        ║
╚══════════════════════════════════════╧════════╧═══════════╝

和如下类型:

CREATE TYPE public.new_stream_message AS (
  message_id    UUID,
  "type"        VARCHAR(128),
  json_data     VARCHAR,
  json_metadata VARCHAR
);

和一个将这些数组作为参数之一的函数。

在这个函数中,我将这个数组写成 table。如果存在唯一约束违规,我想将数组中的 message_id 与 table 中的 select 语句进行比较。如果一切都匹配,那么这是一次幂等写入,我可以安全地忽略异常。我该怎么做?

怎么样

CREATE OR REPLACE FUNCTION idempotentAppend(streamId varchar, new_stream_message NewMessage[]) RETURNS AppendResult AS $$
DECLARE
    ...
BEGIN

    ...

    foreach message in ARRAY newMessages
    loop
        BEGIN
          INSERT INTO public.Messages (message_id, Type, JsonData, json_metadata)
          SELECT message.*;
        EXCEPTION WHEN unique_violation THEN
            SELECT * FROM public.Messages into conflict WHERE public.Messages.Id = Id;
            IF message.type != conflict.type OR message.json_data != conflict.json_data THEN
                RAISE unique_violation USING MESSAGE = 'Duplicate message ID: ' || Id;
            END IF;
        END;
    end loop;

    ...

END;
$$ LANGUAGE plpgsql;

顺便说一句,我在 ddd-cqrs-es slack 的#sql-stream-store 频道中找到了这个,并且一直在慢慢地开发 java 版本。

编辑:Whelp...看来我应该阅读更多的 slack 频道。看来你已经解决了

我最终执行了以下操作:使用 CURSOR,迭代返回的行并将它们追加到循环中的数组。然后,检查 a) _new_stream_messages 的长度是否等于读取返回的长度,然后 b) 两者的内容是否相同。我确信有更好的方法可以做到这一点,但我还没有找到它。

代码如下:

CREATE OR REPLACE FUNCTION __schema__.enforce_idempotent_append(
  _stream_id           CHAR(42),
  _start               INT,
  _check_length        BOOLEAN,
  _new_stream_messages __schema__.new_stream_message [])
  RETURNS VOID AS $F$
DECLARE
  _message_id_record RECORD;
  _message_id_cursor REFCURSOR;
  _message_ids       UUID [] = '{}' :: UUID [];
BEGIN
  _message_id_cursor = (
    SELECT *
    FROM __schema__.read(_stream_id, cardinality(_new_stream_messages), _start, true, false)
    OFFSET 1
  );

  FETCH FROM _message_id_cursor
  INTO _message_id_record;

  WHILE FOUND LOOP
    _message_ids = array_append(_message_ids, _message_id_record.message_id);

    FETCH FROM _message_id_cursor
    INTO _message_id_record;
  END LOOP;

  IF (_check_length AND cardinality(_new_stream_messages) > cardinality(_message_ids))
  THEN
    RAISE EXCEPTION 'WrongExpectedVersion'
    USING HINT = 'Wrong message count';
  END IF;

  IF _message_ids <> (
    SELECT ARRAY(
        SELECT n.message_id
        FROM unnest(_new_stream_messages) n
    ))
  THEN
    RAISE EXCEPTION 'WrongExpectedVersion';
  END IF;
END;
$F$
LANGUAGE 'plpgsql'