gpt4 book ai didi

python - 如何开发 Avahi 客户端/服务器

转载 作者:IT老高 更新时间:2023-10-28 21:14:26 39 4
gpt4 key购买 nike

我正在尝试使用 Python 开发一个客户端/服务器解决方案,其中服务器必须通过 Avahi 广播服务的可用性。我使用以下代码发布服务:

import avahi
import dbus

__all__ = ["ZeroconfService"]

class ZeroconfService:
    """Publish a network service with Zeroconf through Avahi's D-Bus API.

    Constructing an instance only records the service description; nothing
    is announced until publish() is called.

    Args:
        name: Service name passed to the entry group's AddService call.
        port: Port number; wrapped in dbus.UInt16 when publishing.
        stype: Service type string, "_http._tcp" by default.
        domain: Domain argument for AddService (empty string by default).
        host: Host argument for AddService (empty string by default).
        text: TXT payload, forwarded unchanged to AddService.
    """

    def __init__(self, name, port, stype="_http._tcp",
                 domain="", host="", text=""):
        self.name = name
        self.stype = stype
        self.domain = domain
        self.host = host
        self.port = port
        self.text = text
        # Entry group created by publish(); stays None until then so that
        # unpublish() can be called safely at any time.
        self.group = None

    def publish(self):
        """Create an Avahi entry group, add this service to it and commit.

        The committed group is stored on ``self.group`` so that unpublish()
        can reset it later.
        """
        bus = dbus.SystemBus()
        server = dbus.Interface(
            bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),
            avahi.DBUS_INTERFACE_SERVER)

        # EntryGroupNew() returns the object path of a fresh entry group.
        g = dbus.Interface(
            bus.get_object(avahi.DBUS_NAME, server.EntryGroupNew()),
            avahi.DBUS_INTERFACE_ENTRY_GROUP)

        g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
                     self.name, self.stype, self.domain, self.host,
                     dbus.UInt16(self.port), self.text)

        g.Commit()
        self.group = g

    def unpublish(self):
        """Reset the entry group created by publish().

        Does nothing when publish() has not been called yet, instead of
        failing with AttributeError on the missing group.
        """
        if self.group is None:
            return
        self.group.Reset()


def test():
    """Publish a sample service, wait for the user, then withdraw it."""
    service = ZeroconfService(name="TestService", port=3000)
    service.publish()
    try:
        # raw_input: this script is written for Python 2.
        raw_input("Press any key to unpublish the service ")
    finally:
        # Withdraw the announcement even when the prompt is interrupted
        # (Ctrl-C, EOF on stdin), so the service is not left advertised.
        service.unpublish()


if __name__ == "__main__":
    test()

至于客户端,我正在尝试使用以下代码搜索服务:

# http://avahi.org/wiki/PythonBrowseExample
import dbus, gobject, avahi
from dbus import DBusException
from dbus.mainloop.glib import DBusGMainLoop

# Looks for HTTP services (see TYPE below), not iTunes shares

TYPE = "_http._tcp"

def service_resolved(*args):
    """Print the name, address and port of a resolved service.

    Used as the ``reply_handler`` of ``server.ResolveService``; the reply
    values arrive as positional arguments, of which positions 2, 7 and 8
    are printed as name, address and port respectively.

    Each line is emitted with a single-argument ``print(...)`` call so the
    function produces the same text on Python 2 and also runs on Python 3.
    """
    print('service resolved')
    print('name: %s' % (args[2],))
    print('address: %s' % (args[7],))
    print('port: %s' % (args[8],))

def print_error(*args):
    """Print a marker line followed by the first argument received.

    Used as the ``error_handler`` of ``server.ResolveService``. Written with
    single-argument ``print(...)`` calls so it behaves the same on Python 2
    and also runs on Python 3.
    """
    print('error_handler')
    print(args[0])

def myhandler(interface, protocol, name, stype, domain, flags):
    """Report a newly browsed service and ask Avahi to resolve it.

    Connected to the service browser's ``ItemNew`` signal. Resolution is
    asynchronous: the result goes to ``service_resolved`` and failures go
    to ``print_error``.

    Args:
        interface: Interface value from the signal, passed on to ResolveService.
        protocol: Protocol value from the signal, passed on to ResolveService.
        name: Service name.
        stype: Service type.
        domain: Service domain.
        flags: Lookup result flags from the signal (currently unused).
    """
    print("Found service '%s' type '%s' domain '%s' " % (name, stype, domain))

    # NOTE: services flagged avahi.LOOKUP_RESULT_LOCAL are deliberately NOT
    # filtered out. The earlier "skip" branch was a no-op (`pass`), so every
    # discovered service has always been resolved. To really skip local
    # services, add:  if flags & avahi.LOOKUP_RESULT_LOCAL: return
    server.ResolveService(interface, protocol, name, stype,
                          domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
                          reply_handler=service_resolved,
                          error_handler=print_error)

# Attach dbus-python to a GLib main loop; the connection below is created
# with it so that signals and asynchronous replies can be dispatched.
loop = DBusGMainLoop()
bus = dbus.SystemBus(mainloop=loop)

# Proxy for the Avahi server object.
server = dbus.Interface(
    bus.get_object(avahi.DBUS_NAME, '/'),
    'org.freedesktop.Avahi.Server')

# Ask the server for a browser of TYPE services in the 'local' domain; the
# call returns the object path of the new browser.
browser_path = server.ServiceBrowserNew(
    avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, TYPE, 'local', dbus.UInt32(0))
sbrowser = dbus.Interface(
    bus.get_object(avahi.DBUS_NAME, browser_path),
    avahi.DBUS_INTERFACE_SERVICE_BROWSER)

# Every ItemNew signal from the browser is handed to myhandler.
sbrowser.connect_to_signal("ItemNew", myhandler)

# Run the main loop; this call blocks.
gobject.MainLoop().run()

但是客户端没有检测到服务何时启动。关于我做错了什么的任何想法?

最佳答案

我发现代码按预期工作。我的防火墙规则阻止了与 avahi 相关的发布。

关于python - 如何开发 Avahi 客户端/服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3430245/

39 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com