- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们的 Airflow 实现发送 http 请求以获取服务来执行任务。我们希望这些服务在完成任务时让 Airflow 知道,因此我们向服务发送回调 url,当他们的任务完成时,他们将调用该服务。但是,我似乎找不到回调传感器。人们通常如何处理这种情况?
最佳答案
Airflow 中没有回调或 webhook 传感器之类的东西。传感器定义如下,取自文档:
Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.
class LivyBatchOperator(SimpleHttpOperator):
"""
Submits a new Spark batch job through
the Apache Livy REST API.
"""
template_fields = ('args',)
ui_color = '#f4a460'
@apply_defaults
def __init__(self,
name,
className,
file,
executorMemory='1g',
driverMemory='512m',
driverCores=1,
executorCores=1,
numExecutors=1,
args=[],
conf={},
timeout=120,
http_conn_id='apache_livy',
*arguments, **kwargs):
"""
If xcom_push is True, response of an HTTP request will also
be pushed to an XCom.
"""
super(LivyBatchOperator, self).__init__(
endpoint='batches', *arguments, **kwargs)
self.http_conn_id = http_conn_id
self.method = 'POST'
self.endpoint = 'batches'
self.name = name
self.className = className
self.file = file
self.executorMemory = executorMemory
self.driverMemory = driverMemory
self.driverCores = driverCores
self.executorCores = executorCores
self.numExecutors = numExecutors
self.args = args
self.conf = conf
self.timeout = timeout
self.poke_interval = 10
def execute(self, context):
"""
Executes the task
"""
payload = {
"name": self.name,
"className": self.className,
"executorMemory": self.executorMemory,
"driverMemory": self.driverMemory,
"driverCores": self.driverCores,
"executorCores": self.executorCores,
"numExecutors": self.numExecutors,
"file": self.file,
"args": self.args,
"conf": self.conf
}
print payload
headers = {
'X-Requested-By': 'airflow',
'Content-Type': 'application/json'
}
http = HttpHook(self.method, http_conn_id=self.http_conn_id)
self.log.info("Submitting batch through Apache Livy API")
response = http.run(self.endpoint,
json.dumps(payload),
headers,
self.extra_options)
# parse the JSON response
obj = json.loads(response.content)
# get the new batch Id
self.batch_id = obj['id']
log.info('Batch successfully submitted with Id %s', self.batch_id)
# start polling the batch status
started_at = datetime.utcnow()
while not self.poke(context):
if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
self.log.info("Batch %s has finished", self.batch_id)
def poke(self, context):
'''
Function that the sensors defined while deriving this class should
override.
'''
http = HttpHook(method='GET', http_conn_id=self.http_conn_id)
self.log.info("Calling Apache Livy API to get batch status")
# call the API endpoint
endpoint = 'batches/' + str(self.batch_id)
response = http.run(endpoint)
# parse the JSON response
obj = json.loads(response.content)
# get the current state of the batch
state = obj['state']
# check the batch state
if (state == 'starting') or (state == 'running'):
# if state is 'starting' or 'running'
# signal a new polling cycle
self.log.info('Batch %s has not finished yet (%s)',
self.batch_id, state)
return False
elif state == 'success':
# if state is 'success' exit
return True
else:
# for all other states
# raise an exception and
# terminate the task
raise AirflowException(
'Batch ' + str(self.batch_id) + ' failed (' + state + ')')
关于callback - Airflow http回调传感器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51566029/
在我的上一个项目中,我使用了 rxJava,我意识到 observable.doOnError('onErrorCallback').subscribe(action) 和 observable.su
我是一名 C++ 初学者,我认为要真正学习指针和引用,我应该尝试创建一个回调函数,这是我在 JavaScript 中认为理所当然的事情。 但是,对于我的一生,我不知道为什么这些括号在 (*callba
我在库中有一个类,它具有在事件发生时执行的“onMessage”方法。 OnMessage 在执行时需要调用属于主应用程序中的类的“回调”方法。我假设这将通过构造函数完成,但我不知道它是如何实现的。
两者的 jQuery 文档基本上说明了相同的事情,所以我想知道两者之间是否有任何重大差异(如果有的话)。谢谢! 最佳答案 这方面的文档实际上非常糟糕,所以这是我在 studying the sourc
这个问题在这里已经有了答案: Using &&'s short-circuiting as an if statement? (6 个答案) Omitting the second expressi
我正在尝试在 golang 中定义一个回调: package main func main() { x, y := "old x ", "old y" callback
我有一个页面,其中包含从 Google 电子表格生成的许多图表。 典型代码如下所示: var url = "http://my.googlespreadsheet.com/tq?argumentshe
当我运行 linter 时,它显示: subscribe is deprecated: Use an observer instead of an error callback 代码来自 this a
对于异步套接字 // accept ... listener.BeginAccept( new AsyncCallback(AcceptCallback), listener); // listene
我希望能够根据在前面的函数中调用的是 callback(true) 还是 callback(false) 在回调函数中执行一些逻辑。 示例: foo.doFunction = function (pa
从 jQuery.scrollTo.js 库中看到这个 block (在 v1.4 的第 184 行)。 function animate( callback ){ $elem.animate
我正在尝试在我的应用中使用一些回调,它与 "callback(value)" 和 "callback.invoke(value)" 一起工作正确调用回调。 我想知道“回调(值)”是否只是一个缩短版本,
我决定从 keras 切换到 tf.keras(建议使用 here)。因此我安装了 tf.__version__=2.0.0和 tf.keras.__version__=2.2.4-tf .在我的旧版
我认为这实际上可能会回答我关于 Stack Overflow 的另一个问题如果我能确认这一点。 返回回调和只调用回调有什么区别? 我看到代码执行其中之一/或/两者,并试图思考为什么以及何时执行哪个。
我目前正在学习 Rust 并希望用它来开发 GUI基于 GTK+ 的应用程序。我的问题与注册回调有关在这些回调中响应 GTK 事件/信号和变异状态。我有一个有效但不优雅的解决方案,所以我想问一下是否有
我在回调函数中传递参数时遇到问题。我使用 redux-form,当我更改 SkinList 中的选择时,它会触发 onChange 回调 - activeSkinChange 方法 在activeSk
我有 8 个相互依赖的回调。我的想法是要有一个更具可读性的过程,但我不明白如何处理这个问题。 我的回调 hell 的一个例子是: return new Promise(function (resolv
因此,我的函数接受一个值和任意数量的回调作为参数(我应该使用扩散操作符吗?)该函数应该返回通过所有给定回调传递该值的最终结果。。我返回的“CB2(Res1)”不是一个函数。如何将第一个回调的结果传递给
在谈到 future 和回调时,documentation说是 The Vert.x core APIs are based on callbacks to notify of asynchronou
我开始觉得自己很蠢。我正在关注 Facebook-Connect 演示“The Run Around”。 当我导航到 http://www.[mysite].com/testing/register_
我是一名优秀的程序员,十分优秀!