SWI-Prolog: Using message queues for thread-safe database read/writes with `library(persistency)`

SWI-Prolog advertises itself as a single-language replacement for the LAMP stack. When it comes to replacing the M(ySQL), the documentation enumerates several approaches, of which library(persistency) seems to be the simplest way to provide a persistent, updatable database.

The documentation for library(persistency) gives the following example, which shows how to use mutexes to avoid invalid states when reading and updating the database:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        with_mutex(user_db, user_role(Name, Role)).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
        user_role(Name, Role), !.
set_user_role(Name, Role) :-
        with_mutex(user_db,
                   (  retractall_user_role(Name, _),
                      assert_user_role(Name, Role))).

However, the documentation on thread synchronization via mutexes notes:

The predicate with_mutex/2 behaves as once/1 with respect to the guarded goal. This means that our predicate address/2 is no longer a nice logical non-deterministic relation.
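To make the quoted remark concrete, here is a minimal sketch of the effect. The address/2 facts and the mutex name address_db are made up for illustration; only with_mutex/2 itself is taken from the documentation.

```prolog
:- dynamic address/2.

% Illustrative facts (not from the documentation).
address(alice, paris).
address(bob, london).

% Guarded lookup: with_mutex/2 commits to the first solution of its goal,
% exactly as once/1 would.
guarded_address(Name, City) :-
    with_mutex(address_db, address(Name, City)).

% ?- findall(N, address(N, _), Ns).          % Ns = [alice, bob]
% ?- findall(N, guarded_address(N, _), Ns).  % Ns = [alice]
```

The unguarded relation enumerates both people on backtracking, while the guarded one stops after the first.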

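Having to give up "nice logical non-deterministic relations" in order to maintain validity seems like a poor trade, since nice logical non-deterministic relations are Prolog's main strength! Fortunately, a more appealing alternative is recommended: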

Message queues (see message_queue_create/3) often provide simpler and more robust ways for threads to communicate.

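It sounds like I should be able to use message queues, as described in the documentation on thread communication, to achieve safe reads/updates without sacrificing the essence of the LP paradigm. Unfortunately, as a newcomer to threading, I can't figure out what this use of message queues would look like!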

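I am hoping someone can convert the library(persistency) example, replacing the use of mutexes with the more appropriate message queues, so that the state of the database can be changed with guaranteed validity and without sacrificing non-deterministic relations.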

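In SWI-Prolog, every thread has its own message queue. So you can run a database server on a thread and have clients post queries to the database thread's message queue. The database handles the requests one at a time, so it is always in a valid state. Queries are still deterministic (as with once/1) but, as you noted, unlike the with_mutex/2 case, the database relations themselves can be specified relationally.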
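Before the full database version, the request/reply pattern can be sketched in isolation. This is only an illustration under assumed names: the doubler alias, the request/2 and reply/2 message terms, and the predicates below are hypothetical; thread_create/3, thread_send_message/2 and thread_get_message/1 are the standard SWI-Prolog built-ins.

```prolog
% Server loop: take one request at a time from this thread's own queue,
% compute the answer, and send it back to the requesting thread.
doubler_loop :-
    thread_get_message(request(Client, X)),
    Y is X * 2,
    thread_send_message(Client, reply(X, Y)),
    doubler_loop.

% Start the server thread under the (hypothetical) alias doubler.
start_doubler :-
    thread_create(doubler_loop, _, [alias(doubler), detached(true)]).

% Client side: post a request, then block until the matching reply arrives.
ask_double(X, Y) :-
    thread_self(Me),
    thread_send_message(doubler, request(Me, X)),
    thread_get_message(reply(X, Y)).

% ?- start_doubler, ask_double(21, Y).
```

Because a single thread drains the queue, requests are processed strictly one after another, which is the property the database server relies on.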

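[Note: I am showing how to do this with the built-in SWI-Prolog message queues, but you could also use Pengines for this, which may be more user-friendly and has built-in support for remote execution.]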

First, I remove the with_mutex calls:

:- module(user_db,
          [ attach_user_db/1,       % +File
            current_user_role/2,    % ?User, ?Role
            add_user/2,         % +User, +Role
            set_user_role/2     % +User, +Role
          ]).
:- use_module(library(persistency)).

:- persistent
        user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
        db_attach(File, []).

%%  current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
        user_role(Name, Role).

add_user(Name, Role) :-
        assert_user_role(Name, Role).

set_user_role(Name, Role) :-
    user_role(Name, Role), !.
set_user_role(Name, Role) :-
    retractall_user_role(Name, _),
    assert_user_role(Name, Role).

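I just added the database server code to the same file, but it should probably go somewhere else. Also, turn on debug messages with :- debug(db), which I find essential in multithreaded code.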

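We need a predicate to start the database thread. It is given the alias db and it is "detached," so it will be cleaned up when the system exits. The thread starts by calling db_run/0.

db_up(File, DbThreadId) :-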
    db_attach(File, []),
    thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
    debug(db, 'db thread created', []).

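`db_run/0` is a failure-driven loop that runs in the database thread and checks its message queue for new messages. When a message is received, it is called as a goal. When that completes, the loop starts again.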

db_run :-
    debug(db, 'db_run:...', []),
    repeat, 
      thread_get_message(db, Msg, []),
      debug(db, 'Received: ~p', [Msg]),
      Msg,            
      debug(db, 'db_query succeeded', []),
      fail.
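As a variation, here is a sketch of the same kind of failure-driven loop with two additions that are my own assumptions rather than part of the code above: a hypothetical stop message that ends the loop, and a catch/3 around each message so that a malformed request does not kill the server thread. The name worker_loop/0 is also hypothetical.

```prolog
% Failure-driven loop over the calling thread's own message queue.
worker_loop :-
    repeat,
      thread_get_message(Msg),
      (   Msg == stop
      ->  !                                   % cut the repeat/0 choice point and exit
      ;   catch(ignore(Msg), Err,             % run the message as a goal, at most once
                print_message(error, Err)),   % report errors instead of dying
          fail                                % back to repeat/0 for the next message
      ).

% ?- thread_create(worker_loop, Id, []),
%    thread_send_message(Id, format("hello~n")),
%    thread_send_message(Id, stop),
%    thread_join(Id, Status).
```

With an explicit stop message the thread can be joined cleanly, which is handy in tests; a detached, long-running server may not need it.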

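Clients send messages of the form db_query(<Query>, <ClientThreadId>), so we need a predicate named db_query/2 that actually runs the computation. It sends a success, failure, or exception message back to the client thread.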

:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
    catch((Goal -> Status = true; Status = false),
          Err,
          Status = err(Err)), 
    (   Status = true ->
            Response = db_response(succ(Goal))
     ;
     Status = false ->
         Response = db_response(fail)
     ;
     Status = err(Err) ->
         Response = db_response(err(Err))
    ),
    debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
    thread_send_message(ClientId, Response).

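Finally, we need a predicate that posts queries from the client to the database. Once the message has been sent, the client waits for the response using client_wait/1.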

:- meta_predicate client_post(0).
client_post(Goal) :-
    thread_self(Me),
    Msg = db_query(Goal, Me),
    debug(db, 'client_post/1: sending message ~p...', [Msg]),
    thread_send_message(db, Msg),
    debug(db, 'client_post/1: waiting...', []),
    client_wait(Goal).

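client_wait/1 waits for a message of the form db_response(_) (it waits at most 1 second before failing, but you might want to do something smarter). It then succeeds, fails, or rethrows the exception, depending on the response.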

:- meta_predicate client_wait(0).
client_wait(Goal) :-
    thread_self(Me), 
    thread_get_message(Me, db_response(Term), [timeout(1)]), % waits at most 1 second for db_response(_), then fails
    Msg = db_response(Term),
    debug(db, 'Client received ~p', [Msg]),
    (   Term = succ(Goal) ->
            debug(db, 'client_wait/1: exit with true', []),
            true
     ;
     Term = fail ->
         fail
     ;
     Term = err(Err) ->
         throw(Err)
     ;
     domain_error(db_response_message, Msg)
    ).
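One possible "smarter" treatment of the timeout, sketched under my own assumptions: with the code above, a timeout makes thread_get_message/3 fail, so the client cannot tell a slow database from a query that genuinely failed. The helper wait_db_response/2 and the error term timeout(db_response, Seconds) below are hypothetical names; the behaviour of the timeout(Seconds) option is the documented one.

```prolog
% Wait up to Seconds for a db_response(_) message on the calling thread's
% queue. On timeout, raise an error instead of failing, so that a timeout
% cannot be mistaken for a query that failed.
wait_db_response(Seconds, Term) :-
    thread_self(Me),
    (   thread_get_message(Me, db_response(Term), [timeout(Seconds)])
    ->  true
    ;   throw(error(timeout(db_response, Seconds), _))  % hypothetical error term
    ).

% ?- thread_self(Me), thread_send_message(Me, db_response(fail)),
%    wait_db_response(1, T).                 % T = fail
% ?- catch(wait_db_response(0.1, _), error(timeout(_, _), _), true).
```

Note that a reply arriving after the timeout would still sit in the client's queue and could be picked up by a later wait; tagging each request with a unique id is one way to guard against that, but it is not shown here.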

With this in place, we can create the database and send queries:

$ swipl -l db_thread.pl 
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.

For help, use ?- help(Topic). or ?- apropos(Word).

?- user_db:db_up('db.pl', DB).
db thread created
DB = db.

?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].

?- findall(U, user_db:client_post(current_user_role(U, user)), Users).  %% queries are posted as in once/1
Users = [john].

?- user_db:client_post(findall(U, current_user_role(U, user), Users)).  %% but db predicates are themselves relational
Users = [john, bill].
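If the client wants to backtrack over the answers, one option is to collect them on the server with findall/3 and enumerate them locally with member/2. The helper solutions_via/3 below is a hypothetical sketch, not part of the code above; its first argument is whatever predicate posts a goal and waits for it to be proved (client_post/1 in this answer, plain call/1 when trying it out locally).

```prolog
% solutions_via(+Post, ?Template, +Goal)
% Ask Post to prove findall(Template, Goal, Solutions) in one request,
% then enumerate the collected solutions on backtracking.
solutions_via(Post, Template, Goal) :-
    call(Post, findall(Template, Goal, Solutions)),
    member(Template, Solutions).

% Local try-out, using call/1 in place of the database round trip:
% ?- solutions_via(call, X, member(X, [a, b, c])).
% X = a ;
% X = b ;
% X = c.
%
% Intended use with the server (assumption, untested here):
% ?- solutions_via(user_db:client_post, U, current_user_role(U, user)).
```

All solutions are computed eagerly in a single request, so the snapshot is consistent, but this is not suitable for relations with very many or infinitely many answers.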

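Here is a small test that this setup keeps the database consistent. In this file, test_db.pl, I create the database and run two threads. One, running toggle/1, switches between two user-role databases ("worlds"); the other, running print/1, just prints out the users and their roles. We switch worlds 200 times (100 rounds over the two worlds) at random intervals. Meanwhile, the other thread prints out the database 200 times at random intervals.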

test_db.pl:

:- use_module(user_db).

:- initialization user_db:db_up('db.pl', _), test.

world(1, [bob-administrator,
       john-user]).

world(2, [bob-user,
         john-administrator]).

set_world(I) :-
    world(I, Xs),
    forall(member(U-R, Xs),
           set_user_role(U, R)).

print_world :-
    findall(U-R,
            current_user_role(U, R),
            URs),
    sort(URs, URs1),
    format('~p~n', [URs1]).



random_sleep :-
    random(R), 
    X is R * 0.05,
    sleep(X).

toggle(0) :- !. 
toggle(N) :-
    forall(world(I, _), 
           (user_db:client_post(set_world(I)),
             random_sleep)),
    succ(N0, N),
    toggle(N0).

print(0) :- !. 
print(N) :-
    user_db:client_post(print_world),
    succ(N0, N),
    random_sleep, 
    print(N0).



test :-

    thread_create(toggle(100), Id1, []), 
    thread_create(print(200), Id2, []),
    thread_join(Id1, _),
    thread_join(Id2, _).

We run this with $ swipl -l test_db.pl

[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...
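Rather than checking the output by eye, each snapshot could be compared against the two worlds. This is a sketch under my own assumptions: consistent_snapshot/1 is a hypothetical helper, and the world/2 facts are repeated from test_db.pl so the block stands alone.

```prolog
% The two valid database states, as in test_db.pl.
world(1, [bob-administrator, john-user]).
world(2, [bob-user, john-administrator]).

% True if Snapshot (a sorted list of User-Role pairs, as built by
% print_world/0) is exactly one of the known worlds.
consistent_snapshot(Snapshot) :-
    world(_, World),
    msort(World, Sorted),
    Sorted == Snapshot,
    !.

% ?- consistent_snapshot([bob-user, john-administrator]).   % succeeds
% ?- consistent_snapshot([bob-user, john-user]).            % fails: a half-applied update
```

A mixed state such as the second query is what an interleaved read during set_world/1 would look like, so print_world/0 could call this check and report any snapshot that fails it.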