我正在使用 pydbus我已经成功地使用它来监听 session 总线上的信号(在“客户端”)。我想现在编写服务器端,程序在每次触发操作时发送信号(例如,当它在 FS 上写入文件时)。我真的没有在他们的 GitHub 上得到任何这样的例子。他们只展示了如何编写一个基本的服务器,该服务器有一堆客户端可以调用的方法(但它与我想要的信号无关)。
仅供引用,客户端非常简单,看起来像这样:
# Subscribe to DBus dump signal
session_bus = SessionBus()
session_bus.subscribe(iface='my.iface',
signal='my_signal_name',
object='/my/object',
signal_fired=my_callback)
# Create and run main loop
loop = GObject.MainLoop()
loop.run()
my_callback
是每次监听事件弹出时调用的方法(在本例中为 my_signal_name
)
感谢帮助。
以下 python3/pydbus 程序将每秒发布一个随机整数到 session D-Bus。
#!/usr/bin/env python3
#!
from pydbus.generic import signal
from pydbus import SessionBus
from gi.repository import GLib
import random
loop = GLib.MainLoop()
interface_name = "org.example.project_1.server_1"
bus = SessionBus()
class Server_XML(object):
"""
Server_XML definition.
Emit / Publish a signal that is a random integer every second
type='i' for integer.
"""
dbus = """
<node>
<interface name="org.example.project_1.server_1">
<signal name="app_1_signal">
<arg type='i'/>
</signal>
</interface>
</node>
"""
app_1_signal = signal()
def repeating_timer():
"""Generate random integer between 0 and 100 and emit over Session D-Bus
return True to keep the GLib timer calling this function once a second."""
random_integer = random.randint(0,100)
print(random_integer)
emit.app_1_signal(random_integer)
return True
if __name__=="__main__":
# Setup server to emit signals over the DBus
emit = Server_XML()
bus.publish(interface_name, emit)
# Setup repeating timer
GLib.timeout_add_seconds(interval=1, function=repeating_timer)
# Run loop with graceful ctrl C exiting.
try:
loop.run()
except KeyboardInterrupt as e:
loop.quit()
print("\nExit by Control C")
打开另一个 linux 控制台终端并使用 gdbus 实用程序验证整数是否正在 session 总线上发布。例如……
$ gdbus monitor --session --dest org.example.project_1.server_1
Monitoring signals from all objects owned by org.example.project_1.server_1
The name org.example.project_1.server_1 is owned by :1.618
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (36,)
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (37,)
/org/example/project_1/server_1: org.example.project_1.server_1.app_1_signal (25,)
以下 python3/pydbus 客户端程序订阅在 session D-Bus 上发布的随机整数...
#!/usr/bin/env python3
#!
from pydbus.generic import signal
from pydbus import SessionBus
from gi.repository import GLib
loop = GLib.MainLoop()
dbus_filter = "/org/example/project_1/server_1"
bus = SessionBus()
def cb_server_signal_emission(*args):
"""
Callback on emitting signal from server
"""
print("Message: ", args)
print("Data: ", str(args[4][0]))
if __name__=="__main__":
# Subscribe to bus to monitor for server signal emissions
bus.subscribe(object = dbus_filter,
signal_fired = cb_server_signal_emission)
loop.run()
此客户端程序的控制台输出将与此类似...
$ python3 client_pydbus.py
Message: (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (72,))
Data: 72
Message: (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (46,))
Data: 46
Message: (':1.621', '/org/example/project_1/server_1', 'org.example.project_1.server_1', 'app_1_signal', (47,))
Data: 47
我是一名优秀的程序员,十分优秀!