gpt4 book ai didi

google-cloud-platform - 如何在 Vertex AI 中安排自定义训练作业的重复运行

转载 作者:行者123 更新时间:2023-12-05 04:46:27 27 4
gpt4 key购买 nike

我已将我的训练代码打包为 python 包,然后能够将其作为自定义训练作业在 Vertex AI 上运行。现在,我希望能够安排这项工作运行,比如每 2 周运行一次,然后重新训练模型。 CustomJoBSpec 中的调度设置只允许 2 个字段,“超时”和“restartJobOnWorkerRestart”,因此无法使用 CustomJobSpec 中的调度设置。我能想到的实现这一目标的一种方法是使用“CustomPythonPackageTrainingJobRunOp”Google Cloud Pipeline Component 创建一个 Vertex AI 管道,然后按我认为合适的方式安排管道运行。是否有更好的替代方案来实现这一目标?

编辑:

我能够使用 Cloud Scheduler 安排自定义训练作业,但我发现在 Vertex AI 管道中使用 AIPlatformClient 中的 create_schedule_from_job_spec 方法非常容易使用。我在 gcp 中使用 Cloud Scheduler 安排自定义作业的步骤如下,link谷歌文档:

  1. 将目标类型设置为 HTTP
  2. 对于指定自定义作业的url,我遵循了this获取url的链接
  3. 对于身份验证,在 Auth header 下,我选择了“添加 OAauth token ”

您还需要在您的项目中拥有一个“Cloud Scheduler 服务帐户”以及“授予它的 Cloud Scheduler 服务代理角色”。尽管文档指出,如果您在 2019 年 3 月 19 日之后启用 Cloud Scheduler API,这应该会自动设置,但我的情况并非如此,必须手动添加具有该角色的服务帐户。

最佳答案

根据您的要求,各种可能的安排方式:

<强>1。云 Composer

Cloud Composer是一个托管的 Apache Airflow,可帮助您创建、安排、监控和管理工作流。

您可以按照下面提到的步骤使用 Composer 每两周安排一次工作:

  • 创建 Composer 环境。
  • 写一个 DAG 文件并将您的自定义训练 python 代码添加到 DAG 文件。
  • 由于自定义训练作业是 python 代码,PythonOperator可用于安排任务。
  • 在 DAG 文件中,您需要提供开始时间,即计划开始的时间,并且您需要将计划间隔定义为两周,如下所示:
with models.DAG(
'composer_sample_bq_notify',
schedule_interval=datetime.timedelta(weeks=2),
default_args=default_dag_args) as dag:

或者,您也可以使用 unix-cron字符串格式 (* * * * *) 来进行调度。

即在您每两周安排一次的情况下,cron 格式将类似于:* * 1,15 * *

您可以使用 op_args and op_kwargs arguments 在 PythonOperator 中传递自定义作业所需的参数.

DAG文件写入后,需要将其推送到Composer Environment Bucket中的dags/文件夹。

您可以在 Airflow UI 中查看计划 DAG 的状态。

预定的 DAG 文件如下所示:

sample_dag.py :

from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': YESTERDAY,
}

with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(weeks=2),
default_args=default_dag_args) as dag:

def create_custom_job_sample(
project: str,
display_name: str,
container_image_uri: str,
location: str,
api_endpoint: str,
):
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(client_options=client_options)
custom_job = {
"display_name": display_name,
"job_spec": {
"worker_pool_specs": [
{
"machine_spec": {
"machine_type": "n1-standard-4",
"accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
"accelerator_count": 1,
},
"replica_count": 1,
"container_spec": {
"image_uri": container_image_uri,
"command": [],
"args": [],
},
}
]
},
}
parent = f"projects/{project}/locations/{location}"
response = client.create_custom_job(parent=parent, custom_job=custom_job)
print("response:", response)

hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=create_custom_job_sample,
op_kwargs={"project" : "your_project","display_name" : "name","container_image_uri":"uri path","location": "us-central1","api_endpoint":"us-central1-aiplatform.googleapis.com"}
)

# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='job scheduled')

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

<强>2。云调度程序:使用 Cloud Scheduler 安排工作您将需要进行以下配置:

  • 目标t : HTTP
  • URL:作业的端点 URL(示例:“us-central1-aiplatform.googleapis.com”)
  • Auth header :用于托管在 *.googleapis.com 上的 Google API 的 OAuth token

3. Scheduling a recurring pipeline使用 Kubeflow Pipelines SDK 运行:

您可以使用 Python 和 Kubeflow Pipelines SDK 安排定期运行的管道。

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
region=REGION)

api_client.create_schedule_from_job_spec(
job_spec_path=COMPILED_PIPELINE_PATH,
schedule=* * 1,15 * *,
time_zone=TIME_ZONE,
parameter_values=PIPELINE_PARAMETERS
)

关于google-cloud-platform - 如何在 Vertex AI 中安排自定义训练作业的重复运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68793294/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com