- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们的 Airflow 项目有一个从 BigQuery 查询并使用 Pool
并行转储到本地 JSON 文件的任务:
def dump_in_parallel(table_name):
base_query = f"select * from models.{table_name}"
all_conf_ids = range(1,10)
n_jobs = 4
with Pool(n_jobs) as p:
p.map(partial(dump_conf_id, base_query = base_query), all_conf_ids)
with open("/tmp/final_output.json", "wb") as f:
filenames = [f'/tmp/output_file_{i}.json' for i in all_conf_ids]
这项任务在 Airflow v1.10 中对我们来说工作正常,但在 v2.1+ 中不再工作。此处为第 2.1 节 - https://blog.mbedded.ninja/programming/languages/python/python-multiprocessing/ - 提到“如果您尝试从已使用 Pool 创建的子工作人员中创建 Pool,您将遇到错误:不允许守护进程拥有子进程”
这是完整的 Airflow 错误:
[2021-08-22 02:11:53,064] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/plugins/tasks/bigquery.py", line 249, in dump_in_parallel
with Pool(n_jobs) as p:
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 110, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
如果重要,我们会使用 LocalExecutor 运行 Airflow 。知道为什么这个使用 Pool 的任务会在 Airflow v1.10 中工作,但不再在 Airflow 2.1 中工作吗?
最佳答案
Airflow 2 在后台使用不同的处理模型来加快处理速度,同时在运行的任务之间保持基于进程的隔离。
这就是为什么它使用 forking
和钩子(Hook)下的多处理来运行任务,但这也意味着如果你使用多处理,你将达到 Python 多处理的限制,它不允许链接多-处理。
我不能 100% 确定它是否会起作用,但您可以尝试将 execute_tasks_new_python_interpreter
配置设置为 True。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter .此设置将导致 Airflow 在运行任务而不是 fork /使用多处理时启动新的 Python 解释器(尽管我不是 100% 确定后者)。尽管运行你的任务,它的运行速度会慢很多(最多几秒钟的开销),因为新的 Python 解释器必须在运行你的任务之前重新初始化并导入所有 Airflow 代码。
如果这不起作用,那么您可以使用 PythonVirtualenvOperator 启动您的多处理工作 - 它将启动一个新的 Python 解释器来运行您的 python 代码,您应该能够使用多处理。
关于python - Airflow 任务中不允许使用 multiprocessing.Pool 吗? - 断言错误 : daemonic processes are not allowed to have children,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68878031/
我正在尝试使用多处理和队列实现生产者-消费者场景;主进程是生产者,两个子进程使用队列中的数据。这在没有任何异常 发生的情况下有效,但问题是我希望能够在工作人员死亡时重新启动他们(kill -9 wor
我试图在一个管理进程下启动一个数据队列服务器(这样它以后可以变成一个服务),虽然数据队列服务器功能在主进程中工作正常,但它在一个进程中不起作用使用 multiprocessing.Process 创建
我的多处理需求非常简单:我从事机器学习工作,有时我需要评估多个数据集中的一个算法,或者一个数据集中的多个算法,等等。我只需要运行一个带有一些参数的函数并获取一个数字。 我不需要 RPC、共享数据,什么
创建进程池或简单地遍历一个进程以创建更多进程之间有任何区别(以任何方式)吗? 这有什么区别?: pool = multiprocessing.Pool(5) pool.apply_async(work
multiprocessing.BoundedSemaphore(3) 与 multiprocessing.Sempahore(3) 有何不同? 我希望 multiprocessing.Bounded
我尝试通过 multiprocessing 包中的 Queue 对 Pipe 的速度进行基准测试。我认为 Pipe 会更快,因为 Queue 在内部使用 Pipe。 奇怪的是,Pipe 在发送大型 n
我有这样一个简单的任务: def worker(queue): while True: try: _ = queue.get_nowait()
我正在尝试编写一个与 multiprocessing.Pool 同时应用函数的应用程序。我希望这个函数成为一个实例方法(所以我可以在不同的子类中以不同的方式定义它)。这似乎是不可能的;正如我在其他地方
在 python 2 中,multiprocessing.dummy.Pool 和 multiprocessing.pool.ThreadPool 之间有什么区别吗?源代码似乎暗示它们是相同的。 最佳
我正在开发一个用于财务目的的模型。我将整个 S&P500 组件放在一个文件夹中,存储了尽可能多的 .hdf 文件。每个 .hdf 文件都有自己的多索引(年-周-分)。 顺序代码示例(非并行化): im
到目前为止,我是这样做的: rets=set(pool.map_async(my_callback, args.hosts).get(60*4)) 如果超时,我会得到一个异常: File "/usr
参见下面的示例和执行结果: #!/usr/bin/env python3.4 from multiprocessing import Pool import time import os def in
我的任务是监听 UDP 数据报,对其进行解码(数据报具有二进制信息),将解码后的信息放入字典中,将字典转储为 json 字符串,然后将 json 字符串发送到远程服务器(ActiveMQ)。 解码和发
我在 macOS 上工作,最近被 Python 3.8 多处理中“fork”到“spawn”的变化所困扰(参见 doc )。下面显示了一个简化的工作示例,其中使用“fork”成功但使用“spawn”失
multiprocessing.Queue 的文档指出从项目入队到其腌制表示刷新到底层管道之间存在一点延迟。显然,您可以将一个项目直接放入管道中(它没有说明其他情况,并且暗示情况就是如此)。 为什么管
我运行了一些测试代码来检查在 Linux 中使用 Pool 和 Process 的性能。我正在使用 Python 2.7。 multiprocessing.Pool 的源代码似乎显示它正在使用 mul
我在 Windows Standard Embedded 7 上运行 python 3.4.3。我有一个继承 multiprocessing.Process 的类。 在类的 run 方法中,我为进程对
我知道multiprocessing.Process类似于 threading.Thread当我子类 multiprocessing.Process 时要创建一个进程,我发现我不必调用 __init_
我有教科书声明说在多处理器系统中不建议禁用中断,并且会花费太多时间。但我不明白这一点,谁能告诉我多处理器系统禁用中断的过程?谢谢 最佳答案 在 x86(和其他架构,AFAIK)上,启用/禁用中断是基于
我正在执行下面的代码并且它工作正常,但它不会产生不同的进程,而是有时所有都在同一个进程中运行,有时 2 个在一个进程中运行。我正在使用 4 cpu 机器。这段代码有什么问题? def f(values
我是一名优秀的程序员,十分优秀!