gpt4 book ai didi

python - 带有 asyncio 的 UDP 服务器

转载 作者:行者123 更新时间:2023-12-03 11:50:12 29 4
gpt4 key购买 nike

我正在尝试在 python asyncio 循环中接收 UDP 数据包。我是 asyncio 的新手,所以我可能做错了什么,因为回调永远不会被调用:

import asyncio

class DiscoveryProtocol(asyncio.DatagramProtocol):
def __init__(self):
super().__init__()
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
print(data)

def start_discovery():
loop = asyncio.get_event_loop()
t = loop.create_datagram_endpoint(DiscoveryProtocol,local_addr=('',5006))
loop.run_until_complete(t)
loop.run_forever()

我可以使用普通的旧套接字(没有 asyncio)接收数据包。

我究竟做错了什么?

最佳答案

没有公认的答案,所以这似乎已经萎缩,但它出现在搜索中。如果有人来到这里并想要一个最终的解决方案,下面的代码片段说明了一个功能齐全的 UDP 服务器。 write_messages()函数只是一种测试方法。它读取包含您想要的任何内容的日志文件,并将每一行作为 Syslog 消息发布到 UDP 端口 514。将此作为脚本运行说明服务器监听和打印它从 syslog 中排出的任何内容。更新 SyslogProtocol无论您有什么格式/处理需求。

import socket
import asyncio
import os, random

HOST, PORT = 'localhost', 514


def send_test_message(message: 'Message to send to UDP port 514') -> None:
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.sendto(message.encode(), (HOST, PORT))


async def write_messages() -> "Continuously write messages to UDP port 514":
dir_path = os.path.dirname(os.path.realpath(__file__))
fp = open(os.path.join(dir_path, "tests/example.log"))
print("writing")
for line in fp.readlines():
await asyncio.sleep(random.uniform(0.1, 3.0))
send_test_message(line)


class SyslogProtocol(asyncio.DatagramProtocol):
def __init__(self):
super().__init__()

def connection_made(self, transport) -> "Used by asyncio":
self.transport = transport

def datagram_received(self, data, addr) -> "Main entrypoint for processing message":
# Here is where you would push message to whatever methods/classes you want.
print(f"Received Syslog message: {data}")


if __name__ == '__main__':
loop = asyncio.get_event_loop()
t = loop.create_datagram_endpoint(SyslogProtocol, local_addr=('0.0.0.0', PORT))
loop.run_until_complete(t) # Server starts listening
loop.run_until_complete(write_messages()) # Start writing messages (or running tests)
loop.run_forever()

关于python - 带有 asyncio 的 UDP 服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46932654/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com