- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试编写一个小程序来发送和接收 UDP 流量并通过 HTTP 接口(interface)接收命令。 HTTP 服务器位于一个 multiprocessing.Process
中; UDP 服务器位于另一个服务器中。这两个进程通过 python multiprocessing.Pipe
进行通信。我在下面附上了完整的代码。
我有 2 个相关问题:
我希望 UDP 服务器执行的操作的伪代码:
kq = new kqueue
udpEvent = kevent when socket read
pipeEvent = kevent when pipe read
while:
for event in kq.conrol([udpEvent, pipeEvent]):
if event == udpEvent:
# do something
elif event == pipeEvent:
print "HTTP command via pipe:", pipe.recv()
现在,UDP 服务器可以识别套接字事件并正确读取套接字。但是,当我将管道 kevent 添加到 kqueue 时,程序会不间断地吐出管道事件。我将过滤器设置为管道已写入,但我假设 1) 这是错误的 2) 更具体地说, python multiprocessing.Pipe
就像常规的 unix 管道,需要以不同的方式处理.
.....
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 ^C<select.kevent ident=4297866384 filter=-29216 flags=0x4000 fflags=0x1 data=0x16 udata=0x4000000000000>
<小时/>
main.py
import sys
from multiprocessing import Process, Pipe
# from userinterface import OSXstatusbaritem # use like so: OSXstatusbaritem.start(pipe)
from server import Server
import handler # UI thingy
# For UI, use simple HTTP server with various endpoints
# open a connection: localhost:[PORT]/open/[TARGET_IP]
def startServer(pipe):
UDP_IP = "127.0.0.1"
UDP_PORT = 9000
print "starting server"
s = Server(pipe)
s.listen(UDP_IP, UDP_PORT)
print "finishing server"
import BaseHTTPServer
def startUI(pipe):
HTTP_PORT = 4567
server_class = BaseHTTPServer.HTTPServer
myHandler = handler.handleRequestsUsing(pipe)
httpd = server_class(('localhost', 4567), myHandler)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
def main():
# Named full duplex pipe for communicating between server process and UI
pipeUI, pipeServer = Pipe()
# Start subprocesses
pServer = Process(target=startServer, args=(pipeServer,))
pServer.start()
startUI(pipeUI)
pServer.join()
if __name__ == "__main__": sys.exit(main())
server.py (UDP)
import sys
import select # for kqueue
from socket import socket, AF_INET, SOCK_DGRAM
from multiprocessing import Process, Pipe
class Server:
def __init__(self, pipe):
self.pipe = pipe
def listen (self, ipaddress, port):
print "starting!"
# Initialize listening UDP socket
sock = socket(AF_INET, SOCK_DGRAM)
sock.bind((ipaddress, port))
# Configure kqueue
kq = select.kqueue()
# Event for UDP socket data available
kevent0 = select.kevent( sock.fileno(),
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR)
# Event for message queue from other processes (ui)
kevent1 = select.kevent( self.pipe.fileno(),
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE)
# TODO: Figure out how to handle multiple kevents on kqueue
# TODO: Need an event for TUN data
# Start kqueue
while True:
revents = kq.control([kevent0, kevent1], 1, None)
for event in revents:
print event
kq.close()
# close file descriptors (os.close(fd))
handler.py(HTTP接口(interface))
import BaseHTTPServer
# Simple HTTP endpoints for controlling prototype Phantom implementation.
# The following commands are supported:
# 1. Open a connection via /open/[IP]:[PORT]
# 2. ????
class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
pipe = None
def __init__(self, pipe, *args):
RequestHandler.pipe = pipe
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args)
def do_HEAD(s):
s.send_response(200)
s.send_header("Content-type", "application/json")
s.end_headers()
def do_GET(s):
s.send_response(200)
s.send_header("Content-type", "application/json")
s.end_headers()
# Open connection command
if s.path.startswith('/open/'):
addrStr = s.path[6:len(s.path)]
(address, port) = tuple(filter(None, addrStr.split(':')))
port = int(port)
print "opening address: ", address, "port:", port
RequestHandler.pipe.send(['open', address, port])
def handleRequestsUsing(logic):
return lambda *args: RequestHandler(logic, *args)
<小时/>
更新:
我用select重写了服务器监听方法。对于一个缓慢的小型 python 原型(prototype),不会使用超过 3 或 4 个 fd,速度并不重要。 Kqueue 将是另一天的主题。
def 监听(自身、IP 地址、端口): 打印“开始!”
# Initialize listening non-blocking UDP socket
sock = socket(AF_INET, SOCK_DGRAM)
sock.setblocking(0)
sock.bind((ipaddress, port))
inputs = [sock, self.pipe] # stuff we read
outputs = [] # stuff we expect to write
while inputs:
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for event in readable:
if event is sock:
self.handleUDPData( sock.recvfrom(1024) )
if event is self.pipe:
print "pipe event", self.pipe.recv()
最佳答案
我知道这是一个老问题,但我可以给你一个我用于多线程 HTTP 服务器的 kqueue 套接字轮询的示例,这是我在阅读 C 源代码和 kqueue 手册页后发现的。
#bsd socket polling
#I make all the relevant flags more C like to match the kqueue man pages
from select import kevent, kqueue
from select import KQ_EV_ADD as EV_ADD, KQ_EV_ONESHOT as EV_ONESHOT
from select import KQ_EV_EOF as EV_EOF
from .common import Client_Thread #a parent class who's implementation is irrelevant to the question, lol
class BSD_Client(Client_Thread):
def __init__(self, *args):
Client_Thread.__init__(self, *args)
#Make a kqueue object for the thread
kq = kqueue()
#Make a one-shot kev for this kqueue for when the kill socket is
#connected to. The connection is only made once, so why not tell
#that to our kqueue? The default filter is EVFILT_READ, so we don't
#need to specify that. The default flag is just EV_ADD.
kill_kev = kevent(self.kill_fd, flags=EV_ADD|EV_ONESHOT)
#using defaults for the client socket.
client_kev = kevent(self.client_sock)
#we only need to keep track of the kqueue's control func.
#This also makes things prettier in the run func.
self.control = kq.control
#now, we add thel list of events we just made to our kqueue.
#The first 0 means we want a list of at most 0 length in return.
#the second 0 means we want no timeout (i.e. do this in a
#non-blocking way.)
self.control([client_kev, kill_kev], 0, 0)
def run(self):
while True:
#Here we poll the kqueue object.
#The empty list means we are adding no new events to the kqueue.
#The one means we want a list of at most 1 element. Then None
#Means we want block until an event is triggered.
events = self.control([], 1, None)
#If we have an event, and the event is for the kill socket
#(meaning somebody made a connection to it), then we break the
#loop and die.
if events and events[0].ident == self.kill_fd:
self.die()
break
#If all that is left is an EOF in our socket, then we break
#the loop and die. Kqueues will keep returning a kevent
#that has been read once, even when they are empty.
if events and events[0].flags & EV_EOF:
self.die()
break
#Finally, if we have an event that isn't for the kill socket and
#does not have the EOF flag set, then there is work to do. If
#the handle client function (defined in the parent class) returns
#1, then we are done serving a page and we can die.
if events and self.handle_client():
self.die()
break
client.close()
self.die 所做的就是将客户端 ip:port 字符串放入
到用于消息传递的队列中。另一个线程从队列中获取
该字符串,打印一条消息并加入
相关的线程对象。当然,我没有使用管道,只使用套接字。我确实在 kqueue 的在线手册页上找到了这个
Fifos, Pipes
Returns when the there is data to read; data contains the number of
bytes available.
When the last writer disconnects, the filter will set EV_EOF in
flags. This may be cleared by passing in EV_CLEAR, at which point the
filter will resume waiting for data to become available before re-
turning
那么也许在您的 udp 服务器中,您循环浏览 revents 列表,您应该按照手册页所述进行操作?实际上,您甚至不需要循环遍历最长为 1 的列表。也许你的监听函数应该是这样的......
def listen(self, ip, port):
print "Starting!"
sock = socket.socket(AF_INET, SOCK_DGRAM)
sock.bind((ip, port))
kq = select.kqueue()
kev0 = select.kevent(sock)
kev1 = select.kevent(self.pipe)
kq.control([kev0, kev1], 0, 0)
while True: #this loop never breaks! so this whole function blocks forever like this
revents = kq.control([], 1, None)
if revents:
event = revents[0]
if event.flags & select.KQ_EV_EOF:
new_event = select.kevent(event.ident, flags=select.KQ_EV_CLEAR)
kq.control([new_event], 0, 0)
else:
print event
我真的建议按照我的方式导入标志和函数,这使它与您必须比较的基于 C 的联机帮助页更加相似,而且我认为它看起来更漂亮。我还想指出,我的类与您所拥有的有点不同,因为每个新客户端都将获得该类的一个实例,并且每个类都将在自己的线程中运行。
关于具有多个事件的Python kqueue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17137437/
使用 kqueue 注册事件时与该事件相关的 ID 提供类型;例如,文件描述符用于标识文件以 watch int kq; struct kevent ke; kq = kqueue(); fd = o
我正在尝试编写一个小程序来发送和接收 UDP 流量并通过 HTTP 接口(interface)接收命令。 HTTP 服务器位于一个 multiprocessing.Process 中; UDP 服务器
我正在尝试使用 kqueue 观察目录大小的变化,这可能吗?这样做的原因是因为我正在监视目录,每当事件触发时,我都会统计目录并比较上次修改时间等,以确定是否发生了内容修改、添加、删除或重命名事件。我的
我使用 kquque 来监控桌面: 标志 - EV_ADD | EV_CLEAR fflags - 注意删除 |注_写 |注意_扩展 |注_属性 |注意_LINK |注意_RENAME | NOTE_
kqueue(在 OS X 上)对读/写常规文件有用吗?我知道 epoll 不对 Linux 上的常规文件有用,所以我想知道 kqueue 是否也是如此。 编辑:我不是说读/写文件,显然 read()
好的:我正在 iPhone OS 应用程序中实现文件共享,当然这意味着文件系统监控。耶! 基本上,当用户将文件操作到 iTunes 中我的应用程序部分时,操作系统会在我可以访问的目录中进行复制和/或删
我正在 mac os x 上使用 Kqueues,并尝试监视文件夹 所以我使用了 EVFILT_VNODE 过滤器,并且我想在删除文件时收到通知,我尝试了 NOTE_DELETE 但它仅检测文件何时通
在 MacOS 中,我使用 O_NONBLOCK 设置了一个读+写套接字来连接到远程服务器。我使用 kqueue 来等待和协调 I/O 事件。对 connect() 的调用立即触发 EINPROGRE
在 mac 上我使用 kqueue,它表明 udata 未更改。但是,kevent 调用的 event_data 返回的数组正在进行一些修改。什么可能导致这种情况?我将指针传递给转换为 void* 的
我在做什么 我正在实现一个基于 python/kqueue (FreeBSD) 的解决方案来跟踪对特定日志文件的更改,当满足 KQ_NOTE_WRITE fflag 时,对文件的更改将由我的 pyth
kqueue mechanism有一个事件标志,EV_RECEIPT ,根据链接的手册页: ... is useful for making bulk changes to a kqueue with
我编写了并发应用程序并发现了错误: buildFdSets: file descriptor out of range 我发现这是一个进程中文件描述符数量的操作系统限制,在我的 FreeBSD 中是
我很难理解如何将 kqueue 用于用户空间事件。 我寻找 2 个用例。 用例 1:手动重置事件 用例 2:自动重置事件 我想我了解如何使用 kqueue() 和 kevent(),但我不清楚传递给
我正在学习有关 kqueue 的教程(特别是 http://eradman.com/posts/kqueue-tcp.html 和 https://wiki.netbsd.org/tutorials/
我知道,如果远程主机优雅地关闭连接,epoll将报告EPOLLIN,并且调用read或recv不会阻塞,并且将返回0字节(即流结束)。 但是,如果连接未正常关闭,并且 write 或 send 操作失
我的开发机器是 MacBook(当然有 kqueue)。然而,在生产环境中,我们运行的是 Linux(当然使用 epoll)。显然,要了解我的代码的性能特征,我需要使用 epoll 运行它。也就是说,
如果一个线程(比如 X)正在等待 epoll_wait(),另一个线程(比如 Y)是否可以调用 epoll_ctl() 来注册对文件的兴趣描述符 9。之前在线程X中调用epoll_wait()能否返回
我正在努力在 epoll 和 kqueue 标志之间画一条平行线,特别是 EPOLLONESHOT EPOLLET EPOLLEXCLUSIVE 和 EV_CLEAR/EV_DISPATCH/EV_O
Python Epoll 有一个名为 epoll.unregister 的函数,它从 epoll 对象中删除已注册的文件描述符。有谁知道Kqueue中与此类似的功能是什么。对于 kqueue 我只能找
我试图了解 kqueue 中 EV_DISABLE 和 EV_ENABLE 的用例。 int KQueue = kqueue(); struct kevent ev = { .ident = fd
我是一名优秀的程序员,十分优秀!