gpt4 book ai didi

python - 在 Airflow 上使用 DataprocOperator 的组件网关

转载 作者:行者123 更新时间:2023-12-01 00:09:50 27 4
gpt4 key购买 nike

在 GCP 中,安装和运行 JupyterHub component 相当简单。从用户界面或 gcloud 命令。我正在尝试通过 Airflow 和 DataprocClusterCreateOperator 编写进程脚本,这里是 DAG 的摘录

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
cluster_name=CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
num_masters=1,
master_machine_type='n1-standard-2',
worker_machine_type='n1-standard-2',
master_disk_size=100,
worker_disk_size=100,
storage_bucket='test-dataproc-jupyter',
region='europe-west4',
zone='europe-west4-a',
auto_delete_ttl=21600,
optional_components=['JUPYTER', 'ANACONDA']
)

但是我无法指定所需的 enable-component-gateway范围。查看源代码,似乎参数不是有意的(在 deprecatedlast stable 运算符中)。

我知道 REST API 提供了 endpointConfig.enableHttpPortAccess ,但我更愿意使用官方运营商。
有谁知道如何实现这一目标?

最佳答案

编辑,一个适用于 composer-1.8.3 的修复,带有 Airflow 1.10.3

在 Airflow 1.10.3 中,无法在外部创建集群配置。但是我们可以继承集群创建操作符并覆盖配置创建。这也让我们可以设置可选组件,这是 Airflow 版本中缺少的一个参数。

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

def __init__(self, *args, **kwargs):
super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

def _build_cluster_data(self):
cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
cluster_data['config']['endpointConfig'] = {
'enableHttpPortAccess': True
}
cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
return cluster_data

#Start DataProc Cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
cluster_name=CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
num_masters=1,
master_machine_type='n1-standard-2',
worker_machine_type='n1-standard-2',
master_disk_size=100,
worker_disk_size=100,
storage_bucket='test-dataproc-jupyter',
region='europe-west4',
zone='europe-west4-a',
auto_delete_ttl=21600,
dag=dag
)

原始答案,适用于 Airflow 1.10.7

虽然不是最优的,但您可以自己创建 Cluster 数据结构,而不是让 Airflow 的 ClusterGenerator 来这样做。它应该适用于最新版本(1.10.7)

cluster = {
'clusterName': CLUSTER_NAME,
'config': {
'gceClusterConfig': {
'zoneUri': 'europe-west4-a'
},
'masterConfig': {
'numInstances': 1,
'machineTypeUri': 'n1-standard-2',
'diskConfig': {
'bootDiskSizeGb': 100
},
},
'workerConfig': {
'numInstances': 3,
'machineTypeUri': 'n1-standard-2',
'diskConfig': {
'bootDiskSizeGb': 100
},
},
'softwareConfig': {
'optionalComponents': [
'ANACONDA',
'JUPYTER'
]
},
'lifestyleConfig': {
'autoDeleteTtl': 21600
},
'endpointConfig': {
'enableHttpPortAccess': True
}
},
'projectId': PROJECT_ID
}
#Start DataProc Cluster
dataproc_cluster = DataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
region='europe-west4',
zone='europe-west4-a',
cluster = cluster,
dag=DAG
)

如果您使用的是其他 Airflow 版本,请指明。

您也可以为我打开的错误投票: AIRFLOW-6432

关于python - 在 Airflow 上使用 DataprocOperator 的组件网关,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59568043/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com