gpt4 book ai didi

python - 有用的发布-订阅语义

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:39:01 34 4
gpt4 key购买 nike

我正在寻找维基百科风格的引用资料,以引用实际工作的轻量级发布-订阅机制的设计和实现。我会根据答案和评论以及我自己的研究更新问题。

我研究了我的书籍和网络,寻找使用 Python 和 Delphi 完成的发布/订阅工作,但对结果并不满意。这些设计依赖于函数签名或位图或插槽来过滤消息或决定应该将什么传递给谁,并且要么过于严格(绑定(bind)到消息服务器),要么过于混杂(每个人都可以订阅任何东西)。

我不想自己写。我想找到已经设计良好、经过辩论和现场验证的东西。

今天我在 Delphi Pascal 中实现了一个设计(因为 Delphi 是我首先需要的)。正如这个 API 所做的那样,根据参数类型进行分派(dispatch)并不是一个原始想法(它在 Design Patterns Visitor 模式中进行了解释),我想我以前见过类似的东西(但我不记得在哪里;Taligent?)。它的核心是订阅、过滤和调度是在类型系统之上。

unit JalSignals;
// A publish/subscribe mechanism.
// 1. Signal payloads are objects, and their class is their signal type.
// 2. Free is called on the payloads after they have been delivered.
// 3. Members subscribe by providing a callback method (of object).
// 4. Members may subscribe with the same method to different types of signals.
// 5. A member subscribes to a type, which means that all signals
// with payloads of that class or any of its subclasses will be delivered
// to the callback, with one important exception
// 6. A forum breaks the general class hierarchy into independent branches.
// A signal will not be delivered to members subscribed to classes that
// are not in the branch.
// 7. This is a GPL v3 design.
interface
uses
SysUtils;
type
TSignal = TObject;
TSignalType = TClass;
TSignalAction = (soGo, soStop);
TCallback = function(signal :TSignal) :TSignalAction of object;

procedure signal(payload: TSignal);

procedure subscribe( callback :TCallback; atype :TSignalType);
procedure unsubscribe(callback :TCallback; atype :TSignalType = nil); overload;
procedure unsubscribe(obj :TObject; atype :TSignalType = nil); overload;

procedure openForum( atype :TSignalType);
procedure closeForum(atype :TSignalType);

上面的“回调”就像 Python 中的绑定(bind)方法。

Delphi 实现的完整源代码是here :

这是Python中的实现。我更改了键名,因为 signalmessage 已经重载了。与 Delphi 实现不同,结果(包括异常)被收集并返回到列表中的信号器。

"""
A publish/subscribe mechanism.

1. Signal payloads are objects, and their class is their signal type.
2. Free is called on the payloads after they have been delivered.
3. Members subscribe by providing a callback method (of object).
4. Members may subscribe with the same method to different types of signals.
5. A member subscribes to a type, which means that all signals
with payloads of that class or any of its subclasses will be delivered
to the callback, with one important exception:
6. A forum breaks the general class hierarchy into independent branches.
A signal will not be delivered to members subscribed to classes that
are not in the branch.
"""

__all__ = ['open_forum', 'close_forum', 'announce',
'subscribe', 'unsubscribe'
]

def _is_type(atype):
return issubclass(atype, object)

class Sub(object):
def __init__(self, callback, atype):
assert callable(callback)
assert issubclass(atype, object)
self.atype = atype
self.callback = callback

__forums = set()
__subscriptions = []

def open_forum(forum):
assert issubclass(forum, object)
__forums.add(forum)

def close_forum(forum):
__forums.remove(forum)

def subscribe(callback, atype):
__subscriptions.append(Sub(callback, atype))

def unsubscribe(callback, atype=None):
for i, sub in enumerate(__subscriptions):
if sub.callback is not callback:
continue
if atype is None or issubclass(sub.atype, atype):
del __subscriptions[i]

def _boundary(atype):
assert _is_type(atype)
lower = object
for f in __forums:
if (issubclass(atype, f)
and issubclass(f, lower)):
lower = f
return lower

def _receivers(news):
bound = _boundary(type(news))
for sub in __subscriptions:
if not isinstance(news, sub.atype):
continue
if not issubclass(sub.atype, bound):
continue
yield sub

def announce(news):
replies = []
for sub in _receivers(news):
try:
reply = sub.callback(news)
replies.append(reply)
except Exception as e:
replies.append(e)
return replies

if __name__ == '__main__':
i = 0
class A(object):
def __init__(self):
global i
self.msg = type(self).__name__ + str(i)
i += 1

class B(A): pass
class C(B): pass

assert _is_type(A)
assert _is_type(B)
assert _is_type(C)

assert issubclass(B, A)
assert issubclass(C, B)

def makeHandler(atype):
def handler(s):
assert isinstance(s, atype)
return 'handler' + atype.__name__ + ' got ' + s.msg
return handler

handleA = makeHandler(A)
handleB = makeHandler(B)
handleC = makeHandler(C)

def failer(s):
raise Exception, 'failed on' + s.msg

assert callable(handleA) and callable(handleB) and callable(handleC)

subscribe(handleA, A)
subscribe(handleB, B)
subscribe(handleC, C)
subscribe(failer, A)

assert _boundary(A) is object
assert _boundary(B) is object
assert _boundary(C) is object

print announce(A())
print announce(B())
print announce(C())

print
open_forum(B)

assert _boundary(A) is object
assert _boundary(B) is B
assert _boundary(C) is B
assert issubclass(B, B)

print announce(A())
print announce(B())
print announce(C())

print
close_forum(B)
print announce(A())
print announce(B())
print announce(C())

这些是我搜索的原因:

  1. 我已经检查了数千行我必须维护的 Delphi 代码。他们使用 Observer 模式进行 MVC 解耦,但一切仍然非常耦合,因为观察者和主题之间的依赖关系过于明确。
  2. 我一直在学习 PyQt4,如果我必须在 Qt4Designer 中点击-点击-点击每一个我想要到达有意义的目的地的事件,那我会死的。
  3. 然而,在另一个个人数据应用程序上,我需要抽象事件传递和处理,因为持久性和 UI 会因平台而异,并且必须完全独立。

引用资料

自己和别人找到的应该放这里

  • PybubSub将字符串用于 topycs 和方法签名(第一个信号定义签名)。
  • An articleFinalBuilder的博客中报告说他们已经成功地使用了一个具有整数结构的系统作为有效负载、消息和用于过滤的整数掩码。
  • PyDispatcher文档最少。
  • D-Bus已被 Gnome 和 KDE 项目等采用。 Python binding可用。

最佳答案

你应该试试 Enterprise Integration Patterns尽管它以进程间消息传递为中心,但它为发布-订阅提供了非常详细的处理。

关于python - 有用的发布-订阅语义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4720763/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com