- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
大家好,我的 dag 实际上运行良好,所有输出都在工作,但由于以下问题,airflow 的 UI 不会更改为成功和失败。在线阅读,我遇到了这两个:
do_xcom_push=False
并且 Xcom_push 将在 Airflow 2.0 版中弃用。
我只是不确定如何实际设置它?谁能分享任何见解?
完整代码:
import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
))
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
'G4F9': {'': 'G4SZV-1'},
'G4F1': {'': 'G4SDZV-2'},
'G4K0': {'': 'BK-G4E'},
'E6S1': {'': 'E6VG470'},
'E6S2': {'': 'E6VG470'},
}
def map_manufacturer_model(s):
s = str(s)
model = ''
try:
manufacturer = manufacturers[s[:4]]
for k, m in meter_models[s[:4]].items():
if s[-4:].startswith(k):
model = m
break
except KeyError:
manufacturer = ''
return pd.Series({'NewMeterManufacturer': manufacturer,
'NewMeterModel': model
})
gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[],
write_disposition="WRITE_TRUNCATE", )
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')
default_dag_args = {'owner': 'ME',
'start_date': datetime.datetime(2021, 12, 16),
}
with models.DAG('Test_Dag_V1',
schedule_interval=None, #'45 10 * * *',
default_args=default_dag_args) as dag:
format_function = python_operator.PythonOperator(
task_id='format_function',
python_callable=format_data
),
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model,
op_kwargs={'s': 'stringValue'}
)
format_function >> map_manufacturer_model_function
Airflow 错误
[2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
XCom.set(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
value = XCom.serialize_value(value)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Series is not JSON serializable
最佳答案
在您的 airflow.cfg 文件中,enable_xcom_pickling
关于python - 即使我的任务实际上已经完成, Airflow 2 仍然出错?错误 - 无法将 XCom 值序列化为 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70367529/
在Airflow中,我一直在使用“airflow run”和“airflow test”,但不完全理解它们有何不同。他们有什么区别? 最佳答案 我自己通读了文档,发现它是多么令人困惑。 Airflow
我使用 Airflow 已经有一段时间了,它是由一位同事创建的。最近我遇到了一些错误,这需要我更深入地了解如何修复 Airflow 中的某些问题。 我确实理解这三个进程是什么,但我只是不明白运行它们时
AIRFLOW_HOME=/path/to/my/airflow_home 我收到这个警告... >airflow trigger_dag python_dag3 /Users/alexryan/mi
有没有人报告过他们在他们的公司中让 Airflow 扩展了多少?我正在考虑实现 Airflow 来执行 5,000 多个任务,每个任务每小时运行一次,有一天可以将其扩展到 20,000 多个任务。在检
问题 :我想使用 Github 上最新版本的 Apache-Airflow 安装 apache-airflow 以及所有依赖项? 我怎样才能使用 pip 做到这一点? 在生产环境中使用它是否安全? 最
我们在 AWS ECS 上运行 Airflow,并将所有 DAG 捆绑在一个 Docker 镜像中。我们不时更新 DAGS,并部署新版本的 Docker Image。当我们这样做时,ECS 将终止正在
问题很简单。我需要限制 Airflow 网络用户仅查看和执行某些 DAG 和任务。 如果可能,我宁愿不使用 Kerberos也不是 OAuth . Multi-tenancy option 似乎是一个
我们正在使用 Airflow 2.00。我正在尝试实现一个做两件事的 DAG: 通过 API 触发报告 从源到目标下载报告。 任务 1 和任务 2 之间至少需要 2-3 小时的间隔。根据我的研究,我有
对于一项任务,有许多辅助任务 - 从文件/数据库中获取/保存属性、验证、审计。这些辅助方法并不耗时。 一个示例 DAG 流, fetch_data >> actual_processing >> va
有什么方法可以重新加载作业而不必重新启动服务器吗? 最佳答案 在airflow.cfg中,您具有以下两种配置来控制此行为: # after how much time a new DAGs shoul
我们可以通过将任务/dag 超时设置为 None 并手动触发其运行来使用 Airflow dag 来定义永无止境的作业(即具有无条件循环以消耗流数据的任务)吗?让 Airflow 监测永无止境的任务会
我是 Airflow 的新手,最近开始探索这个工具。我在 18.4 版本的 ubuntu 机器上安装了 1.10.10 版。从设置和安装的角度来看,一切正常,但是我在任何 DAG 中的任务都没有运行,
我主要看到Airflow被用于ETL / Bid数据相关的工作。我正在尝试将其用于业务工作流,其中用户操作将来会触发一组相关任务。其中某些任务可能需要根据某些其他用户操作来清除(删除)。 我认为最好的
我有一个 DAG,只要 FileSensor 检测到文件,它就会使用它,为每个文件生成任务,以 (1) 将文件移动到暂存区域,(2) 触发单独的 DAG 来处理文件。 FileSensor -> Mo
我需要手动或以编程方式执行的管道,可以使用 Airflow 吗?看起来现在每个工作流程都必须与时间表绑定(bind)。 最佳答案 只需在创建 DAG 时将 schedule_interval 设置为
所以这是一个愚蠢的想法...... 我在 Airflow 中创建了(许多)DAG...并且它有效...但是,我想以某种方式将其打包,以便我可以在不安装 Airflow 的情况下运行单个 DAG 运行;
我使用“pip install 'apache-airflow[statsd]' 安装了 airflow[statsd] 并安装了 statsd_exporter。现在我可以看到来自 Promethe
我们正在尝试将 MongoHook 和 GCSToLocalFilesystemOperator 导入到我们的 Airflow 项目中: docs for MongoHook docs for GCS
启动 Airflow 网络服务器时出现以下错误 balajee@Balajees-MacBook-Air.local:~$ Airflow 网络服务器 -p 8080 [2018-12-03 00:2
运行pip install airflow[postgres]命令后出现以下错误: > raise RuntimeError("By default one of Airflow's dependen
我是一名优秀的程序员,十分优秀!