gpt4 book ai didi

将 Celery 从 3.1 升级到 4.0 后 Redis 不返回结果

转载 作者:可可西里 更新时间:2023-11-01 11:30:36 26 4
gpt4 key购买 nike

我最近将我的 Celery 安装升级到了 4.0。经过几天的升级过程,我终于让它工作了……有点。有些任务会返回,但最终任务不会。

我有一个类 SFF,它接受并解析一个文件:

# Constructor with I/O file
def __init__(self, file):

# File data that's gonna get used a lot
sffDescriptor = file.fileno()
fileName = abspath(file.name)

# Get the pointer to the file
filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)

# Get the header info
hdr = filePtr.read(HEADER_SIZE)
self.header = SFFHeader._make(unpack(HEADER_FMT, hdr))

# Read in the palette maps
print self.header.onDemandDataSize
print self.header.onLoadDataSize
palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals)

# Read the sprite list nodes
nodesStart = self.header.sprListOff
nodesEnd = self.header.palBankOff
print nodesEnd - nodesStart
sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites)

# Get palette data
self.palettes = palMapsResult.get()

# Get sprite data
spriteNodes = sprNodesResult.get()

# TESTING
spritesResultSet = ResultSet([])
numSpriteNodes = len(spriteNodes)
# Split the nodes into chunks of size 32 elements
for x in xrange(0, numSpriteNodes, 32):
spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal)
spritesResultSet.add(spritesResult)
break # REMEMBER TO REMOVE FOR ENTIRE SFF

self.sprites = spritesResultSet.join_native()

无论是单个任务返回整个 spritesResult,还是我使用 ResultSet 拆分它,结果总是相同的:我使用的 Python 控制台只是卡在任一 spritesResultSet .join_native()spritesResult.get()(取决于我如何格式化它)。

这里是有问题的任务:

@task
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal):
sprites = []

with open(fileName, "rb") as file:
sffDescriptor = file.fileno()
sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ)

for node in nodes[start:end]:
sprListNode = dict(SprListNode._make(node)._asdict()) # Need to convert it to a dict since values may change.
#print node
#print sprListNode

# If it's a linked sprite, the data length is 0, so get the linked index.
if sprListNode['dataLen'] == 0:
sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']])
sprListNode['dataLen'] = sprListNodeTemp.dataLen
sprListNode['dataOffset'] = sprListNodeTemp.dataOffset
sprListNode['compression'] = sprListNodeTemp.compression

# What does the offset need to be?
dataOffset = sprListNode['dataOffset']
if sprListNode['loadMode'] == 0:
dataOffset += palBankOff #- HEADER_SIZE
elif sprListNode['loadMode'] == 1:
dataOffset += onDemandDataSizeTotal #- HEADER_SIZE

#print sprListNode

# Seek to the data location and "read" it in. First 4 bytes are just the image length
start = dataOffset + 4
end = dataOffset + sprListNode['dataLen']
#sffData.seek(start)

compressedSprite = sffData[start:end]

# Create the sprite
sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8))
sprites.append(sprite)

return json.dumps(sprites, cls=SpriteJSONEncoder)

我知道它到达了 return 语句,因为如果我在它上面放一个 print,它会在 Celery 窗口中打印。我也知道任务正在运行完成,因为我从工作人员那里收到以下消息:

[2016-11-16 00:03:33,639: INFO/PoolWorker-4] Task framedatabase.tasks.getSprites[285ac9b1-09b4-4cf1-a251-da6212863832] succeeded in 0.137236133218s: '[{"width": 120, "palNo": 30, "group": 9000, "xAxis": 0, "yAxis": 0, "data":...'

这是我在 settings.py 中的 celery 设置:

# Celery settings
BROKER_URL='redis://localhost:1717/1'
CELERY_RESULT_BACKEND='redis://localhost:1717/0'
CELERY_IGNORE_RESULT=False
CELERY_IMPORTS = ("framedatabase.tasks", )

...和我的 celery.py:

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings')

from django.conf import settings # noqa

app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0",
include=['framedatabase.tasks'])

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

最佳答案

发现问题。显然它会导致死锁,正如 Celery 文档中的“避免启动同步子任务”部分所述:http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices

所以我摆脱了这条线:

sprNodesResult.get()

并将最终结果改为链式:

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites),
getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get()

而且有效!现在我只需要找到一种方法来按照我想要的方式拆分它!

关于将 Celery 从 3.1 升级到 4.0 后 Redis 不返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40622009/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com