gpt4 book ai didi

python - Celery Gevent 池 - ConcurrentObjectUseError

转载 作者:太空狗 更新时间:2023-10-30 01:25:46 25 4
gpt4 key购买 nike

我有一个使用 gevent 池的 celery worker,它执行 HTTP 请求,然后把页面源码作为参数添加另一个 celery 任务。

我正在使用 Django,RabbitMQ 作为代理,Redis 作为 celery 结果后端,Celery 4.1.0。

这些任务都设置了 ignore_result=True,但我仍然经常收到此错误:ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter...>

我看到它与Redis连接有关。

我不知道如何解决这个问题。下面大致是任务的逻辑。我还尝试过在调用 process_task.apply_async 时使用信号量,但没有效果。

from gevent.lock import BoundedSemaphore

# Semaphore with a single slot: at most one greenlet can hold it at a time.
sem = BoundedSemaphore(1)


@app.task(ignore_result=True, queue='request_queue')
def request_task(url, *args, **kwargs):
    """Fetch ``url`` and enqueue ``process_task`` with the response details.

    The response's status code, text, headers and encoding are packed into
    a plain dict and passed to ``process_task`` as the ``request`` keyword
    argument, alongside the original ``url``.

    Extra positional and keyword arguments are accepted but not used.
    """
    # make the request
    req = requests.get(url)

    request = {
        'status_code': req.status_code,
        'content': req.text,
        'headers': dict(req.headers),
        'encoding': req.encoding
    }
    # Hold the module-level semaphore while publishing the follow-up task so
    # that only one greenlet at a time goes through apply_async.
    with sem:
        process_task.apply_async(kwargs={'url': url, 'request': request})
    print(f'Done - {url}')

这是堆栈跟踪:

cancel_wait_ex: [Errno 9] File descriptor was closed in another greenlet
File "redis/connection.py", line 543, in send_packed_command
self._sock.sendall(item)
File "gevent/_socket3.py", line 424, in sendall
data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
File "gevent/_socket3.py", line 394, in send
self._wait(self._write_event)
File "gevent/_socket3.py", line 156, in _wait
self.hub.wait(watcher)
File "gevent/hub.py", line 651, in wait
result = waiter.get()
File "gevent/hub.py", line 898, in get
return self.hub.switch()
File "gevent/hub.py", line 630, in switch
return RawGreenlet.switch(self)
ConnectionError: Error 9 while writing to socket. File descriptor was closed in another greenlet.
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 556, in send_packed_command
(errno, errmsg))
OSError: [Errno 9] Bad file descriptor
File "redis/connection.py", line 126, in _read_from_socket
data = self._sock.recv(socket_read_size)
File "gevent/_socket3.py", line 332, in recv
return _socket.socket.recv(self._sock, *args)
ConnectionError: Error while reading from socket: (9, 'Bad file descriptor')
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 446, in connect
self.on_connect()
File "redis/connection.py", line 520, in on_connect
if nativestr(self.read_response()) != 'OK':
File "redis/connection.py", line 577, in read_response
response = self._parser.read_response()
File "redis/connection.py", line 238, in read_response
response = self._buffer.readline()
File "redis/connection.py", line 168, in readline
self._read_from_socket()
File "redis/connection.py", line 143, in _read_from_socket
(e.args,))
BlockingIOError: [Errno 11] Resource temporarily unavailable
File "gevent/_socket3.py", line 390, in send
return _socket.socket.send(self._sock, data, flags)
OSError: [Errno 9] Bad file descriptor
File "redis/connection.py", line 543, in send_packed_command
self._sock.sendall(item)
File "gevent/_socket3.py", line 424, in sendall
data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
File "gevent/_socket3.py", line 396, in send
return _socket.socket.send(self._sock, data, flags)
ConnectionError: Error 9 while writing to socket. Bad file descriptor.
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 556, in send_packed_command
(errno, errmsg))
BlockingIOError: [Errno 11] Resource temporarily unavailable
File "gevent/_socket3.py", line 390, in send
return _socket.socket.send(self._sock, data, flags)
OSError: [Errno 9] Bad file descriptor
File "redis/connection.py", line 543, in send_packed_command
self._sock.sendall(item)
File "gevent/_socket3.py", line 424, in sendall
data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
File "gevent/_socket3.py", line 396, in send
return _socket.socket.send(self._sock, data, flags)
ConnectionError: Error 9 while writing to socket. Bad file descriptor.
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 556, in send_packed_command
(errno, errmsg))
BlockingIOError: [Errno 11] Resource temporarily unavailable
File "gevent/_socket3.py", line 390, in send
return _socket.socket.send(self._sock, data, flags)
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f271b53dea0>>
File "celery/app/trace.py", line 374, in trace_task
R = retval = fun(*args, **kwargs)
File "celery/app/trace.py", line 629, in __protected_call__
return self.run(*args, **kwargs)
File "drones/tasks.py", line 330, in blue_drone_request_task
blue_drone_process_task.apply_async(kwargs={'targetpage': targetpage, 'request': request})
File "celery/app/task.py", line 536, in apply_async
**options
File "celery/app/base.py", line 736, in send_task
self.backend.on_task_call(P, task_id)
File "celery/backends/redis.py", line 189, in on_task_call
self.result_consumer.consume_from(task_id)
File "celery/backends/redis.py", line 76, in consume_from
self._consume_from(task_id)
File "celery/backends/redis.py", line 82, in _consume_from
self._pubsub.subscribe(key)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2172, in _execute
connection.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 538, in send_packed_command
self.connect()
File "redis/connection.py", line 455, in connect
callback(self)
File "redis/client.py", line 2120, in on_connect
self.subscribe(**channels)
File "redis/client.py", line 2229, in subscribe
ret_val = self.execute_command('SUBSCRIBE', *iterkeys(new_channels))
File "redis/client.py", line 2161, in execute_command
self._execute(connection, connection.send_command, *args)
File "redis/client.py", line 2165, in _execute
return command(*args)
File "redis/connection.py", line 563, in send_command
self.send_packed_command(self.pack_command(*args))
File "redis/connection.py", line 543, in send_packed_command
self._sock.sendall(item)
File "gevent/_socket3.py", line 424, in sendall
data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
File "gevent/_socket3.py", line 394, in send
self._wait(self._write_event)
File "gevent/_socket3.py", line 150, in _wait
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))

最佳答案

我不确定这是否是正确答案,但通过设置 CELERY_RESULT_BACKEND = None,我不再看到此错误。

希望对您有所帮助。

仅供参考:此错误仅在使用 gevent 池时发生。我尝试过的 Celery 版本:3.1、4.2.1。

关于python - Celery Gevent 池 - ConcurrentObjectUseError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49447720/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com