让 postgres 做一个阻塞等待
Making postgres do a blocking wait
我认为创建一个可以从 plpgsql 函数中使用的阻塞 wait() 可能很有趣(并且很有用)。我让它工作了,但我不确定它是否构思良好。使用 datagrip 时会出现一个有趣的问题。如果我从函数中调用 pg_wait():
select util.wait_test_func() into var --this calls my pg_wait()
该函数将正确阻塞,直到我在不同的查询控制台中发出 NOTIFY window。事实上,如果函数 wait_test_function() 调用我的 pg_wait() 三次,它将正确阻塞三次,我将不得不调用 NOTIFY 三次以允许函数完成。但是,一旦函数完成,如果我再次调用它,它会立即 returns,不会阻塞。几乎就像 NOTIFY 仍在队列中一样,但我并不真正相信这就是问题所在。如果我关闭 datagrip 查询控制台,打开一个新控制台,然后重新发出函数调用,它会再次按预期工作,正确阻塞。我可以始终如一地重复这一点。新查询控制台中的第一个调用 window 总是正确阻塞,但每个后续调用总是 returns 立即。这是我在 postgres 中的第一个 C 函数,所以我想知道我是否只是在做一些根本性的错误。感谢您的任何帮助。我的代码如下。
C 函数:
#ifdef WIN32
#include <windows.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "libpq-fe.h"
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <string.h>
#include "fmgr.h"
#include "utils/palloc.h"
#include "utils/elog.h"
#include "storage/bufpage.h"
#include "utils/builtins.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1( pg_wait );
Datum pg_wait( PG_FUNCTION_ARGS );
Datum pg_wait( PG_FUNCTION_ARGS )
{
PGconn *conn;
PGresult *res;
PGnotify *notify;
int nnotifies;
int sock;
fd_set input_mask;
char* conninfo = text_to_cstring(PG_GETARG_TEXT_PP(0));
char* channel_name = text_to_cstring(PG_GETARG_TEXT_PP(1));
char strlisten[50];
strcpy(strlisten, "LISTEN ");
strcat(strlisten, channel_name);
conn = PQconnectdb(conninfo);
res = PQexec(conn, strlisten);
PQclear(res);
sock = PQsocket(conn);
FD_ZERO(&input_mask);
FD_SET(sock, &input_mask);
select(sock + 1, &input_mask, NULL, NULL, NULL);
PQconsumeInput(conn);
if ((notify = PQnotifies(conn)) != NULL)
{
PQfreemem(notify);
PQconsumeInput(conn);
}
PQfinish(conn);
PG_RETURN_TEXT_P( PG_GETARG_TEXT_PP(1) );
}
util.wait_test_func()
create or replace function util.wait_test_func() returns integer
parallel safe
language plpgsql
as $$
DECLARE
signal_name TEXT;
BEGIN
RAISE NOTICE 'Before wait1';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait1: %', signal_name;
RAISE NOTICE 'Before wait2';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait2: %', signal_name;
RAISE NOTICE 'Before wait3';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait3: %', signal_name;
RETURN 0;
END
$$;
再次感谢您提供的任何帮助。
你试过声明你的函数吗VOLATILE
我认为创建一个可以从 plpgsql 函数中使用的阻塞 wait() 可能很有趣(并且很有用)。我让它工作了,但我不确定它是否构思良好。使用 datagrip 时会出现一个有趣的问题。如果我从函数中调用 pg_wait():
select util.wait_test_func() into var --this calls my pg_wait()
该函数将正确阻塞,直到我在不同的查询控制台中发出 NOTIFY window。事实上,如果函数 wait_test_function() 调用我的 pg_wait() 三次,它将正确阻塞三次,我将不得不调用 NOTIFY 三次以允许函数完成。但是,一旦函数完成,如果我再次调用它,它会立即 returns,不会阻塞。几乎就像 NOTIFY 仍在队列中一样,但我并不真正相信这就是问题所在。如果我关闭 datagrip 查询控制台,打开一个新控制台,然后重新发出函数调用,它会再次按预期工作,正确阻塞。我可以始终如一地重复这一点。新查询控制台中的第一个调用 window 总是正确阻塞,但每个后续调用总是 returns 立即。这是我在 postgres 中的第一个 C 函数,所以我想知道我是否只是在做一些根本性的错误。感谢您的任何帮助。我的代码如下。
C 函数:
#ifdef WIN32
#include <windows.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
#include "libpq-fe.h"
#include "postgres.h"
#include <limits.h>
#include <unistd.h>
#include <string.h>
#include "fmgr.h"
#include "utils/palloc.h"
#include "utils/elog.h"
#include "storage/bufpage.h"
#include "utils/builtins.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1( pg_wait );
Datum pg_wait( PG_FUNCTION_ARGS );
Datum pg_wait( PG_FUNCTION_ARGS )
{
PGconn *conn;
PGresult *res;
PGnotify *notify;
int nnotifies;
int sock;
fd_set input_mask;
char* conninfo = text_to_cstring(PG_GETARG_TEXT_PP(0));
char* channel_name = text_to_cstring(PG_GETARG_TEXT_PP(1));
char strlisten[50];
strcpy(strlisten, "LISTEN ");
strcat(strlisten, channel_name);
conn = PQconnectdb(conninfo);
res = PQexec(conn, strlisten);
PQclear(res);
sock = PQsocket(conn);
FD_ZERO(&input_mask);
FD_SET(sock, &input_mask);
select(sock + 1, &input_mask, NULL, NULL, NULL);
PQconsumeInput(conn);
if ((notify = PQnotifies(conn)) != NULL)
{
PQfreemem(notify);
PQconsumeInput(conn);
}
PQfinish(conn);
PG_RETURN_TEXT_P( PG_GETARG_TEXT_PP(1) );
}
util.wait_test_func()
create or replace function util.wait_test_func() returns integer
parallel safe
language plpgsql
as $$
DECLARE
signal_name TEXT;
BEGIN
RAISE NOTICE 'Before wait1';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait1: %', signal_name;
RAISE NOTICE 'Before wait2';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait2: %', signal_name;
RAISE NOTICE 'Before wait3';
SELECT pg_wait('dbname=edw port=5432','CHANNEL1') INTO signal_name;
RAISE NOTICE 'After wait3: %', signal_name;
RETURN 0;
END
$$;
再次感谢您提供的任何帮助。
你试过声明你的函数吗VOLATILE