SWI-Prolog:将消息队列用于线程安全数据库 read/writes 和 `library(persistency)`
SWI-Prolog: Using message queues for thread-safe database read/writes with `library(persistency)`
SWI-Prolog advertises itself as a single-language replacement for the LAMP stack. When it comes to replacing the M(ySQL), the documentation enumerates several approaches, of which library(persistency)
似乎是能够提供持久、可更新数据库的最简单方法。
library(persistency)
的文档提供了以下示例,它展示了如何使用 mutexes 来避免读取和更新数据库时的无效状态:
:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).
:- persistent
user_role(name:atom, role:oneof([user,administrator])).
attach_user_db(File) :-
db_attach(File, []).
%% current_user_role(+Name, -Role) is semidet.
current_user_role(Name, Role) :-
with_mutex(user_db, user_role(Name, Role)).
add_user(Name, Role) :-
assert_user_role(Name, Role).
set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
with_mutex(user_db,
( retractall_user_role(Name, _),
assert_user_role(Name, Role))).
然而,在the documentation on thread synchronization via mutex
中指出
The predicate with_mutex/2
behaves as once/1
with respect to the guarded goal. This means that our predicate address/2
is no longer a nice logical non-deterministic relation.
不得不放弃 "nice logical non-deterministic relations" 以保持有效性似乎是一笔不划算的交易,因为良好的逻辑非确定性关系是 Prolog 的主要优势!幸运的是,推荐了一个更有吸引力的替代方案:
Message queues (see message_queue_create/3
) often provide simpler and more robust ways for threads to communicate.
听起来我应该能够使用消息队列,如 thread communication 上的文档中所述,以便在不牺牲 LP 范例本质的情况下实现安全 reads/updates。不幸的是,作为线程的新手,我无法弄清楚消息队列的这种用法会是什么样子!
我希望有人能够转换 library(persistency)
示例,用更合适的消息队列替换 互斥体 的使用,确保持续有效性在不牺牲非确定性关系的情况下改变数据库的状态。
在 SWI Prolog 中,每个线程都有自己的消息队列。因此,您可以 运行 线程上的数据库服务器,并让客户端 post 查询数据库的消息队列。数据库会一次一个地处理每个请求,使数据库始终有效。查询仍然是确定性的(如 once/1),但正如您所指出的,与 with_mutex/2
的情况不同,数据库关系本身可以按关系指定。
[注意,我正在展示如何使用内置的 SWI Prolog 消息队列来执行此操作,但您也可以为此使用 Pengines,这可能对用户更友好,并且内置支持远程执行。]
首先,我将删除 with_mutex
个调用:
:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).
:- persistent
user_role(name:atom, role:oneof([user,administrator])).
attach_user_db(File) :-
db_attach(File, []).
%% current_user_role(+Name, -Role) is semidet.
current_user_role(Name, Role) :-
user_role(Name, Role).
add_user(Name, Role) :-
assert_user_role(Name, Role).
set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
retractall_user_role(Name, _),
assert_user_role(Name, Role).
我刚刚在同一个文件中添加了数据库服务器代码,但它可能应该放在其他地方。另外,使用 :- debug(db),
打开调试消息,我发现这在多线程代码中是必不可少的。
我们需要一个谓词来启动 db_thread。它的名字是 db
并且它是 "detached," 所以它会在系统退出时被清理掉。线程从调用 db_run/0.
开始
db_up(File, DbThreadId) :-
db_attach(File, []),
thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
debug(db, 'db thread created~n').
`db_run/0' 是一个故障驱动循环,运行 在数据库线程中检查其消息队列中的新消息。当收到消息时,它会被调用。完成后,循环再次开始。
db_run :-
debug(db, 'db_run:...', []),
repeat,
thread_get_message(db, Msg, []),
debug(db, 'Received: ~p', [Msg]),
Msg,
debug(db, 'db_query succeeded', []),
fail.
客户端发送 db_query(<Query>, <ClientThreadId>)
形式的消息,因此我们需要一个名为 db_query/2
的谓词,它实际上是 运行 计算。它向客户端线程发送成功、失败或异常消息。
:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
catch((Goal -> Status = true; Status = false),
Err,
Status = err(Err)),
( Status = true ->
Response = db_response(succ(Goal))
;
Status = false ->
Response = db_response(fail)
;
Status = err(Err) ->
Response = db_response(err(Err))
),
debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
thread_send_message(ClientId, Response).
最后,我们需要一个谓词 posts 从客户端到数据库的查询。消息发送后,客户端使用 client_wait/1
等待响应。
:- meta_predicate client_post(0).
client_post(Goal) :-
thread_self(Me),
Msg = db_query(Goal, Me),
debug(db, 'client_post/1: sending message ~p...', [Msg]),
thread_send_message(db, Msg),
debug(db, 'client_post/1: waiting...', []),
client_wait(Goal).
client_wait/1
等待 db_response() 形式的消息(在失败前最多等待 1 秒,但您可能想做一些更聪明的事情)。它
:- meta_predicate client_wait(0).
client_wait(Goal) :-
thread_self(Me),
thread_get_message(Me, db_response(Term), [timeout(1)]), % blocks until db_response(_) arrives
Msg = db_response(Term),
debug(db, 'Client received ~p', [Msg]),
( Term = succ(Goal) ->
debug(db, 'client_wait/1: exit with true', []),
true
;
Term = fail ->
fail
;
Term = err(Err) ->
throw(Err)
;
domain_error(db_response_message, Msg)
).
有了这个,我们可以创建数据库并发送查询:
$ swipl -l db_thread.pl
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.
For help, use ?- help(Topic). or ?- apropos(Word).
?- user_db:db_up('db.pl', DB).
db thread created
DB = db.
?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].
?- findall(U, user_db:client_post(current_user_role(U, user)), Users). %% queries are posted as in once/1
Users = [john].
?- user_db:client_post(findall(U, current_user_role(U, user), Users)). %% but db predicates are themselves relational
Users = [john, bill].
这个设置保持数据库一致性的小测试。在这个文件 test_db.pl
中,我创建了数据库和 运行 两个线程。一个 运行ning toggle/0
在两个用户角色数据库之间切换,另一个 运行ning print/0
只打印出用户和他们的角色。我们切换世界,时间随机间隔 200 次。同时,另一个线程以 200 次随机间隔时间打印出数据库。
test_db.pl
:
:- use_module(user_db).
:- initialization user_db:db_up('db.pl', _), test.
world(1, [bob-administrator,
john-user]).
world(2, [bob-user,
john-administrator]).
set_world(I) :-
world(I, Xs),
forall(member(U-R, Xs),
set_user_role(U, R)).
print_world :-
findall(U-R,
current_user_role(U, R),
URs),
sort(URs, URs1),
format('~p~n', [URs1]).
random_sleep :-
random(R),
X is R * 0.05,
sleep(X).
toggle(0) :- !.
toggle(N) :-
forall(world(I, _),
(user_db:client_post(set_world(I)),
random_sleep)),
succ(N0, N),
toggle(N0).
print(0) :- !.
print(N) :-
user_db:client_post(print_world),
succ(N0, N),
random_sleep,
print(N0).
test :-
thread_create(toggle(100), Id1, []),
thread_create(print(200), Id2, []),
thread_join(Id1, _),
thread_join(Id2, _).
我们运行这个与$ swipl -l test_db.pl
:
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...
SWI-Prolog advertises itself as a single-language replacement for the LAMP stack. When it comes to replacing the M(ySQL), the documentation enumerates several approaches, of which library(persistency)
似乎是能够提供持久、可更新数据库的最简单方法。
library(persistency)
的文档提供了以下示例,它展示了如何使用 mutexes 来避免读取和更新数据库时的无效状态:
:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).
:- persistent
user_role(name:atom, role:oneof([user,administrator])).
attach_user_db(File) :-
db_attach(File, []).
%% current_user_role(+Name, -Role) is semidet.
current_user_role(Name, Role) :-
with_mutex(user_db, user_role(Name, Role)).
add_user(Name, Role) :-
assert_user_role(Name, Role).
set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
with_mutex(user_db,
( retractall_user_role(Name, _),
assert_user_role(Name, Role))).
然而,在the documentation on thread synchronization via mutex
中指出
The predicate
with_mutex/2
behaves asonce/1
with respect to the guarded goal. This means that our predicateaddress/2
is no longer a nice logical non-deterministic relation.
不得不放弃 "nice logical non-deterministic relations" 以保持有效性似乎是一笔不划算的交易,因为良好的逻辑非确定性关系是 Prolog 的主要优势!幸运的是,推荐了一个更有吸引力的替代方案:
Message queues (see
message_queue_create/3
) often provide simpler and more robust ways for threads to communicate.
听起来我应该能够使用消息队列,如 thread communication 上的文档中所述,以便在不牺牲 LP 范例本质的情况下实现安全 reads/updates。不幸的是,作为线程的新手,我无法弄清楚消息队列的这种用法会是什么样子!
我希望有人能够转换 library(persistency)
示例,用更合适的消息队列替换 互斥体 的使用,确保持续有效性在不牺牲非确定性关系的情况下改变数据库的状态。
在 SWI Prolog 中,每个线程都有自己的消息队列。因此,您可以 运行 线程上的数据库服务器,并让客户端 post 查询数据库的消息队列。数据库会一次一个地处理每个请求,使数据库始终有效。查询仍然是确定性的(如 once/1),但正如您所指出的,与 with_mutex/2
的情况不同,数据库关系本身可以按关系指定。
[注意,我正在展示如何使用内置的 SWI Prolog 消息队列来执行此操作,但您也可以为此使用 Pengines,这可能对用户更友好,并且内置支持远程执行。]
首先,我将删除 with_mutex
个调用:
:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).
:- persistent
user_role(name:atom, role:oneof([user,administrator])).
attach_user_db(File) :-
db_attach(File, []).
%% current_user_role(+Name, -Role) is semidet.
current_user_role(Name, Role) :-
user_role(Name, Role).
add_user(Name, Role) :-
assert_user_role(Name, Role).
set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
retractall_user_role(Name, _),
assert_user_role(Name, Role).
我刚刚在同一个文件中添加了数据库服务器代码,但它可能应该放在其他地方。另外,使用 :- debug(db),
打开调试消息,我发现这在多线程代码中是必不可少的。
我们需要一个谓词来启动 db_thread。它的名字是 db
并且它是 "detached," 所以它会在系统退出时被清理掉。线程从调用 db_run/0.
db_up(File, DbThreadId) :-
db_attach(File, []),
thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
debug(db, 'db thread created~n').
`db_run/0' 是一个故障驱动循环,运行 在数据库线程中检查其消息队列中的新消息。当收到消息时,它会被调用。完成后,循环再次开始。
db_run :-
debug(db, 'db_run:...', []),
repeat,
thread_get_message(db, Msg, []),
debug(db, 'Received: ~p', [Msg]),
Msg,
debug(db, 'db_query succeeded', []),
fail.
客户端发送 db_query(<Query>, <ClientThreadId>)
形式的消息,因此我们需要一个名为 db_query/2
的谓词,它实际上是 运行 计算。它向客户端线程发送成功、失败或异常消息。
:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
catch((Goal -> Status = true; Status = false),
Err,
Status = err(Err)),
( Status = true ->
Response = db_response(succ(Goal))
;
Status = false ->
Response = db_response(fail)
;
Status = err(Err) ->
Response = db_response(err(Err))
),
debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
thread_send_message(ClientId, Response).
最后,我们需要一个谓词 posts 从客户端到数据库的查询。消息发送后,客户端使用 client_wait/1
等待响应。
:- meta_predicate client_post(0).
client_post(Goal) :-
thread_self(Me),
Msg = db_query(Goal, Me),
debug(db, 'client_post/1: sending message ~p...', [Msg]),
thread_send_message(db, Msg),
debug(db, 'client_post/1: waiting...', []),
client_wait(Goal).
client_wait/1
等待 db_response() 形式的消息(在失败前最多等待 1 秒,但您可能想做一些更聪明的事情)。它
:- meta_predicate client_wait(0).
client_wait(Goal) :-
thread_self(Me),
thread_get_message(Me, db_response(Term), [timeout(1)]), % blocks until db_response(_) arrives
Msg = db_response(Term),
debug(db, 'Client received ~p', [Msg]),
( Term = succ(Goal) ->
debug(db, 'client_wait/1: exit with true', []),
true
;
Term = fail ->
fail
;
Term = err(Err) ->
throw(Err)
;
domain_error(db_response_message, Msg)
).
有了这个,我们可以创建数据库并发送查询:
$ swipl -l db_thread.pl
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.
For help, use ?- help(Topic). or ?- apropos(Word).
?- user_db:db_up('db.pl', DB).
db thread created
DB = db.
?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].
?- findall(U, user_db:client_post(current_user_role(U, user)), Users). %% queries are posted as in once/1
Users = [john].
?- user_db:client_post(findall(U, current_user_role(U, user), Users)). %% but db predicates are themselves relational
Users = [john, bill].
这个设置保持数据库一致性的小测试。在这个文件 test_db.pl
中,我创建了数据库和 运行 两个线程。一个 运行ning toggle/0
在两个用户角色数据库之间切换,另一个 运行ning print/0
只打印出用户和他们的角色。我们切换世界,时间随机间隔 200 次。同时,另一个线程以 200 次随机间隔时间打印出数据库。
test_db.pl
:
:- use_module(user_db).
:- initialization user_db:db_up('db.pl', _), test.
world(1, [bob-administrator,
john-user]).
world(2, [bob-user,
john-administrator]).
set_world(I) :-
world(I, Xs),
forall(member(U-R, Xs),
set_user_role(U, R)).
print_world :-
findall(U-R,
current_user_role(U, R),
URs),
sort(URs, URs1),
format('~p~n', [URs1]).
random_sleep :-
random(R),
X is R * 0.05,
sleep(X).
toggle(0) :- !.
toggle(N) :-
forall(world(I, _),
(user_db:client_post(set_world(I)),
random_sleep)),
succ(N0, N),
toggle(N0).
print(0) :- !.
print(N) :-
user_db:client_post(print_world),
succ(N0, N),
random_sleep,
print(N0).
test :-
thread_create(toggle(100), Id1, []),
thread_create(print(200), Id2, []),
thread_join(Id1, _),
thread_join(Id2, _).
我们运行这个与$ swipl -l test_db.pl
:
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...