我目前设置了一个拓扑,该拓扑使用 GitHub 上 tomdz 创建的 Esper Bolt。除了元组锚定之外,一切似乎都工作正常。
Esper Bolt 本身有一个回调:
public void update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)
这里的问题是我没有对元组树中的前一个元组的引用。这意味着当我将 Esper 结果发送到序列中的下一个 Bolt 时,我无法提供用于锚定的元组:
collector.emit(new Values(eventName, eventGrouping, eventDescription, correlatedValues));
想知道是否有人在自己的项目中遇到过这个问题?如果是这样,你是如何解决这个问题的?我想使用锚定来确保整个拓扑中的消息可靠性。
一种选择是将 Storm 元组作为属性附加到 Esper 输入事件,并确保 Esper EPL 通过“select *”或“select origintuple”选择该属性。然后听众就可以使用它。
另一个选项是使用您可能拥有的某些事件 ID 在 Esper 之外跟踪此事件。或者通过将输入事件映射到元组的身份 HashMap 来使用输入事件身份。然后,监听器需要根据 Esper 输入事件进行一些查找,该事件与 Esper 也传递给监听器的引用相同,然后删除。
我是一名优秀的程序员,十分优秀!