gpt4 book ai didi

Airflow worker 卡住 : Task is in the 'running' state which is not a valid state for execution. 必须清除任务才能运行

转载 作者:行者123 更新时间:2023-12-01 01:41:06 26 4
gpt4 key购买 nike

Airflow 任务在没有任何问题的情况下运行,突然卡住一半,任务实例详细信息在消息上方显示。

我清除了整个数据库,但仍然遇到相同的错误。

事实是我只因为一些 dag 就遇到了这个问题。主要是在长时间运行的作业时。

我得到以下错误

[2019-07-03 12:14:56,337] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-07-03 12:14:56,341] {{models.py:1353}} INFO - Dependencies not met for <TaskInstance: XXXXXX.index_to_es 2019-07-01T13:30:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2019-07-03 05:58:51.601552+00:00.
[2019-07-03 12:14:56,342] {{logging_mixin.py:95}} INFO - [2019-07-03 12:14:56,342] {{jobs.py:2514}} INFO - Task is not able to be run

我的 dag 如下所示
default_args = {
'owner': 'datascience',
'depends_on_past': True,
'start_date': datetime(2019, 6, 12),
'email': ['datascience@mycompany.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
# 'queue': 'nill',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
def get_index_date(**kwargs):
tomorrow=kwargs.get('templates_dict').get('tomorrow')
return str(tomorrow).replace('-','.')

"""
Create Dags specify its features
"""
dag = DAG(
DAG_NAME,
schedule_interval="0 9 * * *",
catchup=True,
default_args=default_args,
template_searchpath='/efs/sql')

create_table = BigQueryOperator(
dag=dag,
task_id='create_temp_table_from_query',
sql='daily_demand.sql',
use_legacy_sql=False,
destination_dataset_table=TEMP_TABLE,
bigquery_conn_id=CONNECTION_ID,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE'
)

"""Task to zip and export to GCS"""
export_to_storage = BigQueryToCloudStorageOperator(
task_id='export_to_GCS',
source_project_dataset_table=TEMP_TABLE,
destination_cloud_storage_uris=[CLOUD_STORAGE_URI],
export_format='NEWLINE_DELIMITED_JSON',
compression='GZIP',
bigquery_conn_id=CONNECTION_ID,
dag=dag)
"""Task to get the tomorrow execution date formatted for indexing"""
get_index_date = PythonOperator(
task_id='get_index_date',
python_callable=get_index_date,
templates_dict={'tomorrow':"{{ tomorrow_ds }}"},
provide_context=True,
dag=dag
)
"""Task to download zipped files and bulkindex to elasticsearch"""
es_indexing = EsDownloadAndIndexOperator(
task_id="index_to_es",
object=OBJECT,
es_url=ES_URI,
local_path=LOCAL_FILE,
gcs_conn_id=CONNECTION_ID,
bucket=GCS_BUCKET_ID,
es_index_type='demand_shopper',
es_bulk_batch=5000,
es_index_name=INDEX,
es_request_timeout=300,
dag=dag)


"""Define the chronology of tasks in DAG"""
create_table >> export_to_storage >> get_index_date >> es_indexing

谢谢你的帮助

最佳答案

我找到了问题所在,这是底层基础架构的问题。我正在使用 AWS EFS,并且突发模式在达到吞吐量时阻塞了工作线程。更改为配置模式,工作人员不再处于卡住状态。
我的想法来自
ecs-airflow-1-10-2-performance-issues-operators-and-tasks-take-10x-longer

关于 Airflow worker 卡住 : Task is in the 'running' state which is not a valid state for execution. 必须清除任务才能运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56878366/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com