When a DockerOperator is run with xcom_push=True, xcom_all=True and auto_remove=True, the task raises an error as if the container had been removed before its STDOUT was read.
Take the following DAG as an example:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import PythonOperator

# Default (but overridable) arguments for Operators instantiations
default_args = {
    'owner': 'Satan',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 28),
    'retry_delay': timedelta(seconds=2),
}

# DAG definition
def createDockerOperatorTask(xcom_all, auto_remove, id_suffix):
    return DockerOperator(
        # Default args
        task_id="docker_operator" + id_suffix,
        image='centos:latest',
        container_name="container" + id_suffix,
        api_version='auto',
        command="echo 'FALSE';",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        xcom_push=True,
        xcom_all=xcom_all,
        auto_remove=auto_remove,
    )

# Use dag as python context so all tasks are "automagically" linked (in no specific order) to it
with DAG('docker_operator_xcom', default_args=default_args, schedule_interval=timedelta(days=1)) as dag:
    t1 = createDockerOperatorTask(xcom_all=True, auto_remove=True, id_suffix="_1")
    t2 = createDockerOperatorTask(xcom_all=True, auto_remove=False, id_suffix="_2")
    t3 = createDockerOperatorTask(xcom_all=False, auto_remove=True, id_suffix="_3")

    # Set tasks precedence
    dag >> t1
    dag >> t2
    dag >> t3
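If we run it, the first task fails and the other two succeed. Nevertheless, the only one that runs "correctly" is docker_operator_3, because it sets the XCom value correctly, while docker_operator_2 does not. This gives me the feeling that it "tries" to read STDOUT and, when it cannot, it does not fail (as it should, and as docker_operator_1 does).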
Log of task docker_operator_1, with xcom_push=True, xcom_all=True and auto_remove=True:
*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_1/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: http://5df603088df3:8793/log/docker_operator_xcom/docker_operator_1/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,959] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,984] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,984] {{taskinstance.py:841}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:24,984] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:24,985] {{taskinstance.py:843}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:24,998] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_1> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:24,998] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_1', '2019-12-04T20:24:21.180209+00:00', '--job_id', '72', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmp4_eb_wcg']
[2019-12-04 20:24:25,987] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:25,986] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1037
[2019-12-04 20:24:26,006] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,006] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 """)
[2019-12-04 20:24:26,838] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,838] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:26,841] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,838] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:26,982] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 [2019-12-04 20:24:26,982] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_1 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:27,001] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:27,519] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:27,575] {{taskinstance.py:1058}} ERROR - 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
response.raise_for_status()
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.39/containers/635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe/json
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 264, in execute
if self.xcom_all else str(line)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
return f(self, resource_id, *args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 855, in logs
output = self._get_result(container, stream, res)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 451, in _get_result
return self._get_result_tty(stream, res, self._check_is_tty(container))
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
return f(self, resource_id, *args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 447, in _check_is_tty
cont = self.inspect_container(container)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
return f(self, resource_id, *args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 758, in inspect_container
self._get(self._url("/containers/{0}/json", container)), True
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 267, in _result
self._raise_for_status(response)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 263, in _raise_for_status
raise create_api_error_from_http_exception(e)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation)
docker.errors.NotFound: 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
[2019-12-04 20:24:27,583] {{taskinstance.py:1089}} INFO - Marking task as FAILED.
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 Traceback (most recent call last):
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 261, in _raise_for_status
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 response.raise_for_status()
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 940, in raise_for_status
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 raise HTTPError(http_error_msg, response=self)
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.39/containers/635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe/json
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 During handling of the above exception, another exception occurred:
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 Traceback (most recent call last):
[2019-12-04 20:24:27,639] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/bin/airflow", line 37, in <module>
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 args.func(args)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return f(*args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 551, in run
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 _run(args, dag, ti)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 469, in _run
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 pool=args.pool,
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return func(*args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 result = task_copy.execute(context=context)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 264, in execute
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 if self.xcom_all else str(line)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 855, in logs
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 output = self._get_result(container, stream, res)
[2019-12-04 20:24:27,640] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 451, in _get_result
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return self._get_result_tty(stream, res, self._check_is_tty(container))
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 447, in _check_is_tty
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 cont = self.inspect_container(container)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 19, in wrapped
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 return f(self, resource_id, *args, **kwargs)
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 758, in inspect_container
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 self._get(self._url("/containers/{0}/json", container)), True
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 267, in _result
[2019-12-04 20:24:27,641] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 self._raise_for_status(response)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 263, in _raise_for_status
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 raise create_api_error_from_http_exception(e)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 raise cls(e, response=response, explanation=explanation)
[2019-12-04 20:24:27,649] {{base_task_runner.py:115}} INFO - Job 72: Subtask docker_operator_1 docker.errors.NotFound: 404 Client Error: Not Found ("No such container: 635f096a834e1fa20f4252287161f7a4765eed0f2aec706c1e5859e6c50dbdbe")
[2019-12-04 20:24:29,953] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,952] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989579 s
[2019-12-04 20:24:34,948] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,947] {{local_task_job.py:103}} INFO - Task exited with return code 1
Log of task docker_operator_2, with xcom_push=True, xcom_all=True and auto_remove=False:
*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_2/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: http://5df603088df3:8793/log/docker_operator_xcom/docker_operator_2/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,794] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,829] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:24,829] {{taskinstance.py:841}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:24,829] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:24,829] {{taskinstance.py:843}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:24,842] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_2> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:24,843] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_2', '2019-12-04T20:24:21.180209+00:00', '--job_id', '71', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmpeq9uc4kw']
[2019-12-04 20:24:26,174] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:26,173] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1035
[2019-12-04 20:24:26,226] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,226] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 """)
[2019-12-04 20:24:27,685] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,678] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:27,685] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,678] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:27,973] {{base_task_runner.py:115}} INFO - Job 71: Subtask docker_operator_2 [2019-12-04 20:24:27,971] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_2 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:28,017] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:28,643] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:29,783] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,782] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989846 s
[2019-12-04 20:24:34,780] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,779] {{local_task_job.py:103}} INFO - Task exited with return code 0
Log of task docker_operator_3, with xcom_push=True, xcom_all=False and auto_remove=True:
*** Log file does not exist: /usr/local/airflow/logs/docker_operator_xcom/docker_operator_3/2019-12-04T20:24:21.180209+00:00/1.log
*** Fetching from: http://5df603088df3:8793/log/docker_operator_xcom/docker_operator_3/2019-12-04T20:24:21.180209+00:00/1.log
[2019-12-04 20:24:24,992] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:25,031] {{taskinstance.py:630}} INFO - Dependencies all met for <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [queued]>
[2019-12-04 20:24:25,032] {{taskinstance.py:841}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:25,032] {{taskinstance.py:842}} INFO - Starting attempt 1 of 1
[2019-12-04 20:24:25,032] {{taskinstance.py:843}} INFO -
--------------------------------------------------------------------------------
[2019-12-04 20:24:25,054] {{taskinstance.py:862}} INFO - Executing <Task(DockerOperator): docker_operator_3> on 2019-12-04T20:24:21.180209+00:00
[2019-12-04 20:24:25,055] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 'docker_operator_xcom', 'docker_operator_3', '2019-12-04T20:24:21.180209+00:00', '--job_id', '73', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/qm_operators/exp_5_prueba.py', '--cfg_path', '/tmp/tmp94dzo8w7']
[2019-12-04 20:24:26,219] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:26,219] {{settings.py:252}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1039
[2019-12-04 20:24:26,294] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-04 20:24:26,294] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 """)
[2019-12-04 20:24:27,549] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,548] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-12-04 20:24:27,549] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,549] {{dagbag.py:92}} INFO - Filling up the DagBag from /usr/local/airflow/dags/qm_operators/exp_5_prueba.py
[2019-12-04 20:24:27,722] {{base_task_runner.py:115}} INFO - Job 73: Subtask docker_operator_3 [2019-12-04 20:24:27,721] {{cli.py:545}} INFO - Running <TaskInstance: docker_operator_xcom.docker_operator_3 2019-12-04T20:24:21.180209+00:00 [running]> on host 5df603088df3
[2019-12-04 20:24:27,754] {{docker_operator.py:201}} INFO - Starting docker container from image centos:latest
[2019-12-04 20:24:28,329] {{logging_mixin.py:112}} INFO - Attachs: []
[2019-12-04 20:24:29,979] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:29,979] {{local_task_job.py:124}} WARNING - Time since last heartbeat(0.01 s) < heartrate(5.0 s), sleeping for 4.989138 s
[2019-12-04 20:24:34,974] {{logging_mixin.py:112}} INFO - [2019-12-04 20:24:34,974] {{local_task_job.py:103}} INFO - Task exited with return code 0
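[Image: XComs of docker_operator_2]
[Image: XComs of docker_operator_3]
Even though setting auto_remove=False (as in docker_operator_2) makes the task succeed and sets the XCom correctly, the container is never removed, and future runs of the DAG will fail because the container left over from the old run conflicts with the container of the new run.
A workaround is to add a downstream task that removes the container, but that is not "clean".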
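The downstream-cleanup workaround mentioned above could look roughly like the sketch below. It is only an illustration: the helper name `remove_container_by_name` and its parameters are hypothetical, and it assumes the Docker SDK for Python (`docker.APIClient` and its `remove_container` method) is installed on the worker. An already-missing container is treated as a non-error by checking for a `status_code` of 404, which is what `docker.errors.NotFound` reports.

```python
def remove_container_by_name(container_name, client=None,
                             docker_url="unix://var/run/docker.sock"):
    """Force-remove a container by name.

    Returns True if the container was removed and False if it was
    already gone. Any other error is re-raised so the task fails visibly.

    `client` is any object exposing `remove_container(name, force=True)`,
    for example `docker.APIClient`. It is created lazily when omitted.
    """
    if client is None:
        # Imported lazily so the helper can be exercised without Docker installed.
        import docker
        client = docker.APIClient(base_url=docker_url, version="auto")
    try:
        client.remove_container(container_name, force=True)
    except Exception as exc:
        # docker.errors.NotFound exposes status_code == 404 ("No such container").
        if getattr(exc, "status_code", None) == 404:
            return False
        raise
    return True


# Hypothetical wiring inside the DAG from the question (Airflow 1.10 import paths):
# cleanup_2 = PythonOperator(
#     task_id="cleanup_2",
#     python_callable=remove_container_by_name,
#     op_kwargs={"container_name": "container_2"},
#     trigger_rule="all_done",  # run the cleanup even if the Docker task failed
# )
# t2 >> cleanup_2
```

The cost of this approach is the one the question already points out: every DockerOperator task needs a companion cleanup task.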
Is there any way to run a DockerOperator with both xcom_push=True and auto_remove=True?
Best answer
You can subclass DockerOperator and remove the container in post_execute, like this:
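from airflow.operators.docker_operator import DockerOperator

class XComDockerOperator(DockerOperator):
    def post_execute(self, context, result=None):
        # self.cli is the Docker client created in execute(); it is None if execute() never ran
        if self.cli is not None:
            self.log.info('Removing Docker container')
            # Remove the container only after its output has been read for XCom
            self.cli.remove_container(self.container['Id'])
        super().post_execute(context, result)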
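For the subclass to help, the task presumably has to be created with auto_remove=False, so that the container still exists when its logs are read; the removal then happens in post_execute. A slightly more defensive variant of the same idea is sketched below as a mixin. The mixin name is hypothetical and not part of Airflow; the `cli` and `container` attributes are the ones used in the answer's code, and `force=True` is the `remove_container` option of the Docker SDK for Python.

```python
class RemoveContainerAfterExecuteMixin:
    """Removes the Docker container once the operator's execute() has returned.

    Intended to be mixed in ahead of DockerOperator, for example:

        class XComDockerOperator(RemoveContainerAfterExecuteMixin, DockerOperator):
            pass

    and instantiated with xcom_push=True and auto_remove=False.
    """

    def post_execute(self, context, result=None):
        # Both attributes may be missing or None if the container was never created.
        cli = getattr(self, "cli", None)
        container = getattr(self, "container", None)
        if cli is not None and container is not None:
            # force=True also removes a container that is still running.
            cli.remove_container(container["Id"], force=True)
        # Hand over to the next class in the MRO (the real operator).
        super().post_execute(context, result)
```

Note that, as far as I understand, Airflow calls post_execute only after execute() has returned without raising, so a task that fails inside the container would still leave the container behind and would need a separate cleanup.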
Original question on Stack Overflow, "python - How to use xcom_push=True and auto_remove=True at the same time when using DockerOperator?": https://stackoverflow.com/questions/59184551/