- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在做的是,假设您有几个需要执行的工作流程。这些工作流都有任务,任务的目标是不同的主机。最快的方法是运行进程内的每个工作流,并并行运行它们。
我正在尝试运行 python 多重处理来执行我在 celery 的帮助下调用的远程函数。如果我只运行一个进程,我的程序就可以正常运行。但是当我运行多个进程时,我收到以下错误。据我所知,问题在于同一 channel 上的并发发布。 channel 不应在线程等之间共享。
我该如何让 Celery 来解决这个问题?是一个我应该使用“celeryd”命令启动的参数,还是我需要在我的 python 程序中执行它?
Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare
self._send_method((50, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
Process Process-3:
self.revive(self.channel)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
self.run()
self.exchange.declare(nowait)
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
self._target(*self._args, **self._kwargs)
nowait=nowait, passive=passive,
File "testHello.py", line 16, in test_hello_aux
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self._send_method((40, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
write_frame(1, channel, payload)
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
self.exchange.declare(nowait)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
nowait=nowait, passive=passive,
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
(40, 11), # Channel.exchange_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method
raise m
error: [Errno 104] Connection reset by peer
Process Process-4:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare
(50, 11), # Channel.queue_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method
self.wait()
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait
return self.dispatch_method(method_sig, args, content)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method
return amqp_method(self, args)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close
(class_id, method_id), ConnectionError)
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
celery --版本 3.1.11 (Cipater)amq --版本 0.9.1
最佳答案
使用 Celery 时,您不需要使用 python 多处理模块。 Celery 会为您处理一切。
在名为tasks.py的文件中定义您的任务
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
现在假设 add
函数实际上是您想要并行运行的函数。我们还要考虑一下术语。并行意味着同时,而异步意味着不同步。我不能保证您的任务会同时运行,但我可以保证它们不会同步运行。因此,我们继续使用术语“异步”。
celery 有 Canvas ,一组用于异步流控制的原语。您可能感兴趣的两个是group
和chord
。 group
允许您运行一组异步任务,并询问所有异步任务的结果(完成您加入时尝试的任务)。 chord
提供与 group
相同的功能,但在所有任务完成时触发回调。
调用代码示例:
WAIT_TIME = 10 # how ever long you are willing to wait for your tasks
from tasks import add
from celery import group
future = group(add.s(i**i, i**i) for i in xrange(10))()
results = future.get(timeout=WAIT_TIME)
Celery 任务会在自己的进程(您生成的工作进程)中自动运行,不需要您自己创建更多进程。
关于python - UNEXPECTED_FRAME - 60 类的预期内容 header ,却得到了非内容 header 框架,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23274571/
我需要开发一个简单的网站,我通常使用 bootstrap CSS 框架,但是我想使用 Gumbyn,它允许我使用 16 列而不是 12 列。 我想知道是否: 我可以轻松地改变绿色吗? 如何使用固定布局
这个问题在这里已经有了答案: 关闭 13 年前。 与直接编写 PHP 代码相比,使用 PHP 框架有哪些优点/缺点?
我开发了一个 Spring/JPA 应用程序:服务、存储库和域层即将完成。 唯一缺少的层是网络层。我正在考虑将 Playframework 2.0 用于 Web 层,但我不确定是否可以在我的 Play
我现有的 struts Web 应用程序具有单点登录功能。然后我将使用 spring 框架创建一个不同的 Web 应用程序。然后想要使用从 struts 应用程序登录的用户来链接新的 spring 应
我首先使用Spark框架和ORMLite处理网页上表单提交的数据,在提交中文字符时看到了unicode问题。我首先想到问题可能是由于ORMLite,因为我的MySQL数据库的字符集已设置为使用utf8
我有一个使用 .Net 4.5 功能的模块,我们的应用程序也适用于 XP 用户。所以我正在考虑将这个 .net 4.5 依赖模块移动到单独的项目中。我怎样才能有一个解决方案,其中有两个项目针对不同的版
我知道这是一个非常笼统的问题,但我想我并不是真的在寻找明确的答案。作为 PHP 框架的新手,我很难理解它。 Javascript 框架,尤其是带有 UI 扩展的框架,似乎通过将 JS 代码与设计分开来
我需要收集一些关于现有 ORM 解决方案的信息。 请随意编写任何编程语言。 你能谈谈你用过的最好的 ORM 框架吗?为什么它比其他的更好? 最佳答案 我使用了 NHibernate 和 Entity
除了 Apple 的 SDK 之外,还有什么强大的 iPhone 框架可供开始开发?有没有可以加快开发时间的方法? 最佳答案 此类框架最大的是Three20 。 Facebook 和许多其他公司都使用
有人可以启发我使用 NodeJS 的 Web 框架吗?我最近开始从免费代码营学习express js,虽然一切进展顺利,但我对express到底是什么感到困惑。是全栈框架吗?纯粹是为了后端吗?我发现您
您可以推荐哪种 Ajax 框架/工具包来构建使用 struts 的 Web 应用程序的 GUI? 最佳答案 我会说你的 AJAX/javascript 库选择应该较少取决于你的后端是如何实现的,而更多
我有生成以下错误的 python 代码: objc[36554]: Class TKApplication is implemented in both /Library/Frameworks/Tk.
首先,很抱歉,如果我问的问题很明显,因为我没有编程背景,那我去吧: 我想运行一系列测试场景并在背景部分声明了几个变量(我打印它们以仔细检查它们是否已正确声明),第一个是整数,另外两个字符串为你可以看到
在我们承担的一个项目中,我们正在寻找一个视频捕获和录制库。我们的基础工作(基于 google 搜索)表明 vlc (libvlc)、ffmpeg (libavcodec) 和 gstreamer 是三
我试过没有运气的情况下寻找某种功能来杀死/中断Play中的正常工作!框架。 我想念什么吗?还是玩了!实际没有添加此功能? 最佳答案 Java stop类中没有像Thread方法那样的东西,由于种种原因
我们希望在我们的系统中保留所有重大事件的记录。例如,在数据库可能存储当前用户状态的地方,事件日志应记录对该状态的所有更改以及更改发生的时间。 事件记录工具应该尽可能接近于事件引发器的零开销,应该容纳结
那里有 ActionScript 2.0/3.0 的测试框架列表吗? 最佳答案 2010-05-18 更新 由于这篇文章有点旧,而且我刚刚收到了赞成票,因此可能值得提供一些更新的信息,这样人们就不会追
我有一个巨大的 numpy 数组列表(一维),它们是不同事件的时间序列。每个点都有一个标签,我想根据其标签对 numpy 数组进行窗口化。我的标签是 0、1 和 2。每个窗口都有一个固定的大小 M。
我是 Play 的新手!并编写了我的第一个应用程序。这个应用程序有一组它依赖的 URL,从 XML 响应中提取数据并返回有效的 URL。 此应用程序需要在不同的环境(Dev、Staging 和 Pro
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。 Improve thi
我是一名优秀的程序员,十分优秀!