- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我的数据来自通过 websocket 连接到我的 Storm 集群的传感器,因此每当数据点到达我的 websocket 服务器时,我都会将其添加到 ConcurrentLinkedQueue。我没有关于数据点“生产”频率的先验信息。
我的 spout 获取此队列上的数据点并发出相应的 Tupple。 Everythink 在一段时间内运行良好(我会说大约 1000 个数据点)但随后出现以下错误:
76245 [Thread-23-incDp] ERROR backtype.storm.util - Async loop died!
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76246 [Thread-23-incDp] ERROR backtype.storm.daemon.executor -
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76481 [Thread-23-incDp] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
这是我的 spout 代码:
public class MySpout extends BaseRichSpout
{
private static final long serialVersionUID = 1L;
private AtomicLong messageIdCounter = new AtomicLong();
private static Queue<String> incData;
private SpoutOutputCollector collector;
public MySpout()
{
incData = new ConcurrentLinkedQueue<String>();
}
@Override
public void nextTuple()
{
if(incData.isEmpty())
{
Utils.sleep(500);
}
else
{
String[] splittedMsg = incData.poll().split(" ; ");
JSONParser jsonParser = new JSONParser();
try
{
int ts = Integer.parseInt(splittedMsg[0]);
JSONObject json = (JSONObject) jsonParser.parse(splittedMsg[1]);
collector.emit(new Values( ts, new DataPoint(ts, new ArrayList(json.values()))), messageIdCounter.incrementAndGet() );
}
catch (ParseException e)
{
System.err.println("Wrong input format: should be json");
e.printStackTrace();
}
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time", "dps" ));
}
public static void addElmtToQueue(String json)
{
incData.add(json);
}
我想这是因为队列出了点问题,要么不再是数据点,要么是我的两个线程之间的并发问题(顺便说一句,我知道让它静态化可能不好,但我还没有找到其他解决方案,因为我需要在我的服务器线程上访问它...)。
有人遇到过同样的问题吗?欢迎任何解决方案/评论:)
提前致谢。
最佳答案
MySpout.java 的第 56 行抛出 NullPointerException。
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
我敢打赌这行是问题所在:
String[] splittedMsg = incData.poll().split(" ; ");
由于事实that poll() will return null when your queue is empty .
NullPointerException 导致 spout 崩溃。我会建议两件事:
例子:
@Override
public void nextTuple() {
try {
String message = _queue.poll();
if (message == null) {
// didn't get a message, sleep for a little bit
Utils.sleep(50);
} else {
// do stuff with message
}
} catch (Exception e) {
_collector.reportError(e);
LOG.error("Spout error {}", e);
}
}
关于java - Storm Spout 错误 backtype.storm.util - 异步循环终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28899824/
我在这个网站上发布的代码有这个问题 https://developers.google.com/drive/quickstart-cs是 Google Drive 快速入门的开发人员站点。我按照网站上
我正在尝试制作一个非常简单的 Kafka Producer,目前正在关注 producer example除了我的制作人没有分区程序类。 将所需文件导出到 jar 后,我将它们传输到我的 Linux
问题 在java中,我有一个“Util项目”,在进行单元测试时使用另一个“Mock项目”。 我的问题是“模拟项目”也使用“Util项目”来构建一些模拟对象。 当我使用 Maven 构建项目时,我无法构
据我所知,这些包已经存在很长时间了。但是,我从未见过它们的实际用法。而且这些包似乎不成熟,不再维护。如果是,为什么这些包现在存在? 最佳答案 包裹automata被 scala.xml.dtd 使用,
关闭。这个问题需要debugging details .它目前不接受答案。 想改进这个问题?将问题更新为 on-topic对于堆栈溢出。 1年前关闭。 Improve this question Co
在java.util.Collections中,有一个方法: public static void fill(List list, T obj) 用第二个参数指定的对象填充第一个参数指定的List。
我不明白它要我做什么。分配给 sentence正在工作: val sentences : java.util.List[CoreMap] = document.get(classOf[Sentence
在我的 React 应用程序中,我想使用一些实用程序。我见过两种不同的方法。第一个是,只是创建函数并将其导出。第二个是,创建一个 Util 类并导出一个对象,这样它就不能被实例化(静态类)。 clas
我有一个 util 类,它接受 String jwtToken 和 Key key 并使用 io.jsonwebtoken.jwts 解码 jwt。 但是,我无法对此进行测试。原因是,我无法模拟公钥并
我有使用目标命名空间的专有架构 xmlns:ax216="http://util.java/xsd" 这给我带来了从 java (java.util.xsd) 开始生成禁止的(由 Java 安全管理器
我正在阅读集合以查看 Javadocs 中的实现层次结构。 Collections声明为public class Collections extendds Object Collection声明为pu
我正在使用 Spring-boot 应用程序,我可以在其中连接 Azure 应用程序配置。但是当我尝试使用内容类型应用程序/JSON 读取值时出现错误。 我的Java类 @ConfigurationP
我正在使用 Spring-boot 应用程序,我可以在其中连接 Azure 应用程序配置。但是当我尝试使用内容类型应用程序/JSON 读取值时出现错误。 我的Java类 @ConfigurationP
我在使用格式说明符时遇到问题。这是否意味着我正在使用 %d? public static void main(String[] args) { double y, x; for (x =
鉴于此代码 import java.util.Iterator; private static List someList = new ArrayList(); public static void
我正在 HackerEarth 解决问题,我无法弄清楚为什么我的程序在命令行上正确运行并给出正确的结果,但在代码编辑器上运行时却给出 java.util.NoSuchElementException
我正在尝试使用以下代码使用对象列表列表中的数据填充tableModel readExcel.readSheet(0): TableModel tableModel = new DefaultTabl
java.util.Set 、 java.util.List 和其他 Collection 接口(interface)不可序列化。需要一个简单、直接的解决方案来在可序列化的 POJO 中使用它。 pu
我试图从 servlet 返回数据库搜索结果的 ArrayList 以显示在 jsp 页面上。 在servlet中设置arraylist作为请求的属性,并将请求转发到jsp页面。当我尝试在 jsp 页
我是android新手,最近我试图从firebase中提取数据到recyclerview/cardview中以垂直布局显示数据,它显示将Hashmap转换为Arraylist的错误,其中代码是:
我是一名优秀的程序员,十分优秀!