gpt4 book ai didi

multithreading - SWI-Prolog:使用消息队列进行线程安全的数据库读/写, `library(persistency)`

转载 作者:行者123 更新时间:2023-12-04 06:44:06 24 4
gpt4 key购买 nike

SWI-Prolog advertises itself as a single-language replacement for the LAMP stack .说到更换 (ySQL),文档列举了几种方法,其中 library(persistency) 似乎是能够提供持久的、可更新的数据库的最简单的方法。

library(persistency) 的文档提供了以下示例,该示例展示了如何在读取和更新数据库时使用互斥体来避免无效状态:

:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).

:- persistent
user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
db_attach(File, []).

%% current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
with_mutex(user_db, user_role(Name, Role)).

add_user(Name, Role) :-
assert_user_role(Name, Role).

set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
with_mutex(user_db,
( retractall_user_role(Name, _),
assert_user_role(Name, Role))).

然而,在 the documentation on thread synchronization via mutex ,它指出

The predicate with_mutex/2 behaves as once/1 with respect to the guarded goal. This means that our predicate address/2 is no longer a nice logical non-deterministic relation.



为了保持有效性而不得不放弃“良好的逻辑非确定性关系”似乎是一个糟糕的交易,因为良好的逻辑非确定性关系是 Prolog 的主要优势!幸运的是,提出了一个更有吸引力的替代方案:

Message queues (see message_queue_create/3) often provide simpler and more robust ways for threads to communicate.



听起来我应该能够使用消息队列,如 thread communication 上的文档中所述。为了在不牺牲 LP 范式本质的情况下实现安全读取/更新。不幸的是,作为线程的新手,我无法弄清楚消息队列的这种用法会是什么样子!

我希望有人能够改造 library(persistency)例如,将互斥锁的使用替换为更合适的消息队列使用,确保数据库状态的持续有效性而不牺牲非确定性关系。

最佳答案

在 SWI Prolog 中,每个线程都有自己的消息队列。因此,您可以在线程上运行数据库服务器,并让客户端将查询发布到数据库的消息队列。数据库将一次处理一个请求,以便数据库始终有效。查询仍然是确定性的(如 once/1),但正如您所指出的,与 with_mutex/2 的情况不同,可以通过关系指定数据库关系本身。

[注意,我正在展示如何使用内置的 SWI Prolog 消息队列来执行此操作,但您也可以使用 Pengines为此,这可能对用户更友好,并且内置了对远程执行的支持。]

首先,我将删除 with_mutex调用:

:- module(user_db,
[ attach_user_db/1, % +File
current_user_role/2, % ?User, ?Role
add_user/2, % +User, +Role
set_user_role/2 % +User, +Role
]).
:- use_module(library(persistency)).

:- persistent
user_role(name:atom, role:oneof([user,administrator])).

attach_user_db(File) :-
db_attach(File, []).

%% current_user_role(+Name, -Role) is semidet.

current_user_role(Name, Role) :-
user_role(Name, Role).

add_user(Name, Role) :-
assert_user_role(Name, Role).

set_user_role(Name, Role) :-
user_role(Name, Role), !.
set_user_role(Name, Role) :-
retractall_user_role(Name, _),
assert_user_role(Name, Role).

我只是在同一个文件中添加了数据库服务器代码,但它可能应该放在其他地方。另外,使用 :- debug(db), 打开调试消息我发现它在多线程代码中是必不可少的。

我们需要一个谓词来启动 db_thread。它的名字是 db并且它是“分离的”,所以它会在系统退出时被清理。该线程通过调用 db_run/0. 启动
db_up(File, DbThreadId) :-
db_attach(File, []),
thread_create(db_run, DbThreadId, [detached(true), alias(db)]),
debug(db, 'db thread created~n').

`db_run/0' 是一个失败驱动的循环,它在 db 线程中运行并检查其消息队列中是否有新消息。当接收到消息时,它被调用。完成后,循环再次开始。
db_run :-
debug(db, 'db_run:...', []),
repeat,
thread_get_message(db, Msg, []),
debug(db, 'Received: ~p', [Msg]),
Msg,
debug(db, 'db_query succeeded', []),
fail.

客户端发送形式为 db_query(<Query>, <ClientThreadId>) 的消息,所以我们需要一个谓词 db_query/2实际运行计算。它向客户端线程发送成功、失败或异常消息。
:- meta_predicate user_db:db_query(0,*).
db_query(Goal, ClientId) :-
catch((Goal -> Status = true; Status = false),
Err,
Status = err(Err)),
( Status = true ->
Response = db_response(succ(Goal))
;
Status = false ->
Response = db_response(fail)
;
Status = err(Err) ->
Response = db_response(err(Err))
),
debug(db, 'db_query/2: sending message ~w to ~p', [Response, ClientId]),
thread_send_message(ClientId, Response).

最后,我们需要一个从客户端向数据库发布查询的谓词。消息发送后,客户端使用 client_wait/1 等待响应。 .
:- meta_predicate client_post(0).
client_post(Goal) :-
thread_self(Me),
Msg = db_query(Goal, Me),
debug(db, 'client_post/1: sending message ~p...', [Msg]),
thread_send_message(db, Msg),
debug(db, 'client_post/1: waiting...', []),
client_wait(Goal).
client_wait/1等待 db_response() 形式的消息(在失败前最多等待 1 秒,但您可能想要做一些更聪明的事情)。它
:- meta_predicate client_wait(0).
client_wait(Goal) :-
thread_self(Me),
thread_get_message(Me, db_response(Term), [timeout(1)]), % blocks until db_response(_) arrives
Msg = db_response(Term),
debug(db, 'Client received ~p', [Msg]),
( Term = succ(Goal) ->
debug(db, 'client_wait/1: exit with true', []),
true
;
Term = fail ->
fail
;
Term = err(Err) ->
throw(Err)
;
domain_error(db_response_message, Msg)
).

有了这个,我们可以创建数据库并发送查询:
$ swipl -l db_thread.pl 
Welcome to SWI-Prolog (Multi-threaded, 64 bits, Version 7.3.24-127-g9b94a9f-DIRTY)
Copyright (c) 1990-2016 University of Amsterdam, VU Amsterdam
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software,
and you are welcome to redistribute it under certain conditions.
Please visit http://www.swi-prolog.org for details.

For help, use ?- help(Topic). or ?- apropos(Word).

?- user_db:db_up('db.pl', DB).
db thread created
DB = db.

?- Xs = [bob-administrator, john-user, bill-user], user_db:client_post(forall(member(U-R, Xs), add_user(U, R))).
Xs = [bob-administrator, john-user, bill-user].

?- findall(U, user_db:client_post(current_user_role(U, user)), Users). %% queries are posted as in once/1
Users = [john].

?- user_db:client_post(findall(U, current_user_role(U, user), Users)). %% but db predicates are themselves relational
Users = [john, bill].

一个小测试,这个设置保留了数据库的一致性。在这个文件中 test_db.pl ,我创建数据库,并运行两个线程。正在运行的一个 toggle/0在两个用户角色数据库和另一个之间切换,运行 print/0只是打印出用户和他们的角色。我们在时间上随机地切换世界 200 次。同时,另一个线程以随机间隔的 200 次打印出 db。
test_db.pl :
:- use_module(user_db).

:- initialization user_db:db_up('db.pl', _), test.

world(1, [bob-administrator,
john-user]).

world(2, [bob-user,
john-administrator]).

set_world(I) :-
world(I, Xs),
forall(member(U-R, Xs),
set_user_role(U, R)).

print_world :-
findall(U-R,
current_user_role(U, R),
URs),
sort(URs, URs1),
format('~p~n', [URs1]).



random_sleep :-
random(R),
X is R * 0.05,
sleep(X).

toggle(0) :- !.
toggle(N) :-
forall(world(I, _),
(user_db:client_post(set_world(I)),
random_sleep)),
succ(N0, N),
toggle(N0).

print(0) :- !.
print(N) :-
user_db:client_post(print_world),
succ(N0, N),
random_sleep,
print(N0).



test :-

thread_create(toggle(100), Id1, []),
thread_create(print(200), Id2, []),
thread_join(Id1, _),
thread_join(Id2, _).

我们用 $ swipl -l test_db.pl 运行它:
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
[bob-user,john-administrator]
[bob-administrator,john-user]
...

关于multithreading - SWI-Prolog:使用消息队列进行线程安全的数据库读/写, `library(persistency)`,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40088552/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com