- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我正在使用 docker-compose 来设置一个可扩展的 Airflow 集群。我的方法基于这个 Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
我的问题是将日志设置为从 s3 写入/读取。当一个 dag 完成时,我会收到这样的错误
*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-
26T11:00:00
我像这样在 airflow.cfg
文件中设置了一个新部分
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
然后在airflow.cfg
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
我是否正确设置了它并且存在错误?这里有我缺少的成功秘诀吗?
-- 更新
我尝试以 URI 和 JSON 格式导出,但似乎都不起作用。然后我导出了 aws_access_key_id 和 aws_secret_access_key,然后 Airflow 开始拾取它。现在我在工作日志中得到了他的错误
6/30/2017 6:05:59 PMINFO:root:Using connection to: s3
6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
-- 更新
我也找到了这个链接 https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html
然后我转入我的一台工作机器(独立于网络服务器和调度程序)并在 python 中运行这段代码
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))
我收到此错误。
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
我尝试导出几种不同类型的 AIRFLOW_CONN_
环境,如连接部分 https://airflow.incubator.apache.org/concepts.html 中所述以及这个问题的其他答案。
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
我还导出了 AWS_ACCESS_KEY_ID 和 AWS_SECRET_ACCESS_KEY,但没有成功。
这些凭据存储在数据库中,因此一旦我将它们添加到 UI 中,它们应该被工作人员拾取,但由于某种原因他们无法写入/读取日志。
最佳答案
UPDATE Airflow 1.10 使日志记录 a lot easier.
对于 s3 日志记录,根据 the above answer 设置连接 Hook
然后只需将以下内容添加到airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
对于 gcs 日志记录,
先安装 gcp_api 包,如下:pip install apache-airflow[gcp_api]。
按照 the above answer 设置连接 Hook
将以下内容添加到airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
注意:从 Airflow 1.9 开始,远程日志记录为 significantly altered .如果您使用的是 1.9,请继续阅读。
引用 here
完整说明:
创建一个目录来存储配置并将其放置在 PYTHONPATH 中。一个例子是 $AIRFLOW_HOME/config
创建名为 $AIRFLOW_HOME/config/log_config.py 的空文件和$AIRFLOW_HOME/config/__init__.py
复制airflow/config_templates/airflow_local_settings.py的内容进入上一步刚刚创建的log_config.py文件。
自定义模板的以下部分:
#Add this variable to the top of the file. Note the trailing slash.
S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG
LOGGING_CONFIG = ...
Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
's3.task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow.task',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': S3_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'.
'loggers': {
'airflow.task': {
'handlers': ['s3.task'],
...
},
'airflow.task_runner': {
'handlers': ['s3.task'],
...
},
'airflow': {
'handlers': ['console'],
...
},
}
确保根据 the above answer 在 Airflow 中定义了 s3 连接 Hook . Hook 应该对上面在 S3_LOG_FOLDER 中定义的 s3 存储桶具有读写访问权限。
更新 $AIRFLOW_HOME/airflow.cfg 以包含:
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = <name of the s3 platform hook>
重新启动 Airflow 网络服务器和调度程序,并触发(或等待)新的任务执行。
验证日志是否显示了您定义的存储桶中新执行的任务。
验证 s3 存储查看器是否在 UI 中工作。拉起一个新执行的任务,并验证您是否看到如下内容:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
关于python - 为 Airflow 中的日志设置 s3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44780736/
这个问题在这里已经有了答案: Why don't Java's +=, -=, *=, /= compound assignment operators require casting? (11 个
我搜索了很多,但没有一个链接能帮助我解决这个问题。我得到了 ORA-21500: internal error code, arguments: [%s], [%s], [%s], [%s], [%s
我正在做 RegexOne 正则表达式教程,它有一个 question关于编写正则表达式以删除不必要的空格。 教程中提供的解决方案是 We can just skip all the starting
([\s\S]+|\s?) 中 |\s? 的目的或作用是什么?如果没有它,表达式会不会与 ([\s\S]+) 相同? 最佳答案 这不是完全相同的。 ([\s\S]+|\s?) 会匹配空字符串,而 ([
这个正则表达式有一组还是两组? 我正在尝试使用第二组访问 bookTitle 但出现错误: Pattern pattern = Pattern.compile("^\\s*(.*?)\\s+-\\s+
在 C 中给定一个字符串指针 s,下面的迭代会做什么?即它以什么方式遍历字符串? for (++s ; *s; ++s); 最佳答案 for (++s ; *s;++s) 表示 将指针 s 递增到字符
我正在用一个 node.js 应用程序解析一个大列表并有这段代码 sizeCode = dbfr.CN_DESC.split('\s+-\s*|\s*-\s+') 这似乎不起作用,因为它返回了 [ '
我正在编写一个简单的字符串连接程序。 该程序按照我发布的方式运行。但是,我首先使用以下代码编写它来查找字符串的结尾: while (*s++) ; 但是,这个方法并没有奏效。我传递给它的字符串
这个问题已经有答案了: What does (?和aramchand来自Mohandas Karamchand G 因此,在使用这些匹配来分割字符串后,您最终会得到 {"M", "K", "G"} 注
我正在尝试转换 Map到 List使用 lambda。 本质上,我想将键和值与 '=' 连接起来之间。这看起来微不足道,但我找不到如何去做。 例如 Map map = new HashMap<>();
我正在经历 K & R,并且在递增指针时遇到困难。练习 5.3(第 107 页)要求您使用指针编写一个 strcat 函数。 在伪代码中,该函数执行以下操作: 将 2 个字符串作为输入。 找到字符串
在下面的代码中,pS 和 s.pS 在最后一行是否保证相等?也就是说,在语句S s = S();中,是否可以确定不会构造一个临时的S? #include using namespace std; s
演示示例代码: public void ReverseString(char[] s) { for(int i = 0, j = s.Length-1; i < j; i++, j--){
我一直在寻找类似于 .NET examples 中的示例的 PowerShell 脚本.取一个 New-TimeSpan 并显示为 1 天 2 小时 3 分钟 4 秒。排除其零的地方,在需要的地方添加
def func(s): s = s + " is corrected" return s string_list = ["She", "He"] for s in string_li
我是 python 的新手。当我在互联网上搜索 lambda 时。我在 lambda_functions 中找到了这个声明. processFunc = collapse and (lambda s:
我最近开始学习正则表达式,并试图为上面的问题写一个正则表达式。如果限制只放在一个字母上(例如不超过 2 个“b”),这并不困难。 那么答案就是:a* c*(b|ε)a* c*(b|ε)a* c* 但是
当我运行 npm install 时出现以下错误,但我无法修复它。 我试过:npm install -g windows-build-tools 也没有修复这个错误 ERR! configure
有很多有趣的haskell网上可以找到片段。 This post可以在 this (awesome) Stack Overflow question 下找到. The author写道: discou
我知道以下三行代码旨在将字符串提取到$ value中并将其存储在$ header中。但是我不知道$value =~ s/^\s+//;和$value =~ s/\s+$//;之间有什么区别。 $val
我是一名优秀的程序员,十分优秀!