
python - Airflow - got an unexpected keyword argument 'dag'

Reposted. Author: 行者123. Updated: 2023-12-03 01:57:37

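I know Airflow already has a function for loading files from Cloud Storage into BigQuery. What I did instead was set up the connection to GCP inside my script, the same way I would without Airflow, and use a PythonOperator to call the function I wrote in that script, which reads from Cloud Storage and inserts the file's data into BigQuery. However, I get the error message: "got an unexpected keyword argument 'dag'".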

This seems like a very simple problem, but I really don't know what it means, since I did pass the dag attribute to the PythonOperator.

import json
import decimal
import airflow
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from tempfile import NamedTemporaryFile
import pymssql
import logging
import os
# import cloudstorage as gcs
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test_tab1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60)
)

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "/usr/local/airflow/key/key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path

def insert_bigquery(self):
    bigquery_client = bigquery.Client(project="project-name")
    dataset_ref = bigquery_client.dataset('bucket-name')
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id"]
    #job_config.field_delimiter = ";"
    uri = "gs://bucket-name/"+filename
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('tab1'),
        job_config=job_config
    )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')


json_gcs_to_bq = PythonOperator(
    task_id='json_gcs_to_bq',
    python_callable=insert_bigquery,
    provide_context=True,
    dag=dag)

Error message:

[2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: insert_bigquery() got an unexpected keyword argument 'dag'

Best answer

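You don't need to pass self to your python_callable; it is a plain function, not a method. Because the task sets provide_context=True, Airflow calls the function with the task context as keyword arguments (dag, ds, and so on), and a signature that only accepts self has nowhere to put them. Change the signature of insert_bigquery to def insert_bigquery(ds, **kwargs) instead of def insert_bigquery(self).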

Reference: https://airflow.apache.org/howto/operator/python.html
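
To see why the signature matters without running Airflow, here is a minimal sketch in plain Python. It assumes that, with provide_context=True, the operator ends up passing the task context to the callable as keyword arguments, which is what the last traceback frame suggests. The helper call_like_python_operator and the values in fake_context are hypothetical stand-ins, not Airflow's real code or real context, and the two insert_bigquery_* functions only return strings instead of talking to BigQuery.

```python
# Simplified stand-in for how a PythonOperator with provide_context=True
# invokes its callable. This is NOT Airflow's real implementation; the helper
# name and the context values below are hypothetical.

def call_like_python_operator(python_callable, context, op_kwargs=None):
    """Merge the task context with op_kwargs and pass everything as keywords."""
    kwargs = dict(context)
    kwargs.update(op_kwargs or {})
    return python_callable(**kwargs)


# A tiny fake context; the real one carries many more keys.
fake_context = {"dag": "test_tab1", "ds": "2019-06-21", "task": "json_gcs_to_bq"}


def insert_bigquery_broken(self):
    """Signature from the question: nothing can receive 'dag', 'ds', ..."""
    return "loaded"


def insert_bigquery_fixed(ds, **kwargs):
    """Signature from the answer: 'ds' binds by name, the rest goes to kwargs."""
    return "loading data for {} (extra context keys: {})".format(ds, sorted(kwargs))


# The broken signature fails as soon as keyword arguments are passed in.
try:
    call_like_python_operator(insert_bigquery_broken, fake_context)
    broken_error = None
except TypeError as exc:
    broken_error = str(exc)

# The fixed signature accepts the whole context.
fixed_result = call_like_python_operator(insert_bigquery_fixed, fake_context)

print(broken_error)
print(fixed_result)
```

The same idea applies to any extra values the function needs: as long as the signature ends in **kwargs, additional keyword arguments can be accepted without changing the call site.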

Regarding python - Airflow - got an unexpected keyword argument 'dag', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56707807/
