
hadoop - Airflow failure: ParseException line 2:0 cannot recognize input near

Reposted · Author: 可可西里 · Updated: 2023-11-01 15:20:50

I'm trying to run a test task on Airflow, but I keep getting the following error:

FAILED: ParseException 2:0 cannot recognize input near 'create_import_table_fct_latest_values' '.' 'hql'

Here is my Airflow DAG file:

import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.models import DAG

args = {
    'owner': 'raul',
    'start_date': datetime(2018, 11, 12),
    'provide_context': True,
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email': ['raul.gregglino@leroymerlin.ru'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG('opus_data',
          default_args=args,
          max_active_runs=6,
          schedule_interval="@daily"
          )

import_lv_data = HiveOperator(
    task_id='fct_latest_values',
    hive_cli_conn_id='metastore_default',
    hql='create_import_table_fct_latest_values.hql ',
    hiveconf_jinja_translate=True,
    dag=dag
)

deps = {}

# Explicitly define the dependencies in the DAG
for downstream, upstream_list in deps.iteritems():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)

Here is the content of my HQL file, in case that's where the problem is and I'm just not seeing it:

*I'm testing the connection to understand if the table is created or not, then I'll try to LOAD DATA, hence the LOAD DATA is commented out.
CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
id_product STRING,
id_model STRING,
id_attribute STRING,
attribute_value STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED ',';

#LOAD DATA LOCAL INPATH
#'/media/windows_share/schemas/opus/fct_latest_values_20181106.csv'
#OVERWRITE INTO TABLE opus_data.fct_latest_values_new_data;

Best Answer

In the HQL file, it should be FIELDS TERMINATED BY ',':

CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
    id_product STRING,
    id_model STRING,
    id_attribute STRING,
    attribute_value STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Also, in an HQL file, comments must start with --, not #, so the commented-out LOAD DATA lines are invalid as written.

This also looks wrong and is what triggers the exception: hql='create_import_table_fct_latest_values.hql ' — note the trailing space. Airflow only loads a templated field from a file when the string ends with one of the operator's template extensions ('.hql' or '.sql' for HiveOperator); with the trailing space it doesn't, so the literal filename is sent to Hive as a query, which is exactly what the ParseException message shows: 'create_import_table_fct_latest_values' '.' 'hql'.
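A minimal sketch of the check Airflow applies (simplified — the real logic lives in Airflow's template-file resolution; `resolves_to_file` is a hypothetical stand-in, not an Airflow API):

```python
# HiveOperator's template_ext is ('.hql', '.sql'): only strings ending in one
# of these extensions are treated as filenames to be read and rendered.
TEMPLATE_EXT = ('.hql', '.sql')

def resolves_to_file(hql_value, template_ext=TEMPLATE_EXT):
    # A trailing space defeats the endswith() check, so the raw string
    # is passed straight to Hive instead of being loaded from the file.
    return hql_value.endswith(template_ext)

print(resolves_to_file('create_import_table_fct_latest_values.hql'))   # True
print(resolves_to_file('create_import_table_fct_latest_values.hql '))  # False
```

Stripping the trailing space from the hql argument is enough to make Airflow read the file again.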

Look at this example:

# Create the full path for the file
hql_file_path = os.path.join(os.path.dirname(__file__), source['hql'])
print hql_file_path
run_hive_query = HiveOperator(
    task_id='run_hive_query',
    dag=dag,
    hql="""
    {{ local_hive_settings }}
    """ + "\n " + open(hql_file_path, 'r').read()
)

See here for more details.

Or put all of the HQL directly into the hql parameter:

hql='CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data ...'
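For a multi-line statement, a triple-quoted string keeps the DAG readable; a sketch using only the standard library's `textwrap.dedent` (shown outside any Airflow object) to strip the indentation before the text reaches Hive:

```python
from textwrap import dedent

# The corrected CREATE TABLE from above, inlined as a Python string.
hql = dedent("""\
    CREATE TABLE IF NOT EXISTS opus_data.fct_latest_values_new_data (
        id_product STRING,
        id_model STRING,
        id_attribute STRING,
        attribute_value STRING
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
    """)

print(hql.startswith('CREATE TABLE'))  # True
```

Because this string does not end in '.hql', Airflow passes it to Hive as-is, which is the behavior you want here.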

Regarding "hadoop - Airflow failure: ParseException line 2:0 cannot recognize input near", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53260340/
