gpt4 book ai didi

python - 扭曲等待事件循环

转载 作者:行者123 更新时间:2023-11-30 23:16:39 24 4
gpt4 key购买 nike

我想读取和处理来自外部服务的一些数据。我询问服务是否有任何数据,是否返回了某些数据,然后对其进行处理并再次询问(以便在数据可用时可以立即对其进行处理),否则我将等待数据可用的通知。这可以写成无限循环:

def loop(self):
while True:
data = yield self.get_data_nonblocking()
if data is not None:
yield self.process_data(data)
else:
yield self.data_available

def on_data_available(self):
self.data_available.fire()


data_available如何在此处实现?它可以是延迟的,但不能重新设置延迟,只能重新创建。有更好的选择吗?

可以将此循环集成到Twisted事件循环中吗?我可以直接在 on_data_available中读取和处理数据,并编写一些代码,而不用循环检查 get_data_nonblocking,但是我觉得那么我需要一些锁以确保以到达时的顺序处理数据(上述代码强制执行它,因为它是唯一要处理的地方)。这是个好主意吗?

最佳答案

考虑TCP连接的情况。 TCP连接的接收器缓冲区中可能有数据,也可能没有数据。您可以通过使用非阻塞套接字API来获得该数据,或一无所获,而不会阻塞:

data = socket.recv(1024)
if data:
self.process_data(data)


您可以使用 select()(或任何基本等效的API)等待数据可用:

socket.setblocking(False)
while True:
data = socket.recv(1024)
if data:
self.process_data(data)
else:
select([socket], [], [])


其中,只有 select()特别是Twisted不友好的(尽管Twisted惯用语当然不是要进行自己的 socket.recv调用)。您可以用Twisted友好版本替换 select调用(用触发 ProtocoldataReceived方法实现 Deferred-类似于 on_data_available方法-折腾一些并使其整个事物是 inlineCallbacks生成器)。

但这虽然是从TCP连接获取数据的一种方式,但这不是Twisted鼓励您使用的API。相反,API是:

class SomeProtocol(Protocol):
def dataReceived(self, data):
# Your logic here


我看不出您的情况有何不同。如果不是执行您编写的循环,而是执行了以下操作,该怎么办:

class YourDataProcessor(object):
def process_data(self, data):
# Your logic here

class SomeDataGetter(object):
def __init__(self, processor):
self.processor = processor

def on_available_data(self):
data = self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)


现在根本没有Deferred了(也许无论用什么实现 on_available_dataget_data_nonblocking,但我看不到该代码)。

如果您大致保持原样,则可以确保按顺序执行,因为Twisted是单线程的(在几个位置上都有非常清晰的标记除外),并且在单线程程序中,可以更早地调用 process_data必须先完成,然后再对 process_data进行任何调用(当然, process_data重新进入自身的情况除外-但这是另外一回事了)。

如果将其切换回使用 inlineCallbacks(或任何等效的“协程”调味饮料混合物),则可能会引入乱序执行的可能性。

例如,如果 get_data_nonblocking返回一个 Deferred并且您编写如下内容:

    @inlineCallbacks
def on_available_data(self):
data = yield self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)


然后,您已经更改了 on_available_data以说在调用 get_data_nonblocking时允许上下文切换。在这种情况下,根据您对 get_data_nonblockingon_available_data的实现,完全有可能:


on_available_data被称为
get_data_nonblocking被调用并返回 Deferred
on_available_data告诉执行切换到另一个上下文(通过 yield / inlineCallbacks
再次调用 on_available_data
再次调用 get_data_nonblocking并返回一个 Deferred(也许是相同的!也许是一个新的!取决于它的实现方式)
第二次 on_available_data调用告诉执行切换到另一个上下文(相同原因)
反应器旋转一会儿,最终发生一个事件,该事件导致由 Deferred的第二次调用返回的 get_data_nonblocking触发。
执行切换回第二个 on_available_data
用第二次 process_data调用返回的任何数据调用 get_data_nonblocking
最终,第一组对象发生了相同的事情,并且使用第一个 process_data调用返回的任何数据再次调用 get_data_nonblocking


现在,也许您已按顺序处理了数据-同样,这取决于系统其他部分的更多详细信息。

如果是这样,您可以随时重新下订单。有许多不同的可能方法。 Twisted本身不附带任何明确支持此操作的API,因此该解决方案涉及编写一些新代码。这是一种方法的想法(未经试验)-类似于队列的类,它了解对象序列号:

class SequencedQueue(object):
"""
A queue-like type which guarantees objects come out of the queue in the order
defined by a sequence number associated with the objects when they are put into
the queue.

Application code manages sequence number assignment so that sequence numbers don't
have to have the same order as `put` calls on this type.
"""
def __init__(self):
# The sequence number of the object that should be given out
# by the next call to `get`
self._next_sequence = 0

# The sequence number of the next result that needs to be provided.
self._next_result = 0

# A holding area for objects past _next_sequence
self._queue = {}

# A holding area
self._waiting =

def put(self, sequence, object):
"""
Put an object into the queue at a particular point in the sequence.
"""
if sequence < self._next_sequence:
# Programming error. The sequence number
# of the object being put has already been used.
raise ...

self._queue[sequence] = object
self._check_waiters()

def get(self):
"""
Get an object from the queue which has the next sequence number
following whatever was previously gotten.
"""
result = self._waiters[self._next_sequence] = Deferred()
self._next_sequence += 1
self._check_waiters()
return result

def _check_waiters(self):
"""
Find any Deferreds previously given out by get calls which can now be given
their results and give them to them.
"""
while True:
seq = self._next_result
if seq in self._queue and seq in self._waiting:
self._next_result += 1
# XXX Probably a re-entrancy bug here. If a callback calls back in to
# put then this loop might run recursively
self._waiting.pop(seq).callback(self._queue.pop(seq))
else:
break


预期的行为(我意外添加的任何错误的模数)类似于:

q = SequencedQueue()
d1 = q.get()
d2 = q.get()
# Nothing in particular happens
q.put(1, "second result")
# d1 fires with "first result" and afterwards d2 fires with "second result"
q.put(0, "first result")


使用此方法,只需确保按要分发数据的顺序分配序列号,而不是实际在某处显示的顺序即可。例如:

    @inlineCallbacks
def on_available_data(self):
sequence = self._process_order
data = yield self.get_data_nonblocking()
if data is not None:
self._process_order += 1
self.sequenced_queue.put(sequence, data)


在其他地方,某些代码可以消耗队列,例如:

@inlineCallbacks
def queue_consumer(self):
while True:
yield self.process_data(yield self.sequenced_queue.get())

关于python - 扭曲等待事件循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27646605/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com