- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我第一次使用 python 中的 asyncio 并尝试将其与 ZMQ 结合起来。
基本上我的问题是我有一个 REP/REQ 系统,位于 async def
中具有我需要等待的功能。值如何不更新。下面是一段代码来说明这一点:
#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")
我将此对象发送到一个类并在此函数中将其取回
async def readsonar(self, trigger_pin, REP_server_django):
i= 0
while True:
ping_from_view = await REP_server_django.recv() # line.1
value = await self.board.sonar_read(trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
#json_data = json.dumps(i) # line.4bis
REP_server_django.send(json_data.encode()) # line.5
i+=1 # line.6
await asyncio.sleep(1/1000) # line.7
sonar_read
,正在使用 pymata_express 读取超声波传感器。如果我评论line.2
和 line.4
我得到了 i
的正确值。如果我评论line.1
和 line.5
print(value)
打印 sonar_read
中的正确值。但是,当我按此处所示运行它时,value
没有更新。
我错过了什么吗?
<小时/>编辑:
编辑了有关行注释的类型。我的意思是,如果我只读取声纳并打印该值。效果很好。如果我只是.recv()
和.send(json.dumps(i).encode())
, 有用。但是如果我尝试发送声纳的值。它锁定给定的 value
未更新
编辑2:(对 Alan Yorinks 的回答):这是 MWE,它考虑您发送的有关 zmq
声明的内容在类里。它取自pymata_express
示例concurrent_tasks.py
要重现该错误,请在两个不同的终端中运行这两个脚本。您将需要一个带有 Frimata_express 的 arduino 板安装。如果一切顺利的话 PART A.
应该只在 mve_req.py
上吐出相同的值结尾。您可以编辑不同的 block (A、B 或 C 部分)来查看行为。
mve_rep.py
#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENTTAKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress
class ConcurrentTasks:
def __init__(self, board):
self.loop = board.get_event_loop()
self.board = board
self.ctxsync = zmq.Context()
self.context = zmq.asyncio.Context()
self.rep = self.context.socket(zmq.REP)
self.rep.bind("tcp://*:5558")
self.trigger_pin = 53
self.echo_pin = 51
loop.run_until_complete(self.async_init_and_run())
async def readsonar(self):
i = 0
while True:
#PART. A. WHAT I HOPE COULD WORK
rep_recv = await self.rep.recv() # line.1
value = await self.board.sonar_read(self.trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
# json_data = json.dumps(i) # line.4bis
await self.rep.send(json_data.encode()) # line.5
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
#PART. B. WORKS FINE IN UPDATING THE SONAR_RAED VALUE AND PRINTING IT
value = await self.board.sonar_read(self.trigger_pin) # line.2
print(value) # line.3
json_data = json.dumps(value) # line.4
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
'''
#PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
rep_recv = await self.rep.recv() # line.1
json_data = json.dumps(i) # line.4bis
await self.rep.send(json_data.encode()) # line.5
i += 1 # line.6
await asyncio.sleep(1 / 1000) # line.7
'''
async def async_init_and_run(self):
await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)
readsonar = asyncio.create_task(self.readsonar())
await readsonar
# OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
my_board = PymataExpress()
try:
ConcurrentTasks(my_board)
except (KeyboardInterrupt, RuntimeError):
loop.run_until_complete(my_board.shutdown())
print('goodbye')
finally:
loop.close()
mve_req.py
import zmq
import time
import json
def start_zmq():
context = zmq.Context()
REQ_django = context.socket(zmq.REQ)
REQ_django.connect("tcp://localhost:5558")
return REQ_django, context
def get_sonar(REQ_django):
REQ_django.send(b"server_django")
ping_from_server_django = REQ_django.recv()
return ping_from_server_django.decode()
if __name__ == '__main__':
data = {"sensors":{}}
REQ_django, context = start_zmq()
while REQ_django:
data['sensors']['sonar'] = get_sonar(REQ_django)
json_data = json.dumps(data)
print(data)
#DO OTHER WORK
time.sleep(1)
REQ_django.close()
context.term()
最佳答案
完全公开,我是 pymata-express 的作者和 python-banyan. OP 要求我发布这个解决方案,所以这并不是一个无耻的插件。
自从 asyncio 在 Python 3 中首次引入以来,我一直在使用 asyncio 进行开发。当 asyncio 代码工作时,asyncio(恕我直言)可以简化并发性和代码。但是,当出现问题时,调试和理解问题原因可能会令人沮丧。
我提前道歉,因为这可能有点冗长,但我需要提供一些背景信息,以便该示例看起来不像一些随机的代码。
开发 python-banyan 框架是为了提供线程、多处理和异步的替代方案。简而言之,Banyan 应用程序由小型目标可执行文件组成,这些可执行文件使用通过 LAN 共享的协议(protocol)消息相互通信。它的核心使用 Zeromq。它的设计目的不是让流量通过 WAN 传输,而是使用 LAN 作为“软件背板”。在某些方面,Banyan 与 MQTT 类似,但在 LAN 内使用时速度要快得多。如果需要,它确实能够连接到 MQTT 网络。
Banyan的一部分是一个叫做OneGPIO的概念。它是一种协议(protocol)消息传递规范,将 GPIO 功能抽象为独立于任何硬件实现。为了实现硬件细节,开发了专门的 Banyan 组件,称为 Banyan 硬件网关。有适用于 Raspberry Pi、Arduino、ESP-8266 和 Adafruit Crickit Hat 的网关。 GPIO 应用程序发布任何或所有网关可以选择接收的通用 OneGPIO 消息。为了从一个硬件平台转移到另一个硬件平台,启动硬件关联网关,并且无需修改,启动控制组件(如下所示的代码)。要从一个硬件平台转到另一个硬件平台,任何组件都不需要修改代码,控制组件和网关都不需要修改。启动控制组件时,可以通过命令行选项指定诸如引脚号之类的变量。对于Arduino网关,pymata-express用于控制Arduino的GPIO。 Pymata-express 是 StandardFirmata 客户端的异步实现。需要注意的是,下面的代码不是 asyncio。 Banyan 框架允许使用适合问题的工具进行开发,但允许解耦解决方案的各个部分,在这种情况下,应用程序允许将异步与非异步混合,而不会遇到通常会遇到的任何麻烦所以。
在提供的代码中,类定义下面的所有代码都用于提供对命令行配置选项的支持。
import argparse
import signal
import sys
import threading
import time
from python_banyan.banyan_base import BanyanBase
class HCSR04(BanyanBase, threading.Thread):
def __init__(self, **kwargs):
"""
kwargs contains the following parameters
:param back_plane_ip_address: If none, the local IP address is used
:param process_name: HCSR04
:param publisher_port: publishing port
:param subscriber_port: subscriber port
:param loop_time: receive loop idle time
:param trigger_pin: GPIO trigger pin number
:param echo_pin: GPIO echo pin number
"""
self.back_plane_ip_address = kwargs['back_plane_ip_address'],
self.process_name = kwargs['process_name']
self.publisher_port = kwargs['publisher_port']
self.subscriber_port = kwargs['subscriber_port'],
self.loop_time = kwargs['loop_time']
self.trigger_pin = kwargs['trigger_pin']
self.echo_pin = kwargs['echo_pin']
self.poll_interval = kwargs['poll_interval']
self.last_distance_value = 0
# initialize the base class
super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
subscriber_port=kwargs['subscriber_port'],
publisher_port=kwargs['publisher_port'],
process_name=kwargs['process_name'],
loop_time=kwargs['loop_time'])
threading.Thread.__init__(self)
self.daemon = True
self.lock = threading.Lock()
# subscribe to receive messages from arduino gateway
self.set_subscriber_topic('from_arduino_gateway')
# enable hc-sr04 in arduino gateway
payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
'echo_pin': self.echo_pin}
self.publish_payload(payload, 'to_arduino_gateway')
# start the thread
self.start()
try:
self.receive_loop()
except KeyboardInterrupt:
self.clean_up()
sys.exit(0)
def incoming_message_processing(self, topic, payload):
print(topic, payload)
with self.lock:
self.last_distance_value = payload['value']
def run(self):
while True:
with self.lock:
distance = self.last_distance_value
payload = {'distance': distance}
topic = 'distance_poll'
self.publish_payload(payload, topic)
time.sleep(self.poll_interval)
def hcsr04():
parser = argparse.ArgumentParser()
# allow user to bypass the IP address auto-discovery.
# This is necessary if the component resides on a computer
# other than the computing running the backplane.
parser.add_argument("-b", dest="back_plane_ip_address", default="None",
help="None or IP address used by Back Plane")
parser.add_argument("-i", dest="poll_interval", default=1.0,
help="Distance polling interval")
parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
help="Set process name in banner")
parser.add_argument("-p", dest="publisher_port", default="43124",
help="Publisher IP port")
parser.add_argument("-s", dest="subscriber_port", default="43125",
help="Subscriber IP port")
parser.add_argument("-t", dest="loop_time", default=".1",
help="Event Loop Timer in seconds")
parser.add_argument("-x", dest="trigger_pin", default="12",
help="Trigger GPIO pin number")
parser.add_argument("-y", dest="echo_pin", default="13",
help="Echo GPIO pin number")
args = parser.parse_args()
if args.back_plane_ip_address == 'None':
args.back_plane_ip_address = None
kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
'publisher_port': args.publisher_port,
'subscriber_port': args.subscriber_port,
'process_name': args.process_name,
'loop_time': float(args.loop_time),
'trigger_pin': int(args.trigger_pin),
'echo_pin': int(args.echo_pin),
'poll_interval': int(args.poll_interval)
}
# replace with the name of your class
HCSR04(**kw_options)
# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
print('Exiting Through Signal Handler')
raise KeyboardInterrupt
# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
if __name__ == '__main__':
hcsr04()
关于python - pyzmq REQ/REP 与异步等待变量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57156822/
我应该如何继续将 std::chrono::minutes::rep 类型值转换为小时表示。 #include #include using namespace std; using namesp
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 4年前关闭。 Improve this questi
我有一个带有3个kafka节点和3个zk节点的kakfa集群。 生产者在AWS机器上尝试将数据推送到我的Intranet服务器上运行的kafka集群上。 使用以下命令从控制台创建主题(JOB_AWS_
我有关于序列和 each 的快速问题: vect1 <- c(4, 5, 10, 3, 1) 我想用这个向量复制每个,这样第一个数字被复制 4,第二个 5,第三个 10,第四个 3 和第五个等于 1。
当我在控制台中键入泛型的函数名称时,我希望看到对 UseMethod 的调用。例如,the documentation for determinant 将其称为泛型,当我将其输入控制台时得到以下输出:
我正在尝试理解 SIMD 和向量指令的概念。如果我理解正确的话: 向量指令是对一维数据数组(=向量)进行操作的指令,而不是对单个数据项进行操作的标量指令。 SIMD指令实际上是单指令多数据指令,看起来
引用英特尔® 64 和 IA-32 架构优化引用手册,第 2.4.6 节“REP 字符串增强”: The performance characteristics of using REP string
我正在尝试编写一个应用程序,允许用户启动长时间运行的计算进程,该进程将从使用 ØMQ 的 Web 服务器接收命令。我使用标准的请求-回复架构:服务器有一个连接到工作进程 REP 套接字的 REQ 套接
我希望使用多线程通过 Python 和 ZeroMQ 实现 REQ-REP 模式。 使用 Python,我可以在新客户端连接到服务器时创建一个新线程。该线程将处理与该特定客户端的所有通信,直到套接字关
我想创建一个列表,它是向量的 8 倍 c(2,6) ,即 8 个向量的列表。 错误:object = as.list(rep(c(2,6),8))结果是 16 个单个数字的列表:2 6 2 6 2 6
我需要将一个向量分解成一系列 x 并重复,我不太确定这个术语是什么。它是 rep 的倒数功能。所以一个向量 [1,2,2,2,2,1,1,1,1,1,2,2] -> [1x1, 4x2, 5x1, 2
x=1:20 [1] 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 rep(x,2) [1] 1 2 3 4 5 6 7 8 9 10 11 1
假设我做了一个长 REP INSB在普通优先级线程中从用户模式读取 PCI 设备寄存器。在它执行期间,以下哪些可以发生,哪些不能发生: 中断(其他内核) 中断(同核) PCI 访问(其他内核) PCI
怎么才能使用说明rep stosb执行速度比这段代码快? Clear: mov byte [edi],AL ; Write the value in AL to memory
我需要从端口读取一些 16 位值并将它们保存到缓冲区。我正在使用的教程建议使用 REP INSW 指令,但我不知道如何使用它,甚至不知道它是如何工作的...... 这条指令相当于两条IN指令吗? 最佳
什么是重用/发布等效原则以及为什么它很重要? 最佳答案 重用/发布等效原则 (REP) 说: The unit of reuse is the unit of release. Effective r
给定一个向量,例如 > x [1] 1 1 2 1 1 1 5 1 1 1 5 7 1 1 1 1 1 1 1 1 1 我想复制元素n次——但是——我希望旧元素被复制覆盖。使用基本的 rep 函数给
我需要从端口读取一些 16 位值并将它们保存到缓冲区。我正在使用的教程建议使用 REP INSW 指令,但我不知道如何使用它,甚至不知道它是如何工作的...... 这条指令相当于两条IN指令吗? 最佳
我在 Visual Studio 2008 上测试一些代码并注意到 security_cookie。我能理解它的意思,但我不明白这个指令的目的是什么。 rep ret /* REP to av
我想知道是否有更简单的方法来制作列表,例如 10 '4'、20 '6' 和 30 '3' 然后用函数 'rep 手写 (example <- c(4,4,4,4,...)) '.我知道我可以将某个序列
我是一名优秀的程序员,十分优秀!