
python - Pydoop call does not work when inside a Celery task


I have set up two files, tasks.py and HDFSStorage.py, for a project that uses Celery and Pydoop:

# tasks.py

from celery import Celery
from celery import shared_task
from celery.utils.log import get_task_logger
from HDFSStorage import HDFSStorage

app = Celery('tasks', broker='amqp://guest@localhost//')
logger = get_task_logger(__name__)
fs = HDFSStorage()
print fs.exists("/myfile.txt")

@shared_task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    logger.info('Checking if file exists')
    fs.exists("/myfile.txt")
    logger.info('Done checking if file exists')
    return x + y

# HDFSStorage.py

import pydoop
from pydoop.hdfs import hdfs

class HDFSStorage():
    def __init__(self):
        self.client = hdfs(host="master", port=54310, user="oskar")

    def exists(self, name):
        return self.client.exists(name)

Starting the Celery worker executes the fs.exists() call outside the task and prints True, as expected:

$ celery -A tasks worker -l info
True
[2016-06-08 15:54:15,298: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@master v3.1.23 (Cipater)
---- **** -----
--- * *** * -- Linux-3.19.0-32-generic-x86_64-with-LinuxMint-17.3-rosa
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f510d3162d0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery


[tasks]
. tasks.add

[2016-06-08 15:54:15,371: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2016-06-08 15:54:15,382: INFO/MainProcess] mingle: searching for neighbors
[2016-06-08 15:54:16,395: INFO/MainProcess] mingle: all alone
[2016-06-08 15:54:16,412: WARNING/MainProcess] celery@master ready.
[2016-06-08 15:54:19,736: INFO/MainProcess] Events of group {task} enabled by remote.

However, for some unknown reason, running a task that makes the same fs.exists() call gets stuck:

$ python
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
True
>>> print add.delay(5,4).get()

[2016-06-08 15:54:32,833: INFO/MainProcess] Received task: tasks.add[a50409a8-f82d-4376-ace2-442a09c9ed4f]
[2016-06-08 15:54:32,834: INFO/Worker-2] tasks.add[a50409a8-f82d-4376-ace2-442a09c9ed4f]: Adding 5 + 3
[2016-06-08 15:54:32,834: INFO/Worker-2] tasks.add[a50409a8-f82d-4376-ace2-442a09c9ed4f]: Checking if file exists

Removing the fs.exists() call from the task lets it complete correctly.

What am I doing wrong? What keeps Celery from working with Pydoop?

Best Answer

The HDFSStorage instance must be created inside the task:

# tasks.py

from celery import Celery
from celery import shared_task
from celery.utils.log import get_task_logger
from HDFSStorage import HDFSStorage

app = Celery('tasks', broker='amqp://guest@localhost//')
logger = get_task_logger(__name__)

@shared_task
def add(x, y):
    fs = HDFSStorage()
    logger.info('Adding {0} + {1}'.format(x, y))
    logger.info('Checking if file exists')
    fs.exists("/myfile.txt")
    logger.info('Done checking if file exists')
    return x + y

Regarding python - Pydoop call does not work when inside a Celery task, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37705625/
