gpt4 book ai didi

ssh - 如何在代码中动态更新现有 Airflow(1.9 版)连接的参数?

转载 作者:行者123 更新时间:2023-12-02 14:01:00 25 4
gpt4 key购买 nike

我已经通过 Airflow 管理界面定义了一个 SSH 连接。但是,我只在 UI 中定义了一个服务帐户、主机和端口。我正在第一个任务实例中检索密码,我需要在第二个任务实例中使用密码更新 SSH 连接,并在第三个任务实例中使用它。

  • t1:调用 R 函数来检索 svc 帐户的密码(存储在 xcom_push 中)
  • t2 :使用此密码更新 SSH 连接(我正在使用SSHHook) ssh02.password = 密码(通过 xcom_pull 获取)
  • t3 : 使用先前更新的连接 (ssh02) 调用服务器

目前 t1 和 t2 按预期工作,但是 t3 失败,因为密码没有更新并且它正在寻找基于 .ssh key 文件的身份验证。有人可以建议如何实现吗?

这是我的代码片段:

    from airflow import models
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.models import Variable
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils import db
from airflow.utils.db import provide_session
from airflow import DAG
import logging
import os

svcpassword = 'XXXX'

logging.getLogger().setLevel(logging.DEBUG)


ssh01 = SSHHook(ssh_conn_id='ssh_conn1')
ssh02 = SSHHook(ssh_conn_id='ssh_conn2')


default_args = {
'owner': 'user',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['abcd@gmail.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay':timedelta(minutes=1)
}


dag = DAG('dag_POC', default_args=default_args,
schedule_interval="@once")

path1 = '/home/user/R_samplescript'


t1 = SSHOperator(
task_id='SSHTask',
command='Rscript '+path1+'.R',
ssh_hook=ssh01,
params={},retries =1 ,
do_xcom_push = True,
dag = dag
)

def create_new_connection(**kwargs):
ti = kwargs['ti']
pwd = ti.xcom_pull(task_ids='SSHTask')
password = str(pwd).replace("\\n","\n")
password = password[password.find(' ')+1 : ]
password = password.strip()
svcpassword = password
db.merge_conn( models.Connection(
conn_id='ssh_conn2', conn_type='SSH',
host='server_name', port='XXXX',login =
'account_name',password = svcpassword))


t2 = PythonOperator(
task_id='Create_Connection',
python_callable=create_new_connection,
provide_context=True,
dag=dag
)


t3 = SSHOperator(
task_id='RemoteCallTest',
command="R command",
ssh_hook = SSHHook().get_conn('ssh_conn2'),
do_xcom_push = False,
retries = 1,
dag=dag
)



t1 >> t2 >> t3

最佳答案

您需要利用 session 包装器来持久化对数据库的更改

@provide_session()
def set_password(session=None):
conn = MyHook().get_conn(conn_id)
conn.set_password(my_password)

session.add(conn)
session.commit()

关于ssh - 如何在代码中动态更新现有 Airflow(1.9 版)连接的参数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51204342/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com