- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试编写一个基于 Asyncio 和使用 ZeroMQ 实现的发布/订阅设计模式的简单程序。发布者有 2 个协程;一个监听传入的订阅,另一个将值(通过 HTTP 请求获得)发布给订阅者。订阅者订阅特定参数(在本例中为城市名称),并等待值(该城市的温度)。
这是我的代码:
publisher.py
#!/usr/bin/env python
import json
import aiohttp
import aiozmq
import asyncio
import zmq
class Publisher:
BIND_ADDRESS = 'tcp://*:10000'
def __init__(self):
self.stream = None
self.parameter = ""
@asyncio.coroutine
def main(self):
self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
tasks = [
asyncio.async(self.subscriptions()),
asyncio.async(self.publish())]
print("before wait")
yield from asyncio.wait(tasks)
print("after wait")
@asyncio.coroutine
def subscriptions(self):
print("Entered subscriptions coroutine")
while True:
print("New iteration of subscriptions loop")
received = yield from self.stream.read()
first_byte = received[0][0]
self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
# Subscribe request
if first_byte == 1:
print("subscription request received for parameter "+self.parameter)
# Unsubscribe request
elif first_byte == 0:
print("Unsubscription request received for parameter "+self.parameter)
@asyncio.coroutine
def publish(self):
print("Entered publish coroutine")
while True:
if self.parameter:
print("New iteration of publish loop")
# Make HTTP request
url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
response = yield from aiohttp.request('GET', url)
assert response.status == 200
content = yield from response.read()
# Decode JSON string
decoded_json = json.loads(content.decode())
# Get parameter value
value = decoded_json["main"]["temp"]
# Publish fetched values to subscribers
message = bytearray(self.parameter+":"+str(value),"utf-8")
print(message)
pack = [message]
print("before write")
yield from self.stream.write(pack)
print("after write")
yield from asyncio.sleep(10)
test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())
subscriber.py
#!/usr/bin/env python
import zmq
class Subscriber:
XSUB_CONNECT = 'tcp://localhost:10000'
def __init__(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.XSUB)
self.socket.connect(Subscriber.XSUB_CONNECT)
def loop(self):
print(self.socket.recv())
self.socket.close()
def subscribe(self, parameter):
self.socket.send_string('\x01'+parameter)
print("Subscribed to parameter "+parameter)
def unsubscribe(self, parameter):
self.socket.send_string('\x00'+parameter)
print("Unsubscribed to parameter "+parameter)
test = Subscriber()
test.subscribe("London")
while True:
print(test.socket.recv())
这是输出:
订阅方:
$ python3 subscriber.py
Subscribed to parameter London
b'London:288.15'
发布方:
$ python3 publisher.py
before wait
Entered subscriptions coroutine
New iteration of subscriptions loop
Entered publish coroutine
subscription request received for parameter London
New iteration of subscriptions loop
New iteration of publish loop
bytearray(b'London:288.15')
before write
程序卡在那里。
如您所见,"before write"
出现在输出中并且消息被发送,但是 "after write"
没有出现。因此,我认为可能在 self.stream.write(pack)
调用堆栈中的某处引发并捕获了异常。
如果我向发布者发送一个KeyboardInterrupt
,我得到的是:
Traceback (most recent call last):
File "publisher.py", line 73, in <module>
loop.run_until_complete(test.main())
File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
self.run_forever()
File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
self._run_once()
File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.4/selectors.py", line 432, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "publisher.py", line 66, in publish
yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
所以我想我的问题实际上是这个错误:TypeError: 'NoneType' object is not iterable
,但我不知道是什么原因造成的。
这里出了什么问题?
最佳答案
问题是您试图yield from
调用self.stream.write()
,但是stream.write
isn't actually a coroutine .当您对项目调用 yield from
时,Python 会在内部调用 iter(item)
。在这种情况下,对 write()
的调用返回 None
,因此 Python 正在尝试执行 iter(None)
- 因此出现异常看。
要修复它,您应该像普通函数一样调用write()
。如果你真的想等到 write
被刷新并发送给读者,使用 yield from stream.drain()
在调用 write()
之后:
print("before write")
self.stream.write(pack)
yield from self.stream.drain()
print("after write")
此外,要确保在不需要 Ctrl+C 的情况下在 publish
中引发异常,请使用 asyncio.gather
而不是 asyncio.wait
:
yield from asyncio.gather(*tasks)
使用 asyncio.gather
,tasks
中的任务抛出的任何异常都将被重新引发。
关于Python Asyncio 阻塞协程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31050056/
对于一个简单的聊天程序,我使用了一个通过 boost::python 包装的 c 库。 使用 PyQT 编写了一个简单的 GUI。接收消息是通过阻塞调用完成的lib说。对于独立刷新的 GUI,通信部分
当我创建以下内容时,我试图创建一个可以被异常终止的线程类(因为我试图让线程等待一个事件): import sys class testThread(threading.Thread): def
我正在用 Haskell 编写服务器,我想在客户端断开连接后显式关闭它们。当我调用 hClose ,线程将阻塞,直到客户端关闭其一侧的句柄。有没有办法让它在不阻塞的情况下关闭? 提前致谢! 最佳答案
这个问题已经有答案了: 已关闭12 年前。 Possible Duplicate: garbage collection Operation 我有几个相关问题。 1.JAVA垃圾收集器运行时,是否占用
我有一个 Angular 函数,它在初始 URL 中查找“列表”参数,如果找到,就会出去获取信息。否则我想获得地理位置。如果存在 URL 参数,我不想获取地理位置。我使用的术语是否正确? constr
我读了很多关于锁定数据库、表和行的文章,但我想要较低的锁定,比如只锁定“操作”,我不知道如何调用它,假设我在 php 中有函数: function update_table() { //que
在我的多线程 mfc 应用程序中,m_view->SetScrollPos 处于阻塞状态并且所有应用程序都被卡住。 View 是在另一个线程中创建的,这是这种行为的原因吗? //SetScrollPo
FreeSwitch 软件在几天内运行良好(~3 - 5 天),然后由于 FreeSwitch 被阻止,新的来电请求被接受!!正在进行的调用继续他们的 session ,他们的调用似乎没有受到影响,但
我有一组按钮,当鼠标悬停在这些按钮上时,它们会改变颜色。这些的 CSS 以这种方式运行: #navsite ul li button { height: 60px; width: 60
由于某些原因,当我调用 WSARecvFrom 时,该函数在接收到某些内容之前不会返回。 _socket = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, N
我了解一些关于 Oracle 阻塞的知识——更新如何阻塞其他更新直到事务完成,写入者如何不阻塞读取者等。 我理解悲观和乐观锁定的概念,以及有关丢失更新等典型银行教科书示例。 我也理解 JDBC 事务隔
在两个代码点之间,我是否可以判断进程是否已被内核抢占,或者更确切地说,当时是否有任何其他代码在同一处理器上运行? //Point A some_type capture = some_capture(
这是我在 Oracle 的面试问题。 有一个堆栈,即使堆栈已满,push 操作也应该等到它完成,即使堆栈为空,pop 操作也应该等到它完成。 我们怎样才能做到这一点? 我的回答 让一个线程做push
我想知道是否有人可以告诉我如何有效地使用循环平铺/循环阻塞进行大型密集矩阵乘法。我正在用 1000x1000 矩阵做C = AB。我按照 Wikipedia 上的循环平铺示例进行操作,但使用平铺得到的
我正在阅读有关绿色线程的内容,并且能够理解这些线程是由 VM 或在运行时创建的,而不是由操作系统创建的,但我无法理解以下语句 When a green thread executes a blocki
我正在创建的 JavaScript API 具有以下结构: var engine = new Engine({ engineName: "TestEngine", engineHost
ChildWindow 是一个模态窗口,但它不会阻塞。有没有办法让它阻塞?我基本上想要一个 ShowDialog() 方法,该方法将调用 ChildWindow.Show() 但在用户关闭 Child
我需要一些关于如何调试 10.6 版本下的 Cocoa 并发问题的指导。我正在将“for”循环转换为使用 NSOperations,但大多数时候,代码只是在循环的某个时刻卡住。我可以在控制台中看到 N
我正在使用 ReportViewer 控件和自定义打印作业工作流程,这给我带来了一些问题。我的代码看起来有点像这样: ids.ForEach(delegate(Guid? guid)
我有以下成功复制文件的代码。但是,它有两个问题: progressBar.setValue() 之后的 System.out.println() 不会打印 0 到 100 之间的间隔(仅打印“0”直到
我是一名优秀的程序员,十分优秀!