
python - How do I provide an async function as the python_callable of a PythonOperator in Airflow?

Reposted. Author: 行者123. Updated: 2023-12-02 02:46:45

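I have tasks to run in a pipeline, and most of them are asynchronous. I am trying to run the pipeline with Airflow, but it fails with the error "TypeError: can't pickle coroutine objects".

Because the functions are async, I tried wrapping them in "asyncio.run", but it still does not work.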

import asyncio
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

class Top:
    async def process(self, input_data):
        return [rawstr for rawstr in input_data]

class Bottom:
    async def process(self, input_data):
        return [len(x) for x in input_data]

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 25),
    'retries': 1,
    'provide_context': True,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('sof_dag', default_args=default_args, schedule_interval=timedelta(days=1))

async def top_1(x, **kwargs):
    return asyncio.run(Top().process(x))

async def bottom_1(**kwargs):
    ti = kwargs['ti']
    y = ti.xcom_pull(key=None, task_ids='Router_1')
    return asyncio.run((Bottom().process(y)))

t1 = PythonOperator(
    task_id='task_top_1',
    python_callable=top_1,
    op_args=[["wow! this is great", "this is not how I thought"]],
    dag=dag)

t2 = PythonOperator(
    task_id='task_bottom_1',
    python_callable=bottom_1,
    dag=dag)

t1 >> t2

This is only a dummy scenario, meant to show how async is used in almost every one of my tasks. Here is the error traceback:

Traceback (most recent call last):
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 1445, in _run_raw_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 1867, in xcom_push
    execution_date=execution_date or self.execution_date)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/Users/divyanshushekhar/repos/repo_name/venv/lib/python3.7/site-packages/airflow/models/__init__.py", line 4460, in set
    value = pickle.dumps(value)
TypeError: can't pickle coroutine objects
[2019-08-08 18:37:09,630] {__init__.py:1603} INFO - Marking task as UP_FOR_RETRY
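
The traceback shows the failure happening when the task's return value is pickled for XCom. A minimal sketch of that step outside Airflow, assuming a simplified stand-in for top_1 (the try_pickle helper is hypothetical and only for illustration): calling a function declared with "async def" does not run its body, it only builds a coroutine object, and that object cannot be pickled.

```python
import asyncio
import pickle


async def top_1(x):
    # Simplified stand-in for the question's task callable: declared with `async def`.
    return [rawstr for rawstr in x]


def try_pickle(value):
    """Hypothetical helper: return the error message if `value` cannot be pickled, else None."""
    try:
        pickle.dumps(value)
    except TypeError as exc:
        return str(exc)
    return None


# Calling an async function only creates a coroutine object; nothing has run yet.
result = top_1(["wow! this is great"])
is_coroutine = asyncio.iscoroutine(result)
pickle_error = try_pickle(result)
result.close()  # close the unused coroutine to avoid a "never awaited" warning

print(is_coroutine)
print(pickle_error)
```

The exact wording of the message varies between Python versions, but it is a TypeError in each case, which matches the last frame of the traceback.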

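Best answer

You can work around this by passing Airflow a regular (non-async) function that calls asyncio.run() itself, so that the task returns the result rather than a coroutine object.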

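import asyncio
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path


async def request_data(**kwargs):
    # Maybe loop and create tasks here, then hand them off to other async functions.
    # `session` is assumed to be an aiohttp-style client session created elsewhere;
    # the original answer shows neither it nor the request URL.
    async with session.get() as resp:
        # With aiohttp, text() is a coroutine method, so it is called and awaited.
        data = await resp.text()
    return data


def task_callable(**kwargs):
    # Plain function: Airflow receives the finished result, not a coroutine object.
    cred = kwargs['params']['credentials']
    return asyncio.run(request_data(session_cred=cred))


default_args = {
    'owner': 'yourname',
    # A start_date is required; this value is borrowed from the question.
    'start_date': datetime(2019, 7, 25),
}

# `get_cred` and `key_file` are placeholders from the original answer, defined elsewhere.
with DAG('my_dag', default_args=default_args, schedule_interval='@daily') as dag:
    task = PythonOperator(
        task_id='async_task',
        python_callable=task_callable,
        provide_context=True,  # on Airflow 1.x this passes the context into **kwargs
        params={
            'credentials': get_cred(key_file),
        },
    )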
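
The same idea applied to the Top and Bottom classes from the question can be sketched without Airflow. This is an illustration under stated assumptions rather than the answerer's code: the wrappers are plain "def" functions, and bottom_1 takes its input as an argument `y` instead of pulling it from XCom so that the snippet runs on its own.

```python
import asyncio
import pickle


class Top:
    async def process(self, input_data):
        return [rawstr for rawstr in input_data]


class Bottom:
    async def process(self, input_data):
        return [len(x) for x in input_data]


def top_1(x, **kwargs):
    # Regular function: asyncio.run() drives the coroutine to completion
    # and returns its result, an ordinary list.
    return asyncio.run(Top().process(x))


def bottom_1(y, **kwargs):
    # In the DAG, `y` would come from ti.xcom_pull(...); here it is passed in directly.
    return asyncio.run(Bottom().process(y))


top_result = top_1(["wow! this is great", "this is not how I thought"])
bottom_result = bottom_1(top_result)

# Both results are plain lists, so pickling them (as the XCom push does) succeeds.
pickle.dumps(top_result)
pickle.dumps(bottom_result)
print(bottom_result)
```

Because each wrapper returns an ordinary list, these functions could be passed as python_callable in place of the "async def" versions in the question.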

Regarding "python - How do I provide an async function as the python_callable of a PythonOperator in Airflow?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57423101/
