如何将复杂类型数组的成员与 SELECT 进行比较?
How to Compare Member of Array of Complex Type to a SELECT?
给出这样的 table:
╔══════════════════════════════════════╤════════╤═══════════╗
║ message_id │ type │ json_data ║
╠══════════════════════════════════════╪════════╪═══════════╣
║ 2c8c86b7-4867-494a-88bf-1e6b17dd121f │ type-a │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 767d6fbf-84cf-4baa-9e33-46b15f0c8594 │ type-b │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 298dcedb-b51d-4623-89f8-fabec44663c8 │ type-c │ {} ║
╚══════════════════════════════════════╧════════╧═══════════╝
和如下类型:
CREATE TYPE public.new_stream_message AS (
message_id UUID,
"type" VARCHAR(128),
json_data VARCHAR,
json_metadata VARCHAR
);
和一个将这些数组作为参数之一的函数。
在这个函数中,我将这个数组写成 table。如果存在唯一约束违规,我想将数组中的 message_id
与 table 中的 select 语句进行比较。如果一切都匹配,那么这是一次幂等写入,我可以安全地忽略异常。我该怎么做?
怎么样
CREATE OR REPLACE FUNCTION idempotentAppend(streamId varchar, new_stream_message NewMessage[]) RETURNS AppendResult AS $$
DECLARE
...
BEGIN
...
foreach message in ARRAY newMessages
loop
BEGIN
INSERT INTO public.Messages (message_id, Type, JsonData, json_metadata)
SELECT message.*;
EXCEPTION WHEN unique_violation THEN
SELECT * FROM public.Messages into conflict WHERE public.Messages.Id = Id;
IF message.type != conflict.type OR message.json_data != conflict.json_data THEN
RAISE unique_violation USING MESSAGE = 'Duplicate message ID: ' || Id;
END IF;
END;
end loop;
...
END;
$$ LANGUAGE plpgsql;
顺便说一句,我在 ddd-cqrs-es slack 的#sql-stream-store 频道中找到了这个,并且一直在慢慢地开发 java 版本。
编辑:Whelp...看来我应该阅读更多的 slack 频道。看来你已经解决了
我最终执行了以下操作:使用 CURSOR,迭代返回的行并将它们追加到循环中的数组。然后,检查 a) _new_stream_messages 的长度是否等于读取返回的长度,然后 b) 两者的内容是否相同。我确信有更好的方法可以做到这一点,但我还没有找到它。
代码如下:
CREATE OR REPLACE FUNCTION __schema__.enforce_idempotent_append(
_stream_id CHAR(42),
_start INT,
_check_length BOOLEAN,
_new_stream_messages __schema__.new_stream_message [])
RETURNS VOID AS $F$
DECLARE
_message_id_record RECORD;
_message_id_cursor REFCURSOR;
_message_ids UUID [] = '{}' :: UUID [];
BEGIN
_message_id_cursor = (
SELECT *
FROM __schema__.read(_stream_id, cardinality(_new_stream_messages), _start, true, false)
OFFSET 1
);
FETCH FROM _message_id_cursor
INTO _message_id_record;
WHILE FOUND LOOP
_message_ids = array_append(_message_ids, _message_id_record.message_id);
FETCH FROM _message_id_cursor
INTO _message_id_record;
END LOOP;
IF (_check_length AND cardinality(_new_stream_messages) > cardinality(_message_ids))
THEN
RAISE EXCEPTION 'WrongExpectedVersion'
USING HINT = 'Wrong message count';
END IF;
IF _message_ids <> (
SELECT ARRAY(
SELECT n.message_id
FROM unnest(_new_stream_messages) n
))
THEN
RAISE EXCEPTION 'WrongExpectedVersion';
END IF;
END;
$F$
LANGUAGE 'plpgsql'
给出这样的 table:
╔══════════════════════════════════════╤════════╤═══════════╗
║ message_id │ type │ json_data ║
╠══════════════════════════════════════╪════════╪═══════════╣
║ 2c8c86b7-4867-494a-88bf-1e6b17dd121f │ type-a │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 767d6fbf-84cf-4baa-9e33-46b15f0c8594 │ type-b │ {} ║
╟──────────────────────────────────────┼────────┼───────────╢
║ 298dcedb-b51d-4623-89f8-fabec44663c8 │ type-c │ {} ║
╚══════════════════════════════════════╧════════╧═══════════╝
和如下类型:
CREATE TYPE public.new_stream_message AS (
message_id UUID,
"type" VARCHAR(128),
json_data VARCHAR,
json_metadata VARCHAR
);
和一个将这些数组作为参数之一的函数。
在这个函数中,我将这个数组写成 table。如果存在唯一约束违规,我想将数组中的 message_id
与 table 中的 select 语句进行比较。如果一切都匹配,那么这是一次幂等写入,我可以安全地忽略异常。我该怎么做?
怎么样
CREATE OR REPLACE FUNCTION idempotentAppend(streamId varchar, new_stream_message NewMessage[]) RETURNS AppendResult AS $$
DECLARE
...
BEGIN
...
foreach message in ARRAY newMessages
loop
BEGIN
INSERT INTO public.Messages (message_id, Type, JsonData, json_metadata)
SELECT message.*;
EXCEPTION WHEN unique_violation THEN
SELECT * FROM public.Messages into conflict WHERE public.Messages.Id = Id;
IF message.type != conflict.type OR message.json_data != conflict.json_data THEN
RAISE unique_violation USING MESSAGE = 'Duplicate message ID: ' || Id;
END IF;
END;
end loop;
...
END;
$$ LANGUAGE plpgsql;
顺便说一句,我在 ddd-cqrs-es slack 的#sql-stream-store 频道中找到了这个,并且一直在慢慢地开发 java 版本。
编辑:Whelp...看来我应该阅读更多的 slack 频道。看来你已经解决了
我最终执行了以下操作:使用 CURSOR,迭代返回的行并将它们追加到循环中的数组。然后,检查 a) _new_stream_messages 的长度是否等于读取返回的长度,然后 b) 两者的内容是否相同。我确信有更好的方法可以做到这一点,但我还没有找到它。
代码如下:
CREATE OR REPLACE FUNCTION __schema__.enforce_idempotent_append(
_stream_id CHAR(42),
_start INT,
_check_length BOOLEAN,
_new_stream_messages __schema__.new_stream_message [])
RETURNS VOID AS $F$
DECLARE
_message_id_record RECORD;
_message_id_cursor REFCURSOR;
_message_ids UUID [] = '{}' :: UUID [];
BEGIN
_message_id_cursor = (
SELECT *
FROM __schema__.read(_stream_id, cardinality(_new_stream_messages), _start, true, false)
OFFSET 1
);
FETCH FROM _message_id_cursor
INTO _message_id_record;
WHILE FOUND LOOP
_message_ids = array_append(_message_ids, _message_id_record.message_id);
FETCH FROM _message_id_cursor
INTO _message_id_record;
END LOOP;
IF (_check_length AND cardinality(_new_stream_messages) > cardinality(_message_ids))
THEN
RAISE EXCEPTION 'WrongExpectedVersion'
USING HINT = 'Wrong message count';
END IF;
IF _message_ids <> (
SELECT ARRAY(
SELECT n.message_id
FROM unnest(_new_stream_messages) n
))
THEN
RAISE EXCEPTION 'WrongExpectedVersion';
END IF;
END;
$F$
LANGUAGE 'plpgsql'