gpt4 book ai didi

python - Google Cloud Composer BigQuery Operator-获取作业 API HTTPError 404

转载 作者:太空宇宙 更新时间:2023-11-04 09:32:13 25 4
gpt4 key购买 nike

我正在尝试在 GCC 上运行 BigQueryOperator。我已经成功运行 BigQueryCreateEmptyTableOperator 和 BigQueryTableDeleteOperator。

这是我的 dag 代码:

import datetime
import os
import logging


from airflow import configuration
from airflow import models
from airflow import DAG
from airflow.operators import email_operator
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_check_operator
from airflow.utils import trigger_rule
from contextlib import suppress
import json
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())

default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
'email_on_failure': True,
'email_on_retry': True,
'project_id' : 'censored',
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}

bq_dataset_name= 'test_tf_blocket'
bq_githib_table_id = bq_dataset_name + '.trialtable'

# [START composer_quickstart_schedule]
with models.DAG(
dag_id='composer_nicholas',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_quickstart_schedule]

def greeting():
logging.info('Hello World!')

hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)

bq_union_query = bigquery_operator.BigQueryOperator(
task_id='bq_union_query',
bql="""
select * from test_tf_blocket.nicholas_union_query;
""",
query_params={})

email_start = email_operator.EmailOperator(
task_id='email_it',
to='nicholas@censored.my',
subject='Sample temail',
html_content="""
Done.
""")

hello_python >> bq_union_query >> email_start

dag 在遇到 bigqueryOperator 时失败并出现错误(日志):

*** Reading remote log from gs://asia-south1-staging-b017f2bf-bucket/logs/composer_nicholas/bq_union_query/2019-03-21T14:56:45.453098+00:00/30.log.
[2019-03-22 13:12:54,129] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,167] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [queued]>
[2019-03-22 13:12:54,168] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 30 of 3
-------------------------------------------------------------------------------

[2019-03-22 13:12:54,199] {models.py:1595} INFO - Executing <Task(BigQueryOperator): bq_union_query> on 2019-03-21T14:56:45.453098+00:00
[2019-03-22 13:12:54,200] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_nicholas bq_union_query 2019-03-21T14:56:45.453098+00:00 --job_id 571 --raw -sd DAGS_FOLDER/nicholas_union_query.py --cfg_path /tmp/tmpn1ic1w_6']
[2019-03-22 13:13:06,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:06,400] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-03-22 13:13:08,433] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,431] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-03-22 13:13:08,435] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:08,435] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-03-22 13:13:09,182] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,181] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-03-22 13:13:09,198] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,198] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,210] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-03-22 13:13:09,873] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:09,873] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/nicholas_union_query.py
[2019-03-22 13:13:12,207] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/models.py:2412: PendingDeprecationWarning: Invalid arguments were passed to BigQueryOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query *args: ()
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query **kwargs: {'api_resource_config': {'useQueryCache': True, 'jobType': 'QUERY'}}
[2019-03-22 13:13:12,208] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=PendingDeprecationWarning
[2019-03-22 13:13:12,209] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query /usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py:151: DeprecationWarning: Deprecated parameter `bql` used in Task id: bq_union_query. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:12,210] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:16,838] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:16,838] {cli.py:484} INFO - Running <TaskInstance: composer_nicholas.bq_union_query 2019-03-21T14:56:45.453098+00:00 [running]> on host airflow-worker-7c9b9c7f86-xwhg5
[2019-03-22 13:13:17,455] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,453] {bigquery_operator.py:159} INFO - Executing:
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query select * from test_tf_blocket.nicholas_union_query;
[2019-03-22 13:13:17,457] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:17,632] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,632] {gcp_api_base_hook.py:92} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-03-22 13:13:17,657] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:17,656] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow
category=DeprecationWarning)
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,336] {logging_mixin.py:95} WARNING - /usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py:559: DeprecationWarning: Deprecated parameter `bql` used in `BigQueryBaseCursor.run_query` Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
[2019-03-22 13:13:18,338] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query category=DeprecationWarning)
[2019-03-22 13:13:18,360] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,359] {discovery.py:873} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs?alt=json
[2019-03-22 13:13:18,885] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:18,884] {discovery.py:873} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json
[2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuratio
jobId=self.running_job_id).execute(
File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrappe
return wrapped(*args, **kwargs
File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execut
raise HttpError(resp, content, uri=self.uri
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS"

During handling of the above exception, another exception occurred

Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execut
time_partitioning=self.time_partitionin
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_quer
return self.run_with_configuration(configuration
File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuratio
err.resp.status
Exception: ('BigQuery job status check failed. Final error was: %s', 404
[2019-03-22 13:13:20,347] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:20,341] {models.py:1760} ERROR - ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query jobId=self.running_job_id).execute()
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return wrapped(*args, **kwargs)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query raise HttpError(resp, content, uri=self.uri)
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS">
[2019-03-22 13:13:20,348] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,349] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query result = task_copy.execute(context=context)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,350] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query err.resp.status)
[2019-03-22 13:13:20,351] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)
[2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,352] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query [2019-03-22 13:13:20,352] {models.py:1783} INFO - Marking task as UP_FOR_RETRY
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,400] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1014, in run_with_configuration
[2019-03-22 13:13:20,403] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query jobId=self.running_job_id).execute()
[2019-03-22 13:13:20,405] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-03-22 13:13:20,406] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return wrapped(*args, **kwargs)
[2019-03-22 13:13:20,407] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 851, in execute
[2019-03-22 13:13:20,408] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query raise HttpError(resp, content, uri=self.uri)
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query googleapiclient.errors.HttpError: <HttpError 404 when requesting https://www.googleapis.com/bigquery/v2/projects/censored-analytics-censored/jobs/job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS?alt=json returned "Not found: Job censored-analytics-censored:job_Nh77hL-CG3GHYCfWZY2Fhz4PgLlS">
[2019-03-22 13:13:20,409] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,410] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query During handling of the above exception, another exception occurred:
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Traceback (most recent call last):
[2019-03-22 13:13:20,411] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/bin/airflow", line 7, in <module>
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query exec(compile(f.read(), __file__, 'exec'))
[2019-03-22 13:13:20,412] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/airflow", line 32, in <module>
[2019-03-22 13:13:20,413] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query args.func(args)
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2019-03-22 13:13:20,414] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return f(*args, **kwargs)
[2019-03-22 13:13:20,415] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/cli.py", line 490, in run
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query _run(args, dag, ti)
[2019-03-22 13:13:20,416] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/bin/cli.py", line 406, in _run
[2019-03-22 13:13:20,417] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query pool=args.pool,
[2019-03-22 13:13:20,418] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
[2019-03-22 13:13:20,420] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return func(*args, **kwargs)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query result = task_copy.execute(context=context)
[2019-03-22 13:13:20,421] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/operators/bigquery_operator.py", line 180, in execute
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query time_partitioning=self.time_partitioning
[2019-03-22 13:13:20,422] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 651, in run_query
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query return self.run_with_configuration(configuration)
[2019-03-22 13:13:20,425] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query File "/usr/local/lib/airflow/airflow/contrib/hooks/bigquery_hook.py", line 1036, in run_with_configuration
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query err.resp.status)
[2019-03-22 13:13:20,427] {base_task_runner.py:101} INFO - Job 571: Subtask bq_union_query Exception: ('BigQuery job status check failed. Final error was: %s', 404)

如果我输入了不同的 sql 查询,例如(删除表),该查询将起作用。为简单起见,我在这里进行选择查询。重点是,这里的 sql 查询有效,但 dag 失败了。似乎 dag 未能从 BQ 检索查询历史记录/作业历史记录。我检查了 json 文件是否存在,是的。这是屏幕截图

BQ SS

最初我认为这是一个权限问题,但我检查了一下,云 Composer 生成的服务帐户具有项目所有者权限和 BQ 管理员权限。我尝试四处搜索,但似乎找不到答案。

感谢任何帮助。

最佳答案

由于您的 BigQuery 数据集位于 asia-southeast1,因此 BigQuery 默认在同一位置创建了一个作业,即 asia-southeast1。但是,您的 Composer 环境中的 Airflow 试图在不指定位置字段的情况下获取作业的状态。

引用:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get

这已经被我的 PR 修复了,已经合并到 master 中,将在 v2.0.0 中发布。但是,Composer 的最新 Airflow 版本是 v1.10.2,因此您需要变通才能使其正常工作。

要解决此问题,您可以扩展 BigQueryCursor 并使用位置支持覆盖 run_with_configuration() 函数。请参阅: https://github.com/apache/airflow/pull/4695/files#diff-ee06f8fcbc476ea65446a30160c2a2b2R1213并检查如何修补它。

关于python - Google Cloud Composer BigQuery Operator-获取作业 API HTTPError 404,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55300785/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com