- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用集群 Airflow 环境,其中有四个 AWS ec2 实例用于服务器。
ec2 实例
我的设置现在已经完美工作了三个月,但偶尔大约每周一次,当 Airflow 尝试记录某些内容时,我会收到 Broken Pipe 异常。
*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log
[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
[2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
self.flush()
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
self.stream.flush()
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:
[2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING - File "/usr/bin/airflow", line 27, in <module>
args.func(args)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
result = task_copy.execute(context=context)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
while not self.poke(context):
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
directory = os.listdir(full_path)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, in handle_timeout
self.log.error("Process timed out")
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process timed out'
Arguments: ()
[2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
while not self.poke(context):
File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
directory = os.listdir(full_path)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
[2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout
有时也会出现错误
*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
*** Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
我不确定为什么日志在大约 95% 的时间里都能正常工作,但在其他时候却随机失败。这是我的 Airflow.cfg 文件中的日志设置,
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/ec2-user/airflow/logs
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =
# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task
# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile =
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5
# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793
# How often should stats be printed to the logs
print_stats_interval = 30
child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
我想知道是否应该尝试不同的日志记录技术,例如写入 S3 存储桶,或者是否可以采取其他措施来解决此问题。
更新:
将日志写入 S3 并不能解决此问题。此外,错误现在更加一致(仍然是零星的)。现在大约有 50% 的时间发生这种情况。我注意到的一件事是,它所执行的任务是我的 AWS EMR 创建任务。启动 AWS EMR 集群大约需要 20 分钟,然后任务必须等待 Spark 命令在 EMR 集群上运行。所以单个任务运行时间大约是30分钟。我想知道这对于运行 Airflow 任务来说是否太长,以及这是否就是它开始无法写入日志的原因。如果是这种情况,那么我可以分解 EMR 任务,以便有一个任务用于 EMR 创建,然后有另一个任务用于 EMR 集群上的 Spark 命令。
注意:
我还在 Airflow 的 Jira 上创建了一个新的错误单 https://issues.apache.org/jira/browse/AIRFLOW-2844
最佳答案
此问题是我刚刚在此处解决的另一个问题的症状 AirflowException: Celery command failed - The recorded hostname does not match this instance's hostname .
我有一段时间没有看到AirflowException:Celery 命令失败,因为它出现在airflow worker 日志中。直到我实时观察 Airflow 工作人员日志时,我才看到抛出错误时我还在任务中遇到了 BrokenPipeException。
不过事情变得有点奇怪了。如果我执行 print("something to log")
和 AirflowException: Celery command failed...
错误,我只会看到 BrokenPipeException 抛出发生在Worker节点上。当我将所有打印语句更改为使用 importlogging ...logging.info("something to log")
时,我不会看到 BrokenPipeException 但是任务会由于 AirflowException: Celery command failed...
错误,仍然失败。但是,如果我没有看到 Airflow 任务日志中抛出 BrokenPipeException,我就不会知道任务失败的原因,因为一旦我消除了打印语句,我就从未在 Airflow 任务日志中看到任何错误(仅在 $ 上) Airflow 工作人员日志)
长话短说,有一些要点。
不要 print("something to log")
通过导入日志记录然后使用日志记录类(如 importlogging
)来使用 Airflow 的内置日志记录logging.info("要记录的内容")
如果您使用 AWS EC2 实例作为 Airflow 服务器,那么您可能会遇到此问题:https://github.com/apache/incubator-airflow/pull/2484此问题的修复已集成到 Airflow 版本 1.10 中(我目前使用 Airflow 版本 1.9)。所以升级你的Airflow version to 1.10 。您还可以使用the command here pip install git+git://github.com/apache/incubator-airflow.git@v1-10-stable
。另外,如果您不想升级 Airflow 版本,则可以按照 the github issue 上的步骤操作。使用修复程序手动更新文件,或者 fork Airflow 并挑选修复它的提交。
关于python-3.x - Airflow 日志 BrokenPipeException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51365911/
这是真的: log(A) + log(B) = log(A * B) [0] 这也是真的吗? O(log(A)) + O(log(B)) = O(log(A * B)) [1] 据我了解 O(f
日志 日志是构建工具的主要界面。如果日志太多,真正的警告和问题容易被隐藏。另一方面,如果出了错,你需要找出相关的信息。Gradle 定义了6个日志级别,如表 18.1,“日志级别”所示。除了那些您通
日志 关键进程日志如下…(将 替换为启动服务的用户,将 替换为计算机名称) NameNode: $ HADOOP_HOME / logs / hadoop- -namenode- .log Da
我正在探索项目的 git 历史 FFMpeg .我在提交之间对每个文件执行了更改 517573a67088b5c7a25c18373434e3448892ee93和 80bb65fafab1d2f5f
我不知道如何在 loggly 中使用正则表达式进行搜索。例如,使用表达式 /24nonstop.+7554/ 记录我想查找的内容. { "level_name": "WARNING", "ex
有没有办法为 API 调用打开日志记录? 我们有一个第三方应用程序在使用我们的商店时遇到问题,希望获得一些调试信息。 ~我已经搜索了 bt 一无所获。 我正在使用 1.7 最佳答案 在一段受控的时间内
我正在尝试获取 SVN 中所有副本/移动/等的固定路径的日志历史记录(如果可能的话,递归地)。实际上,我试图避免 peg revisions ,并将日志应用于路径而不是对象。 svn 手册提出了这个问
如何在命令行中运行 NAnt 脚本并在日志文件中获取每个任务的时间? using nant task or NAnt -buildfile:testscript.build testnanttarg
是否有任何默认方式来记录哪些用户代理访问了您的服务器?我需要编制一份访问我们网站的浏览器列表,以便我们知道我们最能支持什么。 谢谢! 最佳答案 日志CGI.HTTP_USER_AGENT ,也许在 A
我在我的应用程序中使用 Spring 发送电子邮件。 我想在发送电子邮件时记录 imap 服务器操作。 我尝试按如下方式在我的 applicationContext.xml 中实现日志:
我已经运行一个 pod 一个多星期了,从开始到现在没有重启过。但是,我仍然无法查看自它启动以来的日志,它只提供最近两天的日志。容器是否有任何日志轮换策略以及如何根据大小或日期控制轮换? 我尝试了以下命
背景: 我正在设置我的第一个 flex 堆栈,尽管我将开始简单,但是我想确保我从良好的体系结构开始。我最终希望有以下解决方案:托管指标,服务器日志(expressjs APM),单页应用程序监视(AP
常规的 hg log 命令给出每个变更集至少 4 行的输出。例如 changeset: 238:03a214f2a1cf user: My Name date: Th
我在我的项目中使用 Spring iBatis 框架。然后使用 logback 进行记录。然后,在检查日志文件时,我可以看到系统正在使用的数据库...出于安全目的我想隐藏它 这是示例日志.. 12:2
我想使用 hg log 生成一个简短的变更日志,涵盖最新版本的变更。发行版标有“v”前缀,例如“v0.9.1”或“v1.0”。是否可以使用 revsets 选择以“v”开头的最后两个标签之间的范围,不
我是 PHP 的新手,所以如果有一个简单的答案,请原谅我。我在 stackoverflow 中搜索过任何类似的问题,但找不到任何帮助。 我正在开发一个现有的基于 php 的应用程序,我只需要能够将对象
我有一个名为 Radius 的程序可以验证用户登录。运行在CentOS服务器上 日志在/var/log/radius.log 中 它们如下 Mon Jul 24 22:17:08 2017 : Aut
我最近从使用“日志”切换到“日志”。 到目前为止,还不错,但我缺少一项关键功能——在运行时更改最低级别的能力。 在“logging',我可以调用 myLogger.setLevel(logging.I
假设我们有速度关键的系统(例如统计/分析、套接字编程等),我们如何设计跟踪和日志。 更具体地说,日志和跟踪通常会降低性能(即使我们有关闭机制或冗长的扩展机制)。在这种情况下,是否有任何关于如何“放置”
有人知道这个 don't panic 日志包含什么类型的信息吗? /data/dontpanic 然后,我该如何分析这个日志? 最佳答案 我发现这个文件夹在内核崩溃发生后包含了一些 apanic 文件
我是一名优秀的程序员,十分优秀!