- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我已经开始使用 storm,所以我使用 this tutorial 创建了简单的拓扑
当我使用 LocalCluster
运行我的拓扑时,一切看起来都很好,我的问题是我没有在元组上收到 ACK,这意味着我的 spout ack
从未被调用。
我的代码在下面 - 你知道为什么 ack
没有被调用吗?
所以我的拓扑结构是这样的
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
我的 Spout 是这样的
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}
@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}
我的 bolt 看起来像这样
@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}
public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
最佳答案
spout 中的 emit() 方法只有一个参数,因此该元组未被锚定。这就是为什么即使您在 bolt 中确认元组,也不会在 spout 中收到对 ack() 方法的回调。
要使其正常工作,您需要修改 spout 以发出第二个参数,即消息 ID。正是这个 id 传回了 spout 中的 ack() 方法:
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
Object msgId = "ID 6"; // this can be any object
this.collector.emit(new Values(6), msgId);
flag = true;
}
}
@Override
public void ack(Object msgId) {
// msgId should be "ID 6"
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
关于java - Storm Spout 没有得到 Ack,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21260291/
使用普通的 grep 命令,有一个 --exclude 选项(详细介绍如下: Use grep --exclude/--include syntax to not grep through certa
我有 ubuntu 14.04。并使用 ack-grep 2.12 在目录内递归搜索文件中的文本 我想设置 ack-grep 选项 特定于目录 ~/workspace/project/ 这样当我在路径
我们不理解这种 TCP 行为表明 redhat linux 5 TCP 堆栈(HTTP 服务器,这是此转储的来源)收到 SYN 的 ACK,ACK 但继续忽略它并重复重复的 SYN,ACK 5次。最后
我不确定它的用途,但我正在处理的代码有一堆名为“save.d”的文件夹,看起来它们用于某种版本控制(我们也有 . svn 文件夹)。 如何更新我的 .ackrc 文件以默认忽略这些目录? 我的 .ac
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎与 help center 中定义的范围内的编程无关。 . 关闭去年。 Improve th
我已经针对运行在 Ubuntu 14.04.3 LTS (3.13.0-71-generic x86_64) 上的 OpenSSL 1.0.2e 编译了 nginx 1.9.7。我可以毫无问题地使用浏
我有添加新的 ack-grep 文件类型的语法: $ ack-grep --type-add=min=.min.js,.min.css --help types | grep min --[n
我想在任何名为 search-this-one.html 的文件中搜索字符串“my-search-string” 在我当前工作目录的所有子目录中。 我想要包含此字符串的文件的完整路径列表。有数百个名为
我正在使用 ack.pl 工具来搜索文件中的字符串或 IP ack.pl 的官方网站是 - http://beyondgrep.com/documentation/ ack.pl CLI 示例(想在/
如果您正在编写一个基本的 python TCP 服务器和客户端,您需要自己添加 SYN、SYN ACK 和 ACK 响应,还是由套接字模块处理? 如果需要自己写,会这么简单吗? 客户: #set up
[FIN, ACK]、[RST]和[RST, ACK]是什么原因,如何避免? 是否是由于 SO 的 TCP 参数之间存在某种不匹配?服务器在 TCP/IP 连接中回复 [FIN, ACK] 是什么意思
我使用 Vim 插件 ack.vim ,但我不明白为什么命令是带有大写“A”的 :Ack (按住 shift 有点烦人)。 是否可以将其重新映射到 :ack ? 最佳答案 内置命令以小写字符开头,自定
我有一个文件导致 Ack 2.0 报告“权限被拒绝”错误,所以我一直试图让它忽略该文件。 我已经在我的 ackrc 中尝试了以下内容 --type-set=lockfile:match:/NameOf
我有一个文件导致 Ack 2.0 报告“权限被拒绝”错误,所以我一直试图让它忽略该文件。 我已经在我的 ackrc 中尝试了以下内容 --type-set=lockfile:match:/NameOf
我尝试了可能的组合并在互联网上进行了搜索,但无法完成这项工作,错误是: Error: Unexpected ACK received for message-id 如果有人能指出以下代码中是否存在一些
在用 Linux C 编写的 tcp 程序中我想关闭一个 tcp 连接我使用了close(sockfd)我注意到此函数将向另一个对等方发起 FIN/ACK 数据包但是如果另一个对等点由于网络问题或 t
我已经构建了一个 c 函数,该函数应该通过 stomp 库从 ActiveMQ 读取。此代码正在读取收据之前的附加消息(这是我所期望的)。请告知为什么我的 RECEIPT 帧没有在第二个 MESSAG
我正在使用这样的 ack:ack-grep assets\\. (在 Ubuntu 上) 但是,我想排除不以“ Assets ”开头的结果。例如:网站 Assets 。 到目前为止,我已经尝试过 ac
如何使用ack搜索特定文件类型的文件。 例如 ack -f .scss blah acl -f .rb blah 显然上述方法不起作用,但是如何使用ack来做到这一点? 最佳答案 为此使用--type
每次我尝试在ack中添加类型似乎都失败了,即在ack中添加类型(log) ack --type-set log:ext:log ack: No regular expression found. 或者
我是一名优秀的程序员,十分优秀!