
python - How to trigger a Glue job using the AWS Glue Operator

Reposted. Author: 行者123. Updated: 2023-12-05 04:57:42

My Airflow script has a single task, which triggers a Glue job. I am able to create the DAG. Below is my DAG code.

from airflow import DAG
from airflow.operators.email_operator import EmailOperator
from airflow.providers.amazon.aws.operators.glue import AwsGlueJobOperator
from datetime import datetime, timedelta


### glue job specific variables
glue_job_name = "my_glue_job"
glue_iam_role = "AWSGlueServiceRole"
region_name = "us-west-2"
email_recipient = "me@gmail.com"

default_args = {
    'owner': 'me',
    'start_date': datetime(2020, 1, 1),
    'retry_delay': timedelta(minutes=5),
    'email': email_recipient,
    'email_on_failure': True
}


with DAG(dag_id = 'glue_af_pipeline', default_args = default_args, schedule_interval = None) as dag:

    glue_job_step = AwsGlueJobOperator(
        job_name = glue_job_name,
        script_location = 's3://my-s3-location',
        region_name = region_name,
        iam_role_name = glue_iam_role,
        script_args = None,
        num_of_dpus = 10,
        task_id = 'glue_job_step',
        dag = dag
    )

    glue_job_step

When I run the DAG, it fails with the following error:

[2020-10-13 08:27:14,315] {logging_mixin.py:112} INFO - [2020-10-13 08:27:14,315] {glue.py:114} ERROR - Failed to run aws glue job, error: Parameter validation failed:
Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
[2020-10-13 08:27:14,315] {taskinstance.py:1058} ERROR - Parameter validation failed:
Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/operators/glue.py", line 115, in execute
    glue_job_run = glue_job.initialize_job(self.script_args)
  File "/usr/local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 111, in initialize_job
    job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 337, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 628, in _make_api_call
    request_dict = self._convert_to_request_dict(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 676, in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
  File "/usr/local/lib/python3.8/site-packages/botocore/validate.py", line 297, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Arguments, value: [], type: <class 'list'>, valid types: <class 'dict'>
[2020-10-13 08:27:14,316] {taskinstance.py:1089} INFO - Marking task as FAILED.

Any suggestions are appreciated.

Best answer

If you are running an existing Glue job, try this:

glue_job_step = AwsGlueJobOperator(
    task_id = "glue_job_step",
    job_name = glue_job_name,
    job_desc = f"triggering glue job {glue_job_name}",
    region_name = region_name,
    iam_role_name = glue_iam_role,
    num_of_dpus = 1,
    dag = dag
)

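If the job takes no input arguments, remove script_args. When arguments are needed, pass them as a dict: the traceback shows that the Arguments parameter of start_job_run accepts only a dict, and an empty list was rejected.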
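The type requirement reported in the traceback can be checked before the operator is ever built. The following is only a minimal sketch: `normalize_script_args` is a hypothetical helper, not part of Airflow or boto3, and the `--input_path` key and its S3 value are placeholders. It turns a missing value into an empty dict and rejects anything that is not a mapping, such as the empty list seen in the error.

```python
from typing import Mapping, Optional


def normalize_script_args(script_args: Optional[Mapping[str, str]]) -> dict:
    """Return a dict suitable for the Arguments of a Glue job run.

    Hypothetical helper for illustration; not part of Airflow or boto3.
    """
    # No arguments supplied: use an empty dict rather than None or a list.
    if script_args is None:
        return {}
    # A list (even an empty one) is the wrong type, so fail early with a clear message.
    if not isinstance(script_args, Mapping):
        raise TypeError(
            f"script_args must be a dict, got {type(script_args).__name__}"
        )
    # Coerce keys and values to strings so the result is a plain str-to-str dict.
    return {str(key): str(value) for key, value in script_args.items()}


# Placeholder argument name and value, assumed for this example only.
print(normalize_script_args(None))
print(normalize_script_args({"--input_path": "s3://my-s3-location/input"}))
```

The result could then be passed as script_args when constructing the operator, so a wrong type surfaces when the DAG file is parsed instead of when the task runs.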

Regarding "python - How to trigger a Glue job using the AWS Glue Operator", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64350707/
