gpt4 book ai didi

clojure - Storm > 如何将 Java 回调集成到 Spout 中

转载 作者:行者123 更新时间:2023-12-04 20:47:04 25 4
gpt4 key购买 nike

我正在尝试将 Storm ( see here ) 集成到我的项目中。我理解了拓扑、spout 和 bolts 的概念。但现在,我试图弄清楚一些事情的实际实现。

一)我有一个使用 Java 和 Clojure 的多语言环境。我的 Java 代码是一个回调类,其中包含触发流数据的方法。推送到这些方法的事件数据是我想用作 spout 的。

所以第一个问题是如何将进入这些方法的数据连接到一个 spout ?我正在尝试 i) 传递一个 backtype.storm.topology.IRichSpout ,然后 ii) 将 backtype.storm.spout.SpoutOutputCollector ( see here ) 传递给该 spout 的 打开函数(see here)。但我看不到实际传递任何类型的 map 或列表的方法。

B) 我项目的其余部分都是 Clojure。通过这些方法将有大量数据。每个事件的 ID 介于 1 和 100 之间。在 Clojure 中,我希望将来自 spout 的数据拆分到不同的执行线程中。我认为,这些将是 bolt 。

如何设置 Clojure bolt 从 spout 获取事件数据,然后根据传入事件的 ID 中断线程?

提前致谢
蒂姆

[编辑 1]

我实际上已经解决了这个问题。我结束了 1) 实现我自己的 IRichSpout。我然后 2) 将该 spout 的内部元组连接到我的 java 回调类中的传入流数据。我不确定这是否是惯用的。但它编译并运行没有错误。但是, 3) 我没有看到通过 传入的流数据(肯定存在)打印品 bolt 。

为了确保事件数据得到传播,在 spout 或 bolt 实现或拓扑定义中是否需要做一些特定的事情?谢谢。

;;将 Java 回调绑定(bind)到我创建的 Spout
(.setSpout java-callback ibspout)

(storm/defbolt printstuff ["word"] [元组收集器]
(println (str "printstuff --> tuple["tuple"] > collector["collector"]"))
)
( Storm /拓扑
{ "1"(storm/spout-spec ibspout)
}
{ "3"(storm/bolt-spec { "1":shuffle }
打印品
)
})

[编辑 2]

根据 SO 成员 Ankur 的建议,我正在重新调整我的拓扑。创建 Java 回调后,我将它的元组传递给下面的 IBSpout,使用 (.setTuple ibspout (.getTuple java-callback)) .我没有传递整个 Java 回调对象,因为我得到了 NotSerializable 错误。一切都编译并运行没有错误。但同样,我的 没有数据。打印品 bolt 。嗯。

公共(public)类 IBSpout 实现 IRichSpout {

/**
* Storm 喷口的东西
*/
私有(private) SpoutOutputCollector _collector;

私有(private)列表 _tuple = new ArrayList();
public void setTuple(List tuple) { _tuple = tuple; }
公共(public)列表 getTuple() { return _tuple; }

/**
* Storm ISpout 接口(interface)函数
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector 收集器) {
_collector = 收集器;
}
公共(public)无效关闭(){}
公共(public)无效激活(){}
公共(public)无效停用(){}
公共(public)无效 nextTuple() {
_collector.emit(_tuple);
}
公共(public)无效确认(对象 msgId){}
公共(public)无效失败(对象 msgId){}

public void declareOutputFields(OutputFieldsDeclarer 声明者) {}
public java.util.Map getComponentConfiguration() { return new HashMap(); }

}

最佳答案

似乎您正在将 spout 传递给您的回调类,这似乎有点奇怪。当一个拓扑被执行时,storm 会周期性地调用 spouts nextTuple方法,因此您需要做的是将 java 回调传递给您的自定义 spout 实现,以便当storm 调用您的spout 时,spout 调用 java 回调以获取下一组要馈送到拓扑中的元组。

要理解的关键概念是 Spouts 拉动 Storm 请求时的数据,您不要推数据到 spouts。您的回调不能调用 spout 将数据推送给它,而是当您的 spout 的 nextTuple 时,您的 spout 应该提取数据(从一些 java 方法或任何内存缓冲区)方法被调用。

关于clojure - Storm > 如何将 Java 回调集成到 Spout 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15770650/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com