
Airflow BashOperator log does not contain the full output

Reposted | Author: 行者123 | Updated: 2023-12-02 21:54:20

I'm running into a problem where the BashOperator does not log all of wget's output. It only logs the first 1-5 lines.

I have tried using just wget as the bash command:

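from airflow.operators.bash_operator import BashOperator

# `dag` is the DAG object defined elsewhere in the same DAG file
tester = BashOperator(
    task_id='testing',
    bash_command="wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip",
    dag=dag)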

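I have also tried running it as part of a longer bash script that has other commands after wget. Airflow waits for the script to finish before triggering downstream tasks. Here is an example bash script: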

#!/bin/bash
echo "Starting up..."
wget -N -r -nd --directory-prefix='/tmp/' http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
echo "Download complete..."
# unzip options such as -o (overwrite without prompting) go before the archive name
unzip -o /tmp/httpcomponents-client-4.5.3-src.zip -d /tmp/test_airflow
echo "Archive unzipped..."
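To see outside Airflow how a script like this one is read by the operator, the run can be approximated with the standard library. This is a minimal sketch, assuming the approach of the operator code in the answer below: write the command to a temporary file, run it with `bash`, merge stderr into stdout, and read the pipe line by line. `run_like_bash_operator` is a hypothetical helper name. The demo script replaces wget with `echo ... >&2` because wget writes its log to stderr by default.

```python
import subprocess
import tempfile


def run_like_bash_operator(bash_command):
    """Hypothetical helper: run bash_command roughly the way the answer's
    BashOperator does and return (returncode, list of decoded lines)."""
    lines = []
    with tempfile.TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
        # Write the command to a script file inside the temporary directory.
        with tempfile.NamedTemporaryFile(dir=tmp_dir, prefix="testing") as f:
            f.write(bash_command.encode("utf-8"))
            f.flush()
            sp = subprocess.Popen(
                ["bash", f.name],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # stderr (where wget logs) shares the pipe
                cwd=tmp_dir,
                start_new_session=True,  # same effect as preexec_fn=os.setsid
            )
            # readline() returns b"" only at EOF, i.e. once the script closes the pipe.
            for raw in iter(sp.stdout.readline, b""):
                lines.append(raw.decode("utf-8").strip())
            sp.wait()
            sp.stdout.close()
    return sp.returncode, lines


script = """\
echo "Starting up..."
echo "pretend wget progress" >&2
echo "Download complete..."
"""
code, output = run_like_bash_operator(script)
print(code)
for line in output:
    print(line)
```

If the stderr line shows up here but not in the Airflow log, the capture loop itself is probably not the culprit.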

The last few lines of the log file:

[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,214] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 1
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2017-04-13 18:33:34,215] {base_task_runner.py:95} INFO - Subtask:
[2017-04-13 18:33:35,068] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:35,068] {models.py:1342} INFO - Executing <Task(BashOperator): testing> on 2017-04-13 18:33:08
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,569] {bash_operator.py:71} INFO - tmp dir root location:
[2017-04-13 18:33:37,569] {base_task_runner.py:95} INFO - Subtask: /tmp
[2017-04-13 18:33:37,571] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:33:37,571] {bash_operator.py:81} INFO - Temporary script location :/tmp/airflowtmpqZhPjB//tmp/airflowtmpqZhPjB/testingCkJgDE
[2017-04-13 18:14:54,943] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,942] {bash_operator.py:82} INFO - Running command: /var/www/upstream/xtractor/scripts/Temp_test.sh
[2017-04-13 18:14:54,951] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,950] {bash_operator.py:91} INFO - Output:
[2017-04-13 18:14:54,955] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,954] {bash_operator.py:96} INFO - Starting up...
[2017-04-13 18:14:54,958] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:54,957] {bash_operator.py:96} INFO - --2017-04-13 18:14:54-- http://apache.cs.utah.edu/httpcomponents/httpclient/source/httpcomponents-client-4.5.3-src.zip
[2017-04-13 18:14:55,106] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,105] {bash_operator.py:96} INFO - Resolving apache.cs.utah.edu (apache.cs.utah.edu)... 155.98.64.87
[2017-04-13 18:14:55,186] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,186] {bash_operator.py:96} INFO - Connecting to apache.cs.utah.edu (apache.cs.utah.edu)|155.98.64.87|:80... connected.
[2017-04-13 18:14:55,284] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - HTTP request sent, awaiting response... 200 OK
[2017-04-13 18:14:55,285] {base_task_runner.py:95} INFO - Subtask: [2017-04-13 18:14:55,284] {bash_operator.py:96} INFO - Length: 1662639 (1.6M) [application/zip]
[2017-04-13 18:15:01,485] {jobs.py:2083} INFO - Task exited with return code 0

Edit: further testing shows that the problem is specifically with logging wget's output.
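One hypothesis worth checking. This is an assumption that the log above does not confirm. wget prints its download progress after the `Length:` line. If that progress is redrawn with carriage returns (`\r`) instead of newlines, `readline()` on the pipe splits only on `\n`. Many updates then arrive as one line, or nothing arrives until a newline or EOF. The sketch fakes such output with `printf`. `raw_lines` and `split_progress` are hypothetical helpers, not part of Airflow.

```python
import subprocess


def raw_lines(cmd):
    """Hypothetical helper: run cmd with bash (stderr merged into stdout) and
    return the chunks exactly as readline() delivers them."""
    sp = subprocess.Popen(["bash", "-c", cmd],
                          stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    chunks = list(iter(sp.stdout.readline, b""))
    sp.wait()
    sp.stdout.close()
    return chunks


def split_progress(chunks, encoding="utf-8"):
    """Hypothetical helper: also treat a carriage return as a line break,
    so each progress update becomes its own log line."""
    out = []
    for chunk in chunks:
        # bytes.splitlines() breaks on b"\n", b"\r" and b"\r\n".
        out.extend(part.decode(encoding) for part in chunk.splitlines())
    return out


# Fake wget-style progress on stderr: three updates separated by \r, then a newline.
fake_wget = r"printf '10%%\r50%%\r100%%\nsaved\n' >&2"
chunks = raw_lines(fake_wget)
print(chunks)
print(split_progress(chunks))
```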

Best answer

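This is because the default operator only prints the last line. Replace the code in airflow/operators/bash_operator.py, wherever Airflow is installed, with the code below. Usually you need to find where your Python installation lives and then go into site-packages.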

from builtins import bytes
import os
import signal
import logging
from subprocess import Popen, STDOUT, PIPE
from tempfile import gettempdir, NamedTemporaryFile

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory


class BashOperator(BaseOperator):
    """
    Execute a Bash script, command or set of commands.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :type bash_command: string
    :param xcom_push: If xcom_push is True, the full output written to stdout
        (all lines, joined with newlines) will also be pushed to an XCom
        when the bash command completes.
    :type xcom_push: bool
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :type env: dict
    :param output_encoding: Output encoding of the bash command.
    :type output_encoding: str
    """
    template_fields = ('bash_command', 'env')
    template_ext = ('.sh', '.bash',)
    ui_color = '#f0ede4'

    @apply_defaults
    def __init__(
            self,
            bash_command,
            xcom_push=False,
            env=None,
            output_encoding='utf-8',
            *args, **kwargs):

        super(BashOperator, self).__init__(*args, **kwargs)
        self.bash_command = bash_command
        self.env = env
        self.xcom_push_flag = xcom_push
        self.output_encoding = output_encoding

    def execute(self, context):
        """
        Execute the bash command in a temporary directory
        which will be cleaned afterwards
        """
        bash_command = self.bash_command
        logging.info("tmp dir root location: \n" + gettempdir())
        line_buffer = []  # every output line, kept for the final log entry and XCom
        with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:

                f.write(bytes(bash_command, 'utf_8'))
                f.flush()
                fname = f.name
                # f.name is already an absolute path inside tmp_dir,
                # so it must not be prefixed with tmp_dir again
                script_location = os.path.abspath(fname)
                logging.info("Temporary script "
                             "location :{0}".format(script_location))
                logging.info("Running command: " + bash_command)
                sp = Popen(
                    ['bash', fname],
                    stdout=PIPE, stderr=STDOUT,
                    cwd=tmp_dir, env=self.env,
                    preexec_fn=os.setsid)

                self.sp = sp

                logging.info("Output:")
                line = ''

                # Log each line as it arrives and keep it in the buffer
                for line in iter(sp.stdout.readline, b''):
                    line = line.decode(self.output_encoding).strip()
                    line_buffer.append(line)
                    logging.info(line)
                sp.wait()
                logging.info("Command exited with "
                             "return code {0}".format(sp.returncode))

                if sp.returncode:
                    raise AirflowException("Bash command failed")
        # Log the complete output once more as a single entry
        logging.info("\n".join(line_buffer))
        if self.xcom_push_flag:
            return "\n".join(line_buffer)

    def on_kill(self):
        logging.info('Sending SIGTERM signal to bash process group')
        # The process was started with os.setsid, so its group id is its pid
        os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
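The answer's `execute()` keeps every decoded line in `line_buffer`, logs each line as it arrives, logs the joined buffer once more after the command finishes, and returns the joined buffer for XCom. The stdlib sketch below isolates that loop so it can be tried without Airflow. `collect_output` is a hypothetical name. `RuntimeError` stands in for `AirflowException`, `logging.basicConfig` is only there for the demo, and `echo` commands stand in for wget.

```python
import logging
import subprocess

logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")


def collect_output(bash_command, encoding="utf-8"):
    """Hypothetical helper mirroring the answer's execute() loop.

    Returns (last_line, joined_output)."""
    line_buffer = []
    line = ""
    sp = subprocess.Popen(["bash", "-c", bash_command],
                          stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                          start_new_session=True)
    for raw in iter(sp.stdout.readline, b""):
        line = raw.decode(encoding).strip()
        line_buffer.append(line)
        logging.info(line)  # one log record per line, as it arrives
    sp.wait()
    sp.stdout.close()
    if sp.returncode:
        raise RuntimeError("Bash command failed")  # AirflowException in the operator
    logging.info("\n".join(line_buffer))  # the whole output again, as one record
    return line, "\n".join(line_buffer)


last, full = collect_output("echo one; echo two >&2; echo three")
print(repr(last))
print(repr(full))
```

With this version every line appears twice in the log: once per line during the run and once more inside the final joined record. `last` is what the `line` variable holds after the loop.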

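Python can report where a module is installed, so you do not have to search site-packages by hand. This is a small sketch. It assumes an Airflow 1.x layout where the operator lives in `airflow.operators.bash_operator`, as the answer's path suggests. `module_file` is a hypothetical helper that returns `None` when the module or one of its parent packages is not installed. Note that `find_spec` imports the parent packages (here `airflow` and `airflow.operators`) to locate a submodule.

```python
import importlib.util


def module_file(dotted_name):
    """Hypothetical helper: return the source file of an installed module,
    or None if it (or a parent package) cannot be found."""
    try:
        spec = importlib.util.find_spec(dotted_name)
    except ModuleNotFoundError:
        # find_spec imports parent packages first and raises if one is missing.
        return None
    return spec.origin if spec is not None else None


print(module_file("airflow.operators.bash_operator"))  # None if Airflow is absent
print(module_file("json.decoder"))
```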
A similar question about the Airflow BashOperator log not containing the full output can be found on Stack Overflow: https://stackoverflow.com/questions/43400302/
