gpt4 book ai didi

python - UNEXPECTED_FRAME - 60 类的预期内容 header ,却得到了非内容 header 框架

转载 作者:太空宇宙 更新时间:2023-11-03 18:25:39 24 4
gpt4 key购买 nike

我正在做的是,假设您有几个需要执行的工作流程。这些工作流都有任务,任务的目标是不同的主机。最快的方法是运行进程内的每个工作流,并并行运行它们。

我正在尝试运行 python 多重处理来执行我在 celery 的帮助下调用的远程函数。如果我只运行一个进程,我的程序就可以正常运行。但是当我运行多个进程时,我收到以下错误。据我所知,问题在于同一 channel 上的并发发布。 channel 不应在线程等之间共享。

我该如何让 Celery 来解决这个问题?是一个我应该使用“celeryd”命令启动的参数,还是我需要在我的 python 程序中执行它?

    Process Process-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1254, in queue_declare
self._send_method((50, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
Process Process-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
Process Process-3:
self.revive(self.channel)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
self.run()
self.exchange.declare(nowait)
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
self._target(*self._args, **self._kwargs)
nowait=nowait, passive=passive,
File "testHello.py", line 16, in test_hello_aux
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 613, in exchange_declare
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self._send_method((40, 10), args)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 221, in write_method
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
write_frame(1, channel, payload)
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/amqp/transport.py", line 177, in write_frame
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 504, in declare
frame_type, channel, size, payload, 0xce,
File "/usr/lib/python2.7/socket.py", line 224, in meth
self.exchange.declare(nowait)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 166, in declare
nowait=nowait, passive=passive,
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 620, in exchange_declare
return getattr(self._sock,name)(*args)
error: [Errno 32] Broken pipe
(40, 11), # Channel.exchange_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/usr/local/lib/python2.7/dist-packages/amqp/method_framing.py", line 189, in read_method
raise m
error: [Errno 104] Connection reset by peer
Process Process-4:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "testHello.py", line 16, in test_hello_aux
print output.get()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 155, in wait_for
on_interval=on_interval)
File "/usr/local/lib/python2.7/dist-packages/celery/backends/amqp.py", line 229, in consume
no_ack=no_ack, accept=self.accept) as consumer:
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 359, in __init__
self.revive(self.channel)
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 371, in revive
self.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 381, in declare
queue.declare()
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 505, in declare
self.queue_declare(nowait, passive=False)
File "/usr/local/lib/python2.7/dist-packages/kombu/entity.py", line 531, in queue_declare
nowait=nowait)
File "/usr/local/lib/python2.7/dist-packages/amqp/channel.py", line 1258, in queue_declare
(50, 11), # Channel.queue_declare_ok
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 270, in _wait_method
self.wait()
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 69, in wait
return self.dispatch_method(method_sig, args, content)
File "/usr/local/lib/python2.7/dist-packages/amqp/abstract_channel.py", line 87, in dispatch_method
return amqp_method(self, args)
File "/usr/local/lib/python2.7/dist-packages/amqp/connection.py", line 526, in _close
(class_id, method_id), ConnectionError)
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

celery --版本 3.1.11 (Cipater)amq --版本 0.9.1

最佳答案

使用 Celery 时,您不需要使用 python 多处理模块。 Celery 会为您处理一切。

在名为tasks.py的文件中定义您的任务


from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
return x + y

现在假设 add 函数实际上是您想要并行运行的函数。我们还要考虑一下术语。并行意味着同时,而异步意味着不同步。我不能保证您的任务会同时运行,但我可以保证它们不会同步运行。因此,我们继续使用术语“异步”。

celery 有 Canvas ,一组用于异步流控制的原语。您可能感兴趣的两个是groupchordgroup 允许您运行一组异步任务,并询问所有异步任务的结果(完成您加入时尝试的任务)。 chord 提供与 group 相同的功能,但在所有任务完成时触发回调。

调用代码示例:



WAIT_TIME = 10 # how ever long you are willing to wait for your tasks

from tasks import add
from celery import group

future = group(add.s(i**i, i**i) for i in xrange(10))()
results = future.get(timeout=WAIT_TIME)

Celery 任务会在自己的进程(您生成的工作进程)中自动运行,不需要您自己创建更多进程。

关于python - UNEXPECTED_FRAME - 60 类的预期内容 header ,却得到了非内容 header 框架,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23274571/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com