- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试运行保证消息处理的 WordCount 示例。
只有一个喷口
和两个 bolt
SplitSentence - 在单词中拆分句子并使用锚定发出
WordCount - 打印字数。
我想用下面的代码实现的是,当所有单词都算作一个句子时。必须承认与该句子对应的 Spout。
我最后确认了 _collector.ack(tuple) 仅 WordCount。我看到奇怪的是尽管 ack() 在 WordCount.execute() 被调用,相应的 WSpout.ack() 没有得到叫。它总是在默认超时后失败。
我真的不明白代码有什么问题。请帮我理解这个问题。任何帮助表示赞赏。
下面是完整的代码。
public class TestTopology {
public static class WSpout implements IRichSpout {
SpoutOutputCollector _collector;
Integer msgID = 0;
@Override
public void nextTuple() {
Random _rand = new Random();
String[] sentences = new String[] { "There two things benefit",
" from Storms reliability capabilities",
"Specifying a link in the",
" tuple tree is " + "called anchoring",
" Anchoring is done at ",
"the same time you emit a " + "new tuple" };
String message = sentences[_rand.nextInt(sentences.length)];
_collector.emit(new Values(message), msgID);
System.out.println(msgID + " " + message);
msgID++;
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
System.out.println("open");
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("LINE"));
}
@Override
public void ack(Object msgID) {
System.out.println("ack ------------------- " + msgID);
}
@Override
public void fail(Object msgID) {
System.out.println("fail ----------------- " + msgID);
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void close() {
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
public static class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
System.out.println(word);
_collector.emit(tuple, new Values(word));
}
//_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("WordCount MSGID : " + tuple.getMessageId());
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
System.out.println(word + " ===> " + count);
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WSpout(), 2);
builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
"spout");
builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
最佳答案
WordCount 扩展了 BaseBasicBolt,它确保元组在那个 bolt 中自动确认,就像您在评论中所说的那样。但是,SplitSentence 扩展了 BaseRichBolt,这需要您手动确认元组。你没有确认,所以元组超时。
关于java - 带保证消息处理的 WordCount,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23114487/
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
有没有办法保证您的系统托盘图标被删除? 添加系统托盘图标: Shell_NotifyIcon(NIM_ADD, &m_tnd); 删除系统托盘图标: Shell_NotifyIcon(NIM_DELE
是否保证(-x) % m,其中x和m在c++中为正standard (c++0x) 为负数,等于 -(x % m)? 我知道它在我知道的所有机器上都是正确的。 最佳答案 除了Luchian的回答,这是
可能还有其他方法可以作为示例,但这不是我要问的重点。 我正在这样做: (future (clojure.java.shell/sh "sleep" "3" :dir "/tmp")) 启动对Shell
可以使用 XREAD(或者可能是另一个命令)以原子方式检测数据是否写入 Redis 流? 进一步来说: 假设您在一个进程中将一些数据添加到 Redis 流中,并看到数据已通过某个自动生成的 key 成
Kotlin 协程是否提供任何“发生之前”保证? 例如,在这种情况下,写入 mutableVar 和随后在(可能)其他线程上读取之间是否存在“发生之前”保证: suspend fun doSometh
我正在开发一个跟踪行程的应用程序。在搜索了这件事之后,我得出结论,实现这一点(持续跟踪用户的位置)的最好方法是使用前台服务。在某些情况下工作得很好,但在其他一些情况下(即使关闭 DOZE),我得到一些
我正在使用 ORM (sqlalchemy) 从 PG 数据库中获取数据。我想避免在我手工编写的 SQL 语句中指定所有表列名称*。 到目前为止,我的假设是返回的列按照用于创建数据库表的 DDL 语句
在 setState 的文档中这样说: setState() does not immediately mutate this.state but creates a pending state tr
我有一个与不同硬件接口(interface)的简单应用程序。对于每个硬件,我针对一个独特的监视器函数生成了一个 pthread_t,总共有 6 个线程:1 个管理线程和 5 个工作线程。 每个线程都有
目前,我有 private ThreadLocal shortDateFormat = new ThreadLocal() { @Override protected DateFormat i
我有一个使用 SolrCloud 将文档写入 Solr 的 Java 作业。输入数据被转换为不同实体的映射,然后将每个实体写入与其实体类型对应的 Solr 集合。 我的代码如下: public voi
我们使用嵌入式设备通过串行到以太网转换器将数据包从串行端口发送到服务器。我们使用的一家制造商 Moxa 将始终以与构建它们相同的方式发送数据包。意思是,如果我们构建一个大小为 255 的数据包,它将始
我是从 C++ 转到 Java 的。在 C++ 世界中,我们关注异常安全,并注意到变元器可以在变元器本身或其委托(delegate)的方法抛出异常时提供不同的保证(最小、强、不抛出)。实现具有强异常保
我想将来自 SAAJ 的 SOAPConnectionFactory 和 MessageFactory 类与多个线程一起使用,但事实证明我不能假设它们是线程安全的。一些相关的帖子: javax.xml
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 5 年前。 Improve
关于正确性,我找不到以下代码片段没有设计缺陷的证据/反证据。 template class MyDirtyPool { public: template std::size_t ad
对于这个问题,我找到了不同的答案,我知道一定有一个确定的答案。 C 中四种主要数据类型的最小分配内存大小是多少? int , double , float , 和 char是我在想什么。做 signe
我正在使用 Kafka Producer,我的应用程序将具有相同键的各个 ProducerRecords 发送到单个分区中,然后这些 ProducerRecords 在发送到代理之前进行批处理(使用
您好,我是服务器端编程 (java) 的新手,正在阅读 SendRedirect 与 Forward 之间的区别。来自 Post-redirect-get pattern它解释说这将阻止通过点击刷新按
我是一名优秀的程序员,十分优秀!