- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在尝试向我的本地 Storm 集群提交一个简单的字数统计拓扑。首先,我尝试使用 maven,然后使用 storm 命令行客户端。我使用 eclipse 创建了 JAR 文件。但是,它抛出主类未找到异常。谁能告诉我可能是什么问题?我在下面附上代码和异常。
package com.test.newpackage;
import com.test.newpackage.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.test.newpackage.WordCounter;
import com.test.newpackage.WordNormalizer;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
// Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(), 2).fieldsGrouping(
"word-normalizer", new Fields("word"));
// Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
// Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf,
builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
package com.test.newpackage;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
/**
* At the end of the spout (when the cluster is shutdown We will show the
* word counters
*/
@Override
public void cleanup() {
System.out.println("-- Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
}
/**
* On each word We will count
*/
@Override
public void execute(Tuple input) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create this, if not We
* will add 1
*/
if (!counters.containsKey(str)) {
counters.put(str, 1);
} else {
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
// Set the tuple as Acknowledge
collector.ack(input);
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
package com.test.newpackage;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt {
private OutputCollector collector;
public void cleanup() {
}
/**
* The bolt will receive the line from the words file and process it to
* Normalize this line
*
* The normalize will be put the words in lower case and split the line to
* get all words in this
*/
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
word = word.toLowerCase();
// Emit the word
List a = new ArrayList();
a.add(input);
collector.emit(a, new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
package com.test.newpackage;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader implements IRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;
public boolean isDistributed() {
return false;
}
public void ack(Object msgId) {
System.out.println("OK:" + msgId);
}
public void close() {
}
public void fail(Object msgId) {
System.out.println("FAIL:" + msgId);
}
/**
* The only thing that the methods will do It is emit each file line
*/
public void nextTuple() {
/**
* The nextuple it is called forever, so if we have been readed the file
* we will wait and then return
*/
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Do nothing
}
return;
}
String str;
// Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try {
// Read all lines
while ((str = reader.readLine()) != null) {
/**
* By each line emmit a new value with the line as a their
*/
this.collector.emit(new Values(str), str);
}
} catch (Exception e) {
throw new RuntimeException("Error reading tuple", e);
} finally {
completed = true;
}
}
/**
* We will create the file and get the collector object
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file["
+ conf.get("wordFile") + "]");
}
this.collector = collector;
}
/**
* Declare the output field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="resources/
words.txt"
storm jar TopologyMain.jar TopologyMain wordcount
Exception in thread "main" java.lang.NoClassDefFoundError: TopologyMain
Caused by: java.lang.ClassNotFoundException: TopologyMain
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
Could not find the main class: TopologyMain. Program will exit.
编辑异常:
backtype.storm.daemon.nimbus - Activating word-count: word-count-1-1374756437
1724 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread Thread[main,5,main] died
java.lang.NullPointerException
at clojure.lang.Numbers.ops(Numbers.java:942)
at clojure.lang.Numbers.isPos(Numbers.java:94)
at clojure.core$take$fn__4112.invoke(core.clj:2500)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$concat$fn__3804.invoke(core.clj:662)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$map$fn__4091.invoke(core.clj:2437)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
at clojure.core$reduce.invoke(core.clj:6030)
at clojure.core$into.invoke(core.clj:6077)
at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
at clojure.core$reduce.invoke(core.clj:6030)
at clojure.core$into.invoke(core.clj:6077)
at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
at clojure.lang.RestFn.invoke(RestFn.java:410)
at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto__$reify__3603.submitTopology(nimbus.clj:898)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:616)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
at backtype.storm.testing$submit_local_topology.invoke(testing.clj:227)
at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:19)
at backtype.storm.LocalCluster.submitTopology(Unknown Source)
at storm.starter.WordCountTopology.main(WordCountTopology.java:83)
11769 [Thread-2] ERROR backtype.storm.daemon.nimbus - Error when processing event
java.lang.NullPointerException
at clojure.lang.Numbers.ops(Numbers.java:942)
at clojure.lang.Numbers.isPos(Numbers.java:94)
at clojure.core$take$fn__4112.invoke(core.clj:2500)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$concat$fn__3804.invoke(core.clj:662)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$concat$cat__3806$fn__3807.invoke(core.clj:671)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core$map$fn__4091.invoke(core.clj:2437)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
at clojure.core$reduce.invoke(core.clj:6030)
at clojure.core$into.invoke(core.clj:6077)
at backtype.storm.daemon.common$storm_task_info.invoke(common.clj:245)
at backtype.storm.daemon.nimbus$compute_executors.invoke(nimbus.clj:408)
at backtype.storm.daemon.nimbus$compute_executor__GT_component.invoke(nimbus.clj:420)
at backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:315)
at backtype.storm.daemon.nimbus$mk_assignments$iter__3398__3402$fn__3403.invoke(nimbus.clj:626)
at clojure.lang.LazySeq.sval(LazySeq.java:42)
at clojure.lang.LazySeq.seq(LazySeq.java:60)
at clojure.lang.RT.seq(RT.java:473)
at clojure.core$seq.invoke(core.clj:133)
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
at clojure.core$reduce.invoke(core.clj:6030)
at clojure.core$into.invoke(core.clj:6077)
at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:625)
at clojure.lang.RestFn.invoke(RestFn.java:410)
at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596$fn__3597.invoke(nimbus.clj:860)
at backtype.storm.daemon.nimbus$fn__3590$exec_fn__1207__auto____3591$fn__3596.invoke(nimbus.clj:859)
at backtype.storm.timer$schedule_recurring$this__1753.invoke(timer.clj:69)
at backtype.storm.timer$mk_timer$fn__1736$fn__1737.invoke(timer.clj:33)
at backtype.storm.timer$mk_timer$fn__1736.invoke(timer.clj:26)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:679)
11774 [Thread-2] INFO backtype.storm.util - Halting process: ("Error when processing an event")
最佳答案
Naresh,根据我所见,您的问题的解决方案可能在于您使用的类名。这是您指定的命令行参数:
storm jar TopologyMain.jar TopologyMain wordcount
您仅使用类名而不是完全限定名将您的主类称为“TopologyMain”。这是我对您的 Storm 命令行尝试的修改:
storm jar TopologyMain.jar com.test.newpackage.TopologyMain
我使用完整的包名而不是单独的类。请注意,我还删除了对“wordcount”的引用,因为我不知道它在那里做什么(在您的代码示例中没有名为“wordcount”的类、方法或变量)。
这是一篇关于 Google 群组的优秀文章,其中涵盖了 Storm 的早期设置问题: early setup question
当我开始使用 Storm 的前几周,我发现自己在使用这篇文章。
如果这能解决您的问题,请告诉我们。
关于java.lang.ClassNotFoundException : TopologyMain 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17724937/
我正在编写一个具有以下签名的 Java 方法。 void Logger(Method method, Object[] args); 如果一个方法(例如 ABC() )调用此方法 Logger,它应该
我是 Java 新手。 我的问题是我的 Java 程序找不到我试图用作的图像文件一个 JButton。 (目前这段代码什么也没做,因为我只是得到了想要的外观第一的)。这是我的主课 代码: packag
好的,今天我在接受采访,我已经编写 Java 代码多年了。采访中说“Java 垃圾收集是一个棘手的问题,我有几个 friend 一直在努力弄清楚。你在这方面做得怎么样?”。她是想骗我吗?还是我的一生都
我的 friend 给了我一个谜语让我解开。它是这样的: There are 100 people. Each one of them, in his turn, does the following
如果我将使用 Java 5 代码的应用程序编译成字节码,生成的 .class 文件是否能够在 Java 1.4 下运行? 如果后者可以工作并且我正在尝试在我的 Java 1.4 应用程序中使用 Jav
有关于why Java doesn't support unsigned types的问题以及一些关于处理无符号类型的问题。我做了一些搜索,似乎 Scala 也不支持无符号数据类型。限制是Java和S
我只是想知道在一个 java 版本中生成的字节码是否可以在其他 java 版本上运行 最佳答案 通常,字节码无需修改即可在 较新 版本的 Java 上运行。它不会在旧版本上运行,除非您使用特殊参数 (
我有一个关于在命令提示符下执行 java 程序的基本问题。 在某些机器上我们需要指定 -cp 。 (类路径)同时执行java程序 (test为java文件名与.class文件存在于同一目录下) jav
我已经阅读 StackOverflow 有一段时间了,现在我才鼓起勇气提出问题。我今年 20 岁,目前在我的家乡(罗马尼亚克卢日-纳波卡)就读 IT 大学。足以介绍:D。 基本上,我有一家提供簿记应用
我有 public JSONObject parseXML(String xml) { JSONObject jsonObject = XML.toJSONObject(xml); r
我已经在 Java 中实现了带有动态类型的简单解释语言。不幸的是我遇到了以下问题。测试时如下代码: def main() { def ks = Map[[1, 2]].keySet()
一直提示输入 1 到 10 的数字 - 结果应将 st、rd、th 和 nd 添加到数字中。编写一个程序,提示用户输入 1 到 10 之间的任意整数,然后以序数形式显示该整数并附加后缀。 public
我有这个 DownloadFile.java 并按预期下载该文件: import java.io.*; import java.net.URL; public class DownloadFile {
我想在 GUI 上添加延迟。我放置了 2 个 for 循环,然后重新绘制了一个标签,但这 2 个 for 循环一个接一个地执行,并且标签被重新绘制到最后一个。 我能做什么? for(int i=0;
我正在对对象 Student 的列表项进行一些测试,但是我更喜欢在 java 类对象中创建硬编码列表,然后从那里提取数据,而不是连接到数据库并在结果集中选择记录。然而,自从我这样做以来已经很长时间了,
我知道对象创建分为三个部分: 声明 实例化 初始化 classA{} classB extends classA{} classA obj = new classB(1,1); 实例化 它必须使用
我有兴趣使用 GPRS 构建车辆跟踪系统。但是,我有一些问题要问以前做过此操作的人: GPRS 是最好的技术吗?人们意识到任何问题吗? 我计划使用 Java/Java EE - 有更好的技术吗? 如果
我可以通过递归方法反转数组,例如:数组={1,2,3,4,5} 数组结果={5,4,3,2,1}但我的结果是相同的数组,我不知道为什么,请帮助我。 public class Recursion { p
有这样的标准方式吗? 包括 Java源代码-测试代码- Ant 或 Maven联合单元持续集成(可能是巡航控制)ClearCase 版本控制工具部署到应用服务器 最后我希望有一个自动构建和集成环境。
我什至不知道这是否可能,我非常怀疑它是否可能,但如果可以,您能告诉我怎么做吗?我只是想知道如何从打印机打印一些文本。 有什么想法吗? 最佳答案 这里有更简单的事情。 import javax.swin
我是一名优秀的程序员,十分优秀!