
python - Airflow 2 still reports an error even though my task actually completed? Error - Could not serialize the XCom value into JSON

Reposted. Author: 行者123. Updated: 2023-12-05 03:33:08

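Hi everyone, my DAG actually runs fine and all the outputs are produced, but because of the issue below the Airflow UI does not change to success and shows the task as failed. Reading online, I came across these two things: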

do_xcom_push=False

and that xcom_push will be deprecated in Airflow 2.0.

I'm just not sure how to actually set this. Can anyone share any insight?

Full code:

import pandas as pd
import logging
import csv
import numpy as np
import datetime
import glob
import shutil
import time
import gcsfs
from airflow.operators import python_operator
from google.cloud import bigquery
import os
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

client = bigquery.Client()
bqclient = bigquery.Client()
# Output table for dataframe
table_id = "table"
# Dataframe Code
query_string = """
SELECT * FROM `df_table`
"""
gas_data = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(
        create_bqstorage_client=True,
    )
)

manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN', 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'},
                }

def map_manufacturer_model(s):
    s = str(s)
    model = ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''

    return pd.Series({'NewMeterManufacturer': manufacturer,
                      'NewMeterModel': model
                      })


gas_data[['NewMeterManufacturer', 'NewMeterModel']] = gas_data['NewSerialNumber'].apply(map_manufacturer_model)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[],
    write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(gas_data, table_id, job_config=job_config)  # Make an API request.
job.result()  # Wait for the job to complete.
table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows and {} columns to {}".format(
    table.num_rows, len(table.schema), table_id))
print('Loaded DATAFRAME into BQ TABLE')

default_dag_args = {'owner': 'ME',
                    'start_date': datetime.datetime(2021, 12, 16),
                    }

with models.DAG('Test_Dag_V1',
                schedule_interval=None,  # '45 10 * * *',
                default_args=default_dag_args) as dag:
    # Note: format_data is not defined in the code as posted.
    format_function = python_operator.PythonOperator(
        task_id='format_function',
        python_callable=format_data
    )

    map_manufacturer_model_function = python_operator.PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model,
        op_kwargs={'s': 'stringValue'}
    )

    format_function >> map_manufacturer_model_function

Airflow error:

[2021-12-15 16:44:26,180] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2021-12-15 16:44:26,182] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1318, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1905, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
    value = XCom.serialize_value(value)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/opt/python3.8/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/python3.8/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Series is not JSON serializable

Best answer

In your airflow.cfg file, set enable_xcom_pickling = True (the option lives in the [core] section).
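
As a rough illustration of that setting, the sketch below embeds the relevant airflow.cfg fragment as a string and checks it with Python's standard configparser. The helper name xcom_pickling_enabled and the constant AIRFLOW_CFG_FRAGMENT are made up for this example and are not part of Airflow; only the [core] section and the enable_xcom_pickling key come from Airflow itself.

```python
import configparser

# The fragment as it might appear in airflow.cfg (only the relevant key shown).
AIRFLOW_CFG_FRAGMENT = """
[core]
enable_xcom_pickling = True
"""


def xcom_pickling_enabled(cfg_text: str) -> bool:
    """Hypothetical helper: report whether the given airflow.cfg text
    turns on XCom pickling. Falls back to False when the key is absent."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.getboolean("core", "enable_xcom_pickling", fallback=False)


print(xcom_pickling_enabled(AIRFLOW_CFG_FRAGMENT))
```

The same option can also be supplied through the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Keep in mind that pickled XCom values are unpickled when they are read back, so this is best enabled only where the data pushed to XCom is trusted.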

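If enabling pickling is not desirable, two alternatives touch on what the question asked about. The traceback shows the task's return value (a pandas Series) being pushed to XCom, so the callable could return a plain dict instead, or the operator could be told not to push the return value at all with do_xcom_push=False. The following is a minimal sketch under those assumptions, not the original poster's final code: build_task is a hypothetical helper, and the Airflow import sits inside it so the rest of the snippet loads even where Airflow is not installed.

```python
import json

# Lookup tables copied from the question.
manufacturers = {'G4F0': 'FLN', 'G4F1': 'FLN', 'G4F9': 'FLN',
                 'G4K0': 'HWL', 'E6S1': 'LPG', 'E6S2': 'LPG'}

meter_models = {'G4F0': {'1': 'G4SZV-1', '2': 'G4SZV-2'},
                'G4F9': {'': 'G4SZV-1'},
                'G4F1': {'': 'G4SDZV-2'},
                'G4K0': {'': 'BK-G4E'},
                'E6S1': {'': 'E6VG470'},
                'E6S2': {'': 'E6VG470'}}


def map_manufacturer_model(s):
    """Same lookup as in the question, but returning a plain dict,
    which json.dumps can serialize (a pandas Series cannot be)."""
    s = str(s)
    manufacturer, model = '', ''
    try:
        manufacturer = manufacturers[s[:4]]
        for k, m in meter_models[s[:4]].items():
            if s[-4:].startswith(k):
                model = m
                break
    except KeyError:
        manufacturer = ''
    return {'NewMeterManufacturer': manufacturer, 'NewMeterModel': model}


def build_task(dag):
    """Hypothetical helper: create the task with do_xcom_push=False so the
    callable's return value is never written to XCom."""
    from airflow.operators.python import PythonOperator  # Airflow 2 import path
    return PythonOperator(
        task_id='map_manufacturer_model_function',
        python_callable=map_manufacturer_model,
        op_kwargs={'s': 'stringValue'},
        do_xcom_push=False,
        dag=dag,
    )


# The dict round-trips through JSON, which is what XCom does by default.
result = map_manufacturer_model('G4F0001234')
print(json.dumps(result))
```

Either change on its own should be enough to avoid the serialization error. If the dict-returning version is also used in the DataFrame step from the question, the call would likely need to become something like .apply(lambda s: pd.Series(map_manufacturer_model(s))) so that the two new columns are still produced.
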
Source: a similar question found on Stack Overflow: https://stackoverflow.com/questions/70367529/
