gpt4 book ai didi

apache-spark - Airflow SparkSubmitOperator - 如何在另一台服务器中提交 Spark

转载 作者:可可西里 更新时间:2023-11-01 14:18:41 26 4
gpt4 key购买 nike

我是 Airflow 和 Spark 的新手,我正在努力使用 SparkSubmitOperator

我们的 Airflow 调度器和我们的 hadoop 集群没有设置在同一台机器上(第一个问题:这是一个好的做法吗?)。

我们有很多自动化程序需要调用pyspark脚本。这些 pyspark 脚本存储在 hadoop 集群 (10.70.1.35) 中。 Airflow 数据存储在 Airflow 机器(10.70.1.22)中。

目前,当我们想要使用 airflow spark-submit 一个 pyspark 脚本时,我们使用一个简单的 BashOperator,如下所示:

cmd = "ssh hadoop@10.70.1.35 spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 2g \
--executor-cores 2 \
/home/hadoop/pyspark_script/script.py"
t = BashOperator(task_id='Spark_datamodel',bash_command=cmd,dag=dag)

它工作得很好。但是我们想开始使用 SparkSubmitOperator 来激发提交我们的 pyspark 脚本

我试过这个:

from airflow import DAG
from datetime import timedelta, datetime
from airflow.contrib.operators.spark_submit_operator import
SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

dag = DAG('SPARK_SUBMIT_TEST',start_date=datetime(2018,12,10),
schedule_interval='@daily')


sleep = BashOperator(task_id='sleep', bash_command='sleep 10',dag=dag)

_config ={'application':'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py',
'master' : 'yarn',
'deploy-mode' : 'cluster',
'executor_cores': 1,
'EXECUTORS_MEM': '2G'
}

spark_submit_operator = SparkSubmitOperator(
task_id='spark_submit_job',
dag=dag,
**_config)

sleep.set_downstream(spark_submit_operator)

语法应该没问题,因为 dag 没有显示为已损坏。但是当它运行时它给我以下错误:

[2018-12-14 03:26:42,600] {logging_mixin.py:95} INFO - [2018-12-14 
03:26:42,600] {base_hook.py:83} INFO - Using connection to: yarn
[2018-12-14 03:26:42,974] {logging_mixin.py:95} INFO - [2018-12-14
03:26:42,973] {spark_submit_hook.py:283} INFO - Spark-Submit cmd:
['spark-submit', '--master', 'yarn', '--executor-cores', '1', '--name',
'airflow-spark', '--queue', 'root.default',
'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py']
[2018-12-14 03:26:42,977] {models.py:1760} ERROR - [Errno 2] No such
file or directory: 'spark-submit'
Traceback (most recent call last):
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/models.py", line 1659, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/contrib/operators/spark_submit_operator.py", line
168,
in execute
self._hook.submit(self._application)
File "/home/dataetl/anaconda3/lib/python3.6/site-
packages/airflow/contrib/hooks/spark_submit_hook.py", line 330, in
submit
**kwargs)
File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line
707,
in __init__
restore_signals, start_new_session)
File "/home/dataetl/anaconda3/lib/python3.6/subprocess.py", line
1326, in _execute_child
raise child_exception_type(errno_num, err_msg)
FileNotFoundError: [Errno 2] No such file or directory: 'spark-submit'

这是我的问题:

  1. 我应该在我的 airflow 机器上安装 spark hadoop 吗? 我问是因为在这个 topic我读到我需要复制 hdfs-site.xmlhive-site.xml。但是你可以想象,我的 Airflow 机器上既没有 /etc/hadoop/ 也没有 /etc/hive/ 目录。

  2. a) 如果没有,我应该将 hdfs-site.xmlhive-site.xml 复制到哪里 Airflow 机?

  3. b) 如果是,是否意味着我需要将我的 Airflow 机器配置为客户端?一种不参与作业但可以用来提交 Action 的边缘节点?

  4. 那么,我可以从我的 airflow 机器上 spark-submit 吗? 如果可以,那么我不需要在 Airflow 上创建连接例如,就像我对 mysql 数据库所做的那样,对吧?

  5. 哦,蛋糕上的樱桃:我能否将我的 pyspark 脚本存储在我的 airflow 机器中,并从同一台 airflow 机器spark-submit 它们。这将是惊人的!

任何评论都会非常有用,即使您无法回答我所有的问题...

无论如何提前感谢! :)

最佳答案

回答您的第一个问题,是的,这是一个很好的做法。

关于如何使用SparkSubmitOperator,请引用我在https://stackoverflow.com/a/53344713/5691525上的回答。

  1. ,您需要在 Airflow 机器上使用 spark-binary。
  2. -
  3. No -> 您仍然需要一个连接来告诉 Airflow 您在哪里安装了您的 spark 二进制文件。类似于 https://stackoverflow.com/a/50541640/5691525
  4. 应该工作

关于apache-spark - Airflow SparkSubmitOperator - 如何在另一台服务器中提交 Spark ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53773678/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com