- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
大家好,我的 dag 实际上运行良好,所有输出都在工作,但由于以下问题,airflow 的 UI 不会更改为成功和失败。在线阅读,我遇到了这两个:
do_xcom_push=False
并且 Xcom_push 将在 Airflow 2.0 版中弃用。
我只是不确定如何实际设置它?谁能分享任何见解?
完整代码:
import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
bqclient.query(query_string)
.result()
.to_dataframe(
create_bqstorage_client=True,
))
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}
meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
'G4F9': {'': 'G4SZV-1'},
'G4F1': {'': 'G4SDZV-2'},
'G4K0': {'': 'BK-G4E'},
'E6S1': {'': 'E6VG470'},
'E6S2': {'': 'E6VG470'},
}
def map_manufacturer_model(s):
s = str(s)
model = ''
try:
manufacturer = manufacturers[s[:4]]
for k, m in meter_models[s[:4]].items():
if s[-4:].startswith(k):
model = m
break
except KeyError:
manufacturer = ''
return pd.Series({'NewMeterManufacturer': manufacturer,
'NewMeterModel': model
})
gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
# Specify a (partial) schema. All columns are always written to the
# table. The schema is used to assist in data type definitions.
schema=[],
write_disposition="WRITE_TRUNCATE", )
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config) # Make an API request.
job.result() # Wait for the job to complete.
table = client.get_table(table_id) # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')
default_dag_args = {'owner': 'ME',
'start_date': datetime.datetime(2021, 12, 16),
}
with models.DAG('Test_Dag_V1',
schedule_interval=None, #'45 10 * * *',
default_args=default_dag_args) as dag:
format_function = python_operator.PythonOperator(
task_id='format_function',
python_callable=format_data
),
map_manufacturer_model_function = python_operator.PythonOperator(
task_id='map_manufacturer_model_function',
python_callable=map_manufacturer_model,
op_kwargs={'s': 'stringValue'}
)
format_function >> map_manufacturer_model_function
Airflow 错误
[2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
self.xcom_push(key=XCOM_RETURN_KEY, value=result)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
XCom.set(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
value = XCom.serialize_value(value)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Series is not JSON serializable
最佳答案
在您的 airflow.cfg 文件中,enable_xcom_pickling
关于python - 即使我的任务实际上已经完成, Airflow 2 仍然出错?错误 - 无法将 XCom 值序列化为 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70367529/
我正在使用 node.js 和 mocha 单元测试,并且希望能够通过 npm 运行测试命令。当我在测试文件夹中运行 Mocha 测试时,测试运行成功。但是,当我运行 npm test 时,测试给出了
我的文本区域中有这些标签 ..... 我正在尝试使用 replaceAll() String 方法替换它们 text.replaceAll("", ""); text.replaceAll("", "
早上好,我是 ZXing 的新手,当我运行我的应用程序时出现以下错误: 异常Ljava/lang/NoClassDefFoundError;初始化 ICOM/google/zxing/client/a
我正在制作一些哈希函数。 它的源代码是... #include #include #include int m_hash(char *input, size_t in_length, char
我正在尝试使用 Spritekit 在 Swift 中编写游戏。目的是带着他的角色迎面而来的矩形逃跑。现在我在 SKPhysicsContactDelegate (didBegin ()) 方法中犯了
我正在尝试创建一个用于导入 CSV 文件的按钮,但出现此错误: actionPerformed(java.awt.event.ActionEvent) in cannot implement
请看下面的代码 public List getNames() { List names = new ArrayList(); try { createConnection(); Sta
我正在尝试添加一个事件以在“dealsArchive”表中创建一个条目,然后从“deals”表中删除该条目。它需要在特定时间执行。 这是我正在尝试使用的: DELIMITER $$ CREATE EV
我试图将两个存储过程的表结果存储到 phpmyadmin 例程窗口中的单个表中,这给了我 mariadb 语法错误。单独调用存储过程给出了结果。 存储过程代码 BEGIN CREATE TABLE t
我想在 videoview 中加载视频之前有一个进度条。但是我收到以下错误。我还添加了所有必要的导入。 我在 ANDROID 中使用 AIDE 这是我的代码 public class MainActi
我已经使用了 AsyncTask,但我不明白为什么在我的设备 (OS 4.0) 上测试时仍然出现错误。我的 apk 构建于 2.3.3 中。我想我把代码弄错了,但我不知道我的错误在哪里。任何人都请帮助
我在测试 friend 网站的安全性时,通过在 URL 末尾添加 ' 发现了 SQL 注入(inject)漏洞该网站是用zend框架构建的我遇到的问题是 MySQL -- 中的注释语法不起作用,因此页
我正在尝试使用堆栈溢出答案之一的交互式信息窗口。 链接如下: interactive infowindow 但是我在代码中使用 getMap() 时遇到错误。虽然我尝试使用 getMapAsync 但
当我编译以下代码时出现错误: The method addMouseListener(Player) is undefined for the type Player 代码: import java.
我是 Android 开发的初学者。我正在开发一个接收 MySql 数据然后将其保存在 SQLite 中的应用程序。 我将 Json 用于同步状态,以便我可以将未同步数据的数量显示为要同步的待处理数据
(这里是Hello world级别的自动化测试人员) 我正在尝试下载一个文件并将其重命名以便于查找。我收到一个错误....这是代码 @Test public void allDownload(
我只是在写另一个程序。并使用: while (cin) words.push_back(s); words是string的vector,s是string。 我的 RAM 使用量在 4 或 5
我是 AngularJS 的新手,我遇到了一个问题。我有一个带有提交按钮的页面,当我单击提交模式时必须打开并且来自 URL 的数据必须存在于模式中。现在,模式打开但它是空的并且没有从 URL 获取数据
我正在尝试读取一个文件(它可以包含任意数量的随机数字,但不会超过 500 个)并将其放入一个数组中。 稍后我将需要使用数组来做很多事情。 但到目前为止,这一小段代码给了我 no match for o
有些人在使用 make 命令进行编译时遇到了问题,所以我想我应该在这里尝试一下,我已经在以下操作系统的 ubuntu 32 位和挤压 64 位上尝试过 我克隆了 git 项目 https://gith
我是一名优秀的程序员,十分优秀!