- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
当客户端请求状态时,如何以非阻塞方式使用 ZMQ 来“服务”长时间运行的作业的状态?
下面的代码说明了如何暂时“中断”长时间运行的任务以发送当前状态。
任务运行时间长是因为有很多 url
需要处理,而不是因为每个 url 都需要很长时间来处理。这意味着服务器几乎可以立即向客户端响应当前状态。
我无法以非阻塞方式实现此逻辑,因为使用标志 zmq.NOBLOCK
导致 Again: Resource temporary unavailable
,并且不使用标志表示服务器阻塞并等待接收消息。
如何实现这样的逻辑/行为?我愿意使用 C++ 或 Python。
服务器代码:
import zmq
# Socket details
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
# List of many urls
urls = ['http://google.com','http://yahoo.com']
def process(url):
"""Sample function"""
pass
processed_urls = []
for url in urls:
# If a message has been received by a client respond to the message
# The response should be the current status.
if socket.recv(zmq.NOBLOCK):
msg = b"Processed the following urls %s" % str(processed_urls).encode()
socket.send(msg, zmq.NOBLOCK)
# Continue processing the urls
process(url)
processed_urls.append(url)
最佳答案
首先 - NON-BLOCKING 是一把双刃剑。有两个世界,每个世界都可以,有时确实会阻塞。
GIL-side 和/或process-side "blocking" 可能会出现(numpy
下面的示例,但对于任何无法轻松实现非阻塞解决方法的同步阻塞调用有效),而某些外部进程或全局应用程序架构可能仍需要(至少)一些响应& 握手行为甚至来自这种故意“阻止”的 Python 代码区域。
第二个世界是您的 ZeroMQ(可能)阻塞调用。设置 zmq.CONFLATE
还可以帮助您从长时间运行的客户端到服务器进行类似 PUSH 的 URL 报告。在报告套接字的客户端和服务器端设置 CONFLATE
。
在我力所能及的每个地方,我都提倡严格的非阻塞设计。即使是 ZeroMQ 代码的教科书示例也应该是真实且公平的,不会被屏蔽。我们生活在第三个千年,阻塞代码是一种性能和资源使用的毁灭性状态,主要是在专业级分布式系统设计的控制范围之外。
####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )
import signal, os
#-------------------------------------------------------------------
# .SET ZeroMQ INFRASTRUCTURE:
#-------------------------------------------------------------------
# .DEF SIG_handler(s)
def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
print( 'SIG_handler called to report state with signal', SIGnum )
#---------------------------------------------------------------
# ZeroMQ .send( .SIG/.MSG )
pass; # yes, all the needed magic comes right here
#-------------------------------------------------------------------
# FINALLY:
raise OSError( "Had to send a HealthREPORT" ) # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?
#-------------------------------------------------------------------
# .ASSOC SIG_handler:
signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER ) # .SET { SIGALRM: <aHandler> }-assoc
#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval
signal.setitimer( signal.ITIMER_REAL, 1, 1 ) # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.
# ------------------------------------------------------------------
# FINALLY:
#-------------------------------------------------------------------
# .SET / .DEACTIVATE
signal.setitimer( signal.ITIMER_REAL, 0 ) # .SET / DEACTIVATE
#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE
#-------------------------------------------------------------------
# CLEAN EXIT(0)
_exit(0)
让我分享一种用于某种aHealthMONITOR
的方法,用于一个确实很长的 principal-BLOCKING 计算案例。
让我们举一个 GIL 的“阻塞”类型计算的例子:
#######
# SETUP
signal.signal( signal.SIGALRM, SIG_ALRM_handler_A ) # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 ) # .SET @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1 # .RESET .INIT()
SIGALRM
+ ITIMER_REAL
的机制提供了一个可爱的自动化,至少可以通过一些响应让外部世界满意(在本例中频率为 ~ 0.2 [Hz],但主要{up-|down-}-可扩展到任何合理且系统范围内稳定的时间量-- 测试 0.5 [GHz ] 1.0 [GHz] VM 系统上的处理程序留给最终的黑客考虑 -- 否则适用合理的规模因素和非阻塞/低延迟设计的常识)
DEMO 读数显示,involuntary=
上下文切换如何演示阻塞无关机制(读取数字,随着它们的增长,而自愿性在整个 GIL 中保持不变-blocking part of the process ), 所以类似的def
-ed SIG_ALRM_handler_XYZ()
可以提供为您的过程状态独立点播记者提供的解决方案。
SIG_ALRM_handler_A(): activated Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)
>>> SIG_ALRM_last_ctx_switch_VOLUNTARY 53243
>>> SIG_ALRM_last_ctx_switch_FORCED 1169
>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5] # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]
#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1])) # .RUN A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus
>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234) INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257) INSPECT processes ... ev. add a Stateful-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790) INSPECT processes ... ev. add a StateFul-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812) INSPECT processes ... ev. add a StateFul-self-Introspection
2771010
在此流程上下文中,使用了此处理程序:
########################################################################
### SIGALRM_handler_
###
import psutil, resource, os, time
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED = -1
def SIG_ALRM_handler_A( aSigNUM, aFrame ): # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
#
# onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
#
# onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
#
aProcess = psutil.Process( os.getpid() )
aProcessCpuPCT = aProcess.cpu_percent( interval = 0 ) # EVENLY-TIME-STEPPED
aCtxSwitchNUMs = aProcess.num_ctx_switches() # THIS PROCESS ( may inspect other per-incident later ... on anomaly )
aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
aForcedSwitchCNT = aCtxSwitchNUMs.involuntary
global SIG_ALRM_last_ctx_switch_VOLUNTARY
global SIG_ALRM_last_ctx_switch_FORCED
if ( SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ): # .INIT VALUE STILL UNCHANGED
#----------
# .ON_TICK: must process delta(s)
if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
#
# AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
# ||||| vvv
# SIG_: Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353) ~~~ 0.0
# ... ||||| ^^^
# 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
# >>> ||||| |||
# vvvvv |||
# SIG_: Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502) ~~~ 0.0
print( "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-", aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ), " INSPECT processes ... ev. add a StateFul-self-Introspection" )
else:
#----------
# .ON_INIT: may report .INIT()
print( "SIG_ALRM_handler_A(): activated ", time.ctime(), 30 * "-", aProcess.num_ctx_switches() )
##########
# FINALLY:
SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT # .STO ACTUALs
SIG_ALRM_last_ctx_switch_FORCED = aForcedSwitchCNT # .STO ACTUALs
关于python - 如何在函数内部(以非阻塞方式)使用 zmq 在客户端请求时获取函数的状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45114658/
当我尝试运行文件时收到此错误? can't find package zmq while executing "package require zmq" 如何在 Ubuntu 中安装 zmq? 谢谢
我已经使用 Installation on Linux / bsd without root access 中提到的步骤安装了 zmq已成功安装,但在启动应用程序时出现错误。我在没有根访问权限的环境中
我正在尝试通过 czmqz (4.0.2) 使用 libzmq (4.2.2) 在 Python 程序和我的 C++ 应用程序之间建立 IPC 通信 channel 。 我正在使用 ZMQ 发布者/订
更新我的问题 如何在我的 python zmq 服务器中表示到达的消息以显示其内容? 根据这种行为,我是否可以假设 btnState 数据无论如何都会发送到 python 服务器? 上下文: 我正在发
我有一个客户端使用一对套接字与单个服务器通信: context = zmq.Context() socket = context.socket(zmq.PAIR) socket.setsockopt(
我有一个名为 GenericMessage 的类,显示在下面的第一个代码片段中(在 GenericMessage.hxx 中定义)。 我有一个名为 TestFE.cpp 的 .cpp 文件(参见下面的
我试图在 Go 应用程序中包含 ZMQ 套接字,但 zmq4 和 gozmq(Go 的引用 ZMQ 绑定(bind)库)都给我带来了问题。我想了解为什么 zmq4 特别不能在我的系统上导入。 我运行的
我遇到了一个关于 ZeroMQ 的问题,因为我在 ZMQ 套接字上为 zmq::proxy 和 zmq::poll 使用了指针。这样做会发生错误 88 的异常(非套接字上的套接字操作)。 实际上 Ze
我正在尝试在 MQL5 中设置一个 PUB 套接字,在 Python 中设置一个 SUB 套接字来接收消息。 我在 MQL5 中有这个: #include Context context("hell
我想用 React 做一个异步路由器到经销商消息传递,但它不工作。 http://zguide.zeromq.org/php:rtdealer中的代码正在工作,但我无法确定我在做什么不同。我正在使用
我有一个 Python 脚本,我在其中绑定(bind)了多个(例如 5 个)ZMQ 接收器套接字,如下所示: receiver_1 = context.socket(zmq.PULL) receive
在Linux-Ubuntu上安装ZeroMQ,一个网站 https://tuananh.org/2015/06/16/how-to-install-zeromq-on-ubuntu/ 说要运行以下命令
如何创建允许多个发布者和这些发布者的多个订阅者的网络? 还是绝对有必要使用消息代理? import time import zmq from multiprocessing import Proces
我研究 zmq 有一段时间了,并实现了一个简化的 poc - 模仿我的基础架构设计 - 使用它(特别是使用 NetMQ 包装器),取得了很好的效果。 我的情况是这样的: 将来我计划在一台机器上运行多个
我一直在阅读 ZMQ documentation在心跳上并阅读应该使用乒乓方法而不是用于偏执海盗模式的方法 For Paranoid Pirate, we chose the second appro
我正在寻找一种支持 ZMQ 的解决方案,用于连接到 0 或 1 个对等点的绑定(bind)端点之间的通信,仅此而已。通信是双向的,连接可以随时结束或切断;并且可以与新的对等点或同一对等点重新建立连接。
我正在使用 ZMQ PUB 套接字来发布不同主题的新闻。但是其中一些消息的计算成本很高。有没有办法有一个 on_subscribe 回调,这样我就可以只计算实际需要的东西? 最佳答案 只是为了记录我找
我有兴趣了解 ZMQ 在发送消息之前是否已经压缩了消息,这样我自己就不会这样做,因为压缩消息两次是多余的。 如果确实这样做,它是自动的,还是有一个选项参数可以指定? 我正在使用java,但这确实不重要
我在我的 Java 应用程序中使用 ZMQ。我发现它的行为不均匀,即如果我发送大约 100 条消息,其中一个消费者说需要 1 秒,那么如果我们继续增加消费者,所花费的时间将变为 2,1.5,3,这样。
所以我有 8 个工作线程(PULL 套接字),它们从单个绑定(bind)的 PUSH 套接字提供数据。它们每秒处理大量数据,有时会随机崩溃。显然,我应该尝试处理这些崩溃,但我很好奇这个系统目前的弹性如
我是一名优秀的程序员,十分优秀!