gpt4 book ai didi

callback - Airflow http回调传感器

转载 作者:行者123 更新时间:2023-12-03 22:55:29 27 4
gpt4 key购买 nike

我们的 Airflow 实现发送 http 请求以获取服务来执行任务。我们希望这些服务在完成任务时让 Airflow 知道,因此我们向服务发送回调 url,当他们的任务完成时,他们将调用该服务。但是,我似乎找不到回调传感器。人们通常如何处理这种情况?

最佳答案

Airflow 中没有回调或 webhook 传感器之类的东西。传感器定义如下,取自文档:

Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.



这意味着传感器是执行 的操作符。投票外部系统的行为。从这个意义上说,您的外部服务应该有一种方法来保持每个执行任务的状态 - 无论是内部还是外部 - 以便轮询传感器可以检查该状态。

这样你就可以使用例如 airflow.operators.HttpSensor轮询 HTTP 端点,直到满足条件。或者更好的是,编写您自己的自定义传感器,让您有机会进行更复杂的处理并保持状态。

否则,如果服务在存储系统中输出数据,您可以使用例如轮询数据库的传感器。我相信你明白了。

我附上了一个我为与 Apache Livy API 集成而编写的自定义运算符示例。传感器做两件事:a) 通过 REST API 提交 Spark 作业,b) 等待作业完成。

运算符扩展 SimpleHttpOperator 同时实现了 HttpSensor 从而结合了这两种功能。
class LivyBatchOperator(SimpleHttpOperator):
"""
Submits a new Spark batch job through
the Apache Livy REST API.

"""

template_fields = ('args',)
ui_color = '#f4a460'

@apply_defaults
def __init__(self,
name,
className,
file,
executorMemory='1g',
driverMemory='512m',
driverCores=1,
executorCores=1,
numExecutors=1,
args=[],
conf={},
timeout=120,
http_conn_id='apache_livy',
*arguments, **kwargs):
"""
If xcom_push is True, response of an HTTP request will also
be pushed to an XCom.
"""
super(LivyBatchOperator, self).__init__(
endpoint='batches', *arguments, **kwargs)

self.http_conn_id = http_conn_id
self.method = 'POST'
self.endpoint = 'batches'
self.name = name
self.className = className
self.file = file
self.executorMemory = executorMemory
self.driverMemory = driverMemory
self.driverCores = driverCores
self.executorCores = executorCores
self.numExecutors = numExecutors
self.args = args
self.conf = conf
self.timeout = timeout
self.poke_interval = 10

def execute(self, context):
"""
Executes the task
"""

payload = {
"name": self.name,
"className": self.className,
"executorMemory": self.executorMemory,
"driverMemory": self.driverMemory,
"driverCores": self.driverCores,
"executorCores": self.executorCores,
"numExecutors": self.numExecutors,
"file": self.file,
"args": self.args,
"conf": self.conf
}
print payload
headers = {
'X-Requested-By': 'airflow',
'Content-Type': 'application/json'
}

http = HttpHook(self.method, http_conn_id=self.http_conn_id)

self.log.info("Submitting batch through Apache Livy API")

response = http.run(self.endpoint,
json.dumps(payload),
headers,
self.extra_options)

# parse the JSON response
obj = json.loads(response.content)

# get the new batch Id
self.batch_id = obj['id']

log.info('Batch successfully submitted with Id %s', self.batch_id)

# start polling the batch status
started_at = datetime.utcnow()
while not self.poke(context):
if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
raise AirflowSensorTimeout('Snap. Time is OUT.')

sleep(self.poke_interval)

self.log.info("Batch %s has finished", self.batch_id)

def poke(self, context):
'''
Function that the sensors defined while deriving this class should
override.
'''

http = HttpHook(method='GET', http_conn_id=self.http_conn_id)

self.log.info("Calling Apache Livy API to get batch status")

# call the API endpoint
endpoint = 'batches/' + str(self.batch_id)
response = http.run(endpoint)

# parse the JSON response
obj = json.loads(response.content)

# get the current state of the batch
state = obj['state']

# check the batch state
if (state == 'starting') or (state == 'running'):
# if state is 'starting' or 'running'
# signal a new polling cycle
self.log.info('Batch %s has not finished yet (%s)',
self.batch_id, state)
return False
elif state == 'success':
# if state is 'success' exit
return True
else:
# for all other states
# raise an exception and
# terminate the task
raise AirflowException(
'Batch ' + str(self.batch_id) + ' failed (' + state + ')')

希望这会对你有所帮助。

关于callback - Airflow http回调传感器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51566029/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com