gpt4 book ai didi

python - Celery:组中的一个子任务总是超时

转载 作者:行者123 更新时间:2023-12-01 04:59:55 29 4
gpt4 key购买 nike

我在使用 Celery 时遇到了相当烦人的行为 group功能。

我需要定期检查一组主机解析的 IP,只是为了确保所述 IP 没有更改。为了做到这一点,我有一本带有 < hostname, IPs > 的字典。我需要验证一下。例如:

REQUIRED_HOSTS = {
'google.com': {'173.194.46.64', '173.194.46.70', '173.194.46.71'},
'stackoverflow.com': {'198.252.206.16'}
}

所以唯一要做的就是定期迭代 REQUIRED_HOSTS.keys() ,解析名称并查看它解析到的 IP 是否与我记录的不同。 (这里没什么好说的)

为了提高一点效率,每个名字都是并行解析的。我为此创建了一个子任务(它使用 dnspython 解析):

@my_tasks.task
def resolve_hostname(hostname, resolver=None):
""" This subtask resolves the 'hostname' to its IP addresses. It's
intended to be used in the 'compare_required_ips' function to resolve
names in parallel """
if resolver is None:
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers

try:
return (hostname,
{hst.address for hst in resolver.query(hostname)})
except Exception, e:
logger.exception("Got %s when trying to resolve hostname=%s"
% (type(e), hostname))
raise e

现在,查询所有主机名并生成子任务的方法如下:

@my_taks.task
def compare_required_ips():
""" This method verifies that the IPs haven't changed. """
retval = []
resolver = dns.resolver.Resolver()
resolver.nameservers = ['8.8.8.8' + '4.2.2.2'] + resolver.nameservers
retrieved_hosts = dict.fromkeys(required_hosts.REQUIRED_HOSTS.keys())
logger.info("Going to compare IPs for %s hostnames=%s"
% (len(required_hosts.REQUIRED_HOSTS.keys()),
required_hosts.REQUIRED_HOSTS.keys()))
ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()
for hostname, ips in ip_subtasks.get(timeout=90):
retrieved_hosts[hostname] = ips

for hostname in required_hosts.REQUIRED_HOSTS:
if (required_hosts.REQUIRED_HOSTS[hostname]
!= retrieved_hosts[hostname]):
retval.append(hostname)
logger.error(
"IP resolution mismatch. hostname=%s resolve_target=%s"
", resolve_actual=%s (mismatch=%s)"
% (hostname,
required_hosts.REQUIRED_HOSTS[hostname],
retrieved_hosts[hostname],
(required_hosts.REQUIRED_HOSTS[hostname]
^ retrieved_hosts[hostname]))
)
return retval

同样,相当简单...只需步行 REQUIRED_HOSTS键(又名主机名),生成一个子任务来解析每个键,然后在 90 秒超时后收集结果(发生在 for hostname, ips in ip_subtasks.get(timeout=90) 行中)

现在,麻烦的是除了一个子任务之外的所有子任务都在 90 秒的窗口内成功完成。然后父任务 ( compare_required_ips ) 由于 timeout=90 而失败当发生这种情况时,子任务将成功完成(在父任务失败后立即完成)。我尝试增加和减少超时,并且子任务始终采用我在 group 中指定的任何超时。创建,导致主任务报告失败。

我还手动运行了名称解析(没有使其成为 celery 任务,而是使用常规线程)并且它在几毫秒内解析。每一次,每一次我尝试做的测试。我不认为 dns.resolver.Resolver() 有问题class 。一切似乎都表明这个类的解决速度非常快,但是子任务,或者小组,或者...... Celery 中的某个人不知道它(尽管只是其中一个子任务)

我正在使用celery==3.1.9 , celery-with-redis==3.0flower==0.6.0进行监控。

任何帮助、提示或测试的东西都将非常感激。

最佳答案

一个问题可能是由于启动同步子任务而导致死锁。 compare_required_ips 是一个 celery 任务。在此任务中,您正在等待一组 resolve_hostname 任务完成,这确实效率很低。

所以你必须改变这个

ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
)()

ip_subtasks = group(
[resolve_hostname.s(hostname, resolver=resolver)
for hostname in required_hosts.REQUIRED_HOSTS.keys()]
).delay()

异步启动所有任务,避免死锁。

您不应该在 compate_required_ips 任务中执行 ip_subtasks.get()(即使 ip_subtask 只需一纳秒)。您必须为此编写一个新函数或使用 celery task_success signal .

关于python - Celery:组中的一个子任务总是超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26494598/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com