- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试测试一个简单的 Celery 应用程序。
config.py
extract/
celery.py
celeryconfig.py
tasks.py
playlists/
playlist.json
config.py
from os import environ
# Configs
REDIS_HOST = "0.0.0.0"
REDIS_PORT = 6379
BROKER_URL = environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL
celery.py
from __future__ import absolute_import
from celery import Celery
from . import celeryconfig
app = Celery('extract')
app.config_from_object(celeryconfig)
app.autodiscover_tasks(['extract'], force=True)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
tasks.py
任务基本上是一个提取队列
,如下所示:
@app.task(queue='extraction')
def playlist_generator_with_audio_features(username):
playlist_ids = get_user_playlists(username)
api = get_api_client()
(...) # some more code
export_playlist_with_audio_features.delay(username=username, playlist_id=playlist_id,
artist=artist, track=track, popularity=popularity,
energy=energy, liveness=liveness, tempo=tempo,
speechiness=speechiness, acousticness=acousticness,
instrumentalness=instrumentalness, time_signature=time_signature,
danceability=danceability, key=key, duration_ms=duration_ms,
loudness=loudness, valence=valence, mode=mode,
uri=uri, preview=preview)
logger.info('Got audio features from {} track(s) in playlist id:{} by username {}'.format(
len(tracks_and_features),
playlist_id,
username
))
return True
和一个编写器队列
,如下所示:
@app.task(queue='writer')
def export_playlist_with_audio_features(username, playlist_id, artist, track, popularity,
energy, liveness,tempo, speechiness, acousticness,
instrumentalness, time_signature, danceability,
key, duration_ms, loudness, valence, mode,
uri, preview):
path = 'playlists/playlist.json'
obj = json.dumps({
'username':username,
'playlist_id': playlist_id,
'artist': artist,
'track': track,
'popularity': popularity,
'energy': energy,
'liveness': liveness,
'tempo': tempo,
'speechiness': speechiness,
'acousticness': acousticness,
'instrumentalness': instrumentalness,
'time_signature': time_signature,
'danceability': danceability,
'key': key,
'duration_ms': duration_ms,
'loudness': loudness,
'valence': valence,
'mode': mode,
'uri': uri,
'preview': preview
})
codecs.open(path, 'a', encoding='utf-8').write(obj + '\n')
logger.debug('Exported track {} by {} with audio features. Playlist owner: {}'.format(
track, artist, username))
return True
celeryconfig.py
from __future__ import absolute_import, unicode_literals
import os
from celery.schedules import crontab
INSTALLED_APPS = ['app'] # include app
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ('extract.tasks')
DEFAULT_URL = 'redis://localhost:6379/0'
BROKER_URL = os.environ.get('EXTRACT_BROKER_URL', DEFAULT_URL)
CELERY_RESULT_BACKEND = os.environ.get('EXTRACT_RESULT_BACKEND',
DEFAULT_URL)
CELERYBEAT_SCHEDULE = {
'playlist_generator_with_audio_features': {
'task': 'extract.tasks.playlist_generator_with_audio_features',
# Every minute
'schedule': crontab(minute="*"),
'args' : [(1),]
},
'export_playlist_with_audio_features': {
'task': 'extract.tasks.export_playlist_with_audio_features',
# Every minute
'schedule': crontab(minute="*"),
'kwargs' : {"username":"username",
"playlist_id": "playlist_id",
"artist": "artist",
"track": "track",
"popularity": "popularity",
"energy":"energy",
"liveness": "liveness",
"tempo": "tempo",
"speechiness": "speechiness",
"acousticness":"acousticness",
"instrumentalness":"instrumentalness",
"time_signature":"time_signature",
"danceability":"danceability",
"key": "key",
"duration_ms": "duration_ms",
"loudness": "loudness",
"valance": "valance",
"mode":"mode",
"uri":"uri",
"preview": "preview"},
}
}
Celery 工作人员自行工作,即任务将 json
序列化数据转储到 playlists/
中,使用以下命令:
$celery worker -A提取-l信息-Q提取
[2018-12-16 23:31:58,075: WARNING/MainProcess] celery@Vitors-MacBook-Pro-2.local ready.
[2018-12-16 23:32:29,494: INFO/MainProcess] Received task: extract.tasks.playlist_generator_with_audio_features[0713b788-c004-4c0c-b1ee-85068287856a]
[2018-12-16 23:32:30,892: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Discover Weekly, Number of songs: 30, Playlist ID: 37i9dQZEVXcRI2aS94C6hY
[2018-12-16 23:32:30,893: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Time, Number of songs: 10, Playlist ID: 1qxOGf3xD3fDQs03kwKteg
[2018-12-16 23:32:30,893: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Vocoder, Number of songs: 280, Playlist ID: 7nROSBznyIkVgjSdNaHDxm
和
$ celeryworker -A extract -l info -Q writer
[2018-12-16 23:32:14,198: WARNING/MainProcess] celery@Vitors-MacBook-Pro-2.local ready.
[2018-12-16 23:32:32,467: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[a73f776d-2412-4aed-8e84-50c6485ebac8]
[2018-12-16 23:32:32,470: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[930fb552-5687-48ce-b61d-b573a79aa13c]
[2018-12-16 23:32:32,479: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[162a77a1-20ba-48c3-86c3-bb9c51eeed75]
[2018-12-16 23:32:32,486: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[89eff015-e692-4af0-a7a8-dcad06dc2e3f]
但是当我尝试使用celerybeat
来执行任务时,工作人员仅选择第一轮任务,而不是定期选择。
celerybeat -A app.celery --schedule=/tmp/celerybeat-schedule --loglevel=INFO --pidfile=/tmp/celerybeat.pid
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> /tmp/celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2018-12-16 23:32:32,706: INFO/MainProcess] beat: Starting...
[2018-12-16 23:32:32,736: INFO/MainProcess] Scheduler: Sending due task export_playlist_with_audio_features (tasks.export_playlist_with_audio_features)
[2018-12-16 23:32:32,743: INFO/MainProcess] Scheduler: Sending due task playlist_generator_with_audio_features (tasks.playlist_generator_with_audio_features)
[2018-12-16 23:33:00,000: INFO/MainProcess] Scheduler: Sending due task playlist_generator_with_audio_features (tasks.playlist_generator_with_audio_features)
我在这里缺少什么?
最佳答案
@app.task(queue='extraction')
此处的选项不会被beat使用。使用 @periodic_task 装饰器来进行节拍,并且常规 celery 客户端使用特定的队列。
@periodic_task(run_every=crontab(minute='5'),
queue='celery_periodic',
options={'queue': 'celery_periodic'})
queue='celery_periodic'
选项在您从代码(.delay 或 .apply_async)调用任务时使用
options={'queue': 'celery_periodic'}
选项在 celerybeat 调用它时使用。
您还可以在时间表配置中提供节拍选项,例如
CELERYBEAT_SCHEDULE = {
'playlist_generator_with_audio_features': {
'task': 'extract.tasks.playlist_generator_with_audio_features',
'schedule': crontab(minute="*"),
'options': {'queue' : 'celery_periodic'} # specify queue
},
}
关于python - 工作人员没有定期从 celery-beat 中挑选任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53808483/
当我运行此命令进行 celery 节拍时。 [2013-06-27 02:17:05,936: INFO/MainProcess] Celerybeat: Starting... [2013-06-2
我需要构建一个处理两种类型任务的系统。一种类型可以创建更多自身或另一种类型的任务。将有很少的 worker (2-3)和只有一个主机。最重要的要求是系统应该优雅地处理重新启动:即在重新启动时,正在进行
我们使用 Celery 4.2.1 和 Redis,并为我们的任务设置了全局软超时和硬超时。我们所有的自定义任务都设计为保持在限制范围内,但每天内置任务 backend_cleanup 任务最终都会因
我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回? 所以我可以调用 actual_result = MyTask.dont_delay(some_arg="foo")
我们使用 Celery 4.2.1 和 Redis,并为我们的任务设置了全局软超时和硬超时。我们所有的自定义任务都设计为保持在限制范围内,但每天内置任务 backend_cleanup 任务最终都会因
我知道这违背了使用 Celery 的全部目的,但是是否有一个函数会阻塞直到结果返回? 所以我可以调用 actual_result = MyTask.dont_delay(some_arg="foo")
我计划使用 celery 作为我的项目的任务管理组件。它几乎具有我的项目所需的所有功能。我将有一组可以独立执行或按指定顺序执行的任务。在顺序任务中,我希望能够在中间任务之一失败时执行清理/回滚。我想知
它是运行 Celery 的实际处理器还是另一个进程?在花中,我可以在工作池中看到多个进程吗?这两者之间有什么区别? 最佳答案 当您运行 celery worker 时,它会创建一个父进程来管理正在运行
我有一个名为 ShippingApp 的项目,我按照步骤设置了 celery worker。我将 celery 3.1.26.post2 与 python3.7 一起使用,当我想启动 Celery W
尽我所能,我无法杀死这些 celery worker 。 我跑: celery --app=my_app._celery:app status 我看到我有3个(我不明白为什么3个 worker = 2
我在 docker 容器中运行了 celery ,我想检查选项 CELERY_TASK_RESULT_EXPIRES = '3600' 是否已应用。 我尝试使用 celery inspect conf
我使用 celery.chord(...) 创建一组任务和一个方法,该方法在组中的所有任务完成后被调用。 我使用 amqp 结果后端(但我想切换到 memcached)。 我的 worker 每秒钟一
我正在寻找一些关于将任务生成的列表映射到 celery 中的另一个任务的最佳方法的建议。 假设我有一个名为 parse 的任务,它解析 PDF 文档并输出页面列表。然后,每个页面都需要单独传递给另一个
这不是关于如何捕获 celery worker 日志的问题。有什么方法可以捕获生产者上的 celery 日志记录。我想要的是捕获当我调用 task.delay(...) 或 task.apply_as
我正在使用以下版本: 花==0.9.3 celery ==4.3.0 这为我提供了包含多个列的任务页面的以下显示: 每次我进入这个页面时,我最终都会重新排列页面,使列的顺序不同,并将行的顺序更改为降序
我想完成这样的事情: results = [] for i in range(N): data = generate_data_slowly() res = tasks.process
我想运行一个由beat 调度的复杂任务。让我们假设定义了默认的 add/mul 任务。 @app.on_after_configure.connect def setup_periodic_tasks
我有一个应用程序,其中包含 celery worker 。当我部署这将杀死那些正在运行的进程。 所以任务将开始,但永远不会完成,并且在部署完成时不会重新启动。 避免此问题并在部署完成后重新启动这些任务
我正在开始使用 Celery 进行 Django 项目。出于本地开发目的,我根据这些说明使用 djcelery 和 djkombu(数据库传输)进行了设置 http://ask.github.com/
如何配置 celery 在任务失败时发送电子邮件警报? 例如,我希望 Celery 在 3 个以上的任务失败或 10 个以上的任务被重试时通知我。 是否可以使用 celery 或实用程序(例如花),或
我是一名优秀的程序员,十分优秀!