- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我是 Storm 的初学者。我正在尝试执行下面的示例程序
How to create a topology in storm
SampleSpout.java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
public class SampleSpout implements IRichSpout{
SpoutOutputCollector collector;
int i=0;
List<Object> tupleList;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public void nextTuple() {
System.out.println("debug help:");
tupleList=new ArrayList<Object>();
tupleList.add("storm"+i);
tupleList.add(i);
System.out.println("tupleList:"+tupleList.size());
for(Object s:tupleList) {
System.out.println("in for loop: "+ s.toString());
}
collector.emit(tupleList,i);
System.out.println("after emit");
i++;
}
@Override
public void ack(Object msgId) {
// TODO Auto-generated method stub
}
@Override
public void fail(Object msgId) {
// TODO Auto-generated method stub
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
示例 bolt .java
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class SampleBolt implements IBasicBolt {
private static Logger log = LoggerFactory.getLogger(SampleBolt.class);
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
log.info(input.getValues().toString()+"output values");
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
}
SampleTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
public class SampleTopology {
/**
* @param args
*/
public static void main(String[] args) {
TopologyBuilder topology=new TopologyBuilder();
topology.setSpout("sampleSpout",new SampleSpout());
topology.setBolt("sampleBolt",new SampleBolt()).shuffleGrouping("sampleSpout");
Config conf = new Config();
conf.setDebug(true);
LocalCluster cluster=new LocalCluster();
cluster.submitTopology("test2", conf, topology.createTopology());
}
}
错误信息尾部:
13606 [Thread-4-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
13610 [Thread-4] INFO backtype.storm.daemon.worker - Reading Assignments.
13988 [Thread-4] INFO backtype.storm.daemon.executor - Loading executor sampleBolt:[2 2]
13995 [Thread-4] INFO backtype.storm.daemon.task - Emitting: sampleBolt __system ["startup"]
13995 [Thread-4] INFO backtype.storm.daemon.executor - Loaded executor tasks sampleBolt:[2 2]
14003 [Thread-4] INFO backtype.storm.daemon.executor - Finished loading executor sampleBolt:[2 2]
14011 [Thread-4] INFO backtype.storm.daemon.executor - Loading executor sampleSpout:[3 3]
14012 [Thread-4] INFO backtype.storm.daemon.task - Emitting: sampleSpout __system ["startup"]
14012 [Thread-4] INFO backtype.storm.daemon.executor - Loaded executor tasks sampleSpout:[3 3]
14019 [Thread-4] INFO backtype.storm.daemon.executor - Finished loading executor sampleSpout:[3 3]
14037 [Thread-8-sampleBolt] INFO backtype.storm.daemon.executor - Preparing bolt sampleBolt:(2)
14044 [Thread-4] INFO backtype.storm.daemon.executor - Loading executor __system:[-1 -1]
14044 [Thread-4] INFO backtype.storm.daemon.task - Emitting: __system __system ["startup"]
14045 [Thread-4] INFO backtype.storm.daemon.executor - Loaded executor tasks __system:[-1 -1]
14046 [Thread-10-sampleSpout] INFO backtype.storm.daemon.executor - Opening spout sampleSpout:(3)
14048 [Thread-10-sampleSpout] INFO backtype.storm.daemon.executor - Opened spout sampleSpout:(3)
14049 [Thread-8-sampleBolt] INFO backtype.storm.daemon.executor - Prepared bolt sampleBolt:(2)
14050 [Thread-10-sampleSpout] INFO backtype.storm.daemon.executor - Activating spout sampleSpout:(3)
debug help:
tupleList:2
in for loop: storm0
in for loop: 0
14051 [Thread-4] INFO backtype.storm.daemon.executor - Finished loading executor __system:[-1 -1]
14052 [Thread-12-__system] INFO backtype.storm.daemon.executor - Preparing bolt __system:(-1)
14055 [Thread-10-sampleSpout] ERROR backtype.storm.util - Async loop died!
java.lang.NullPointerException: null
at sample.SampleSpout.nextTuple(SampleSpout.java:50) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_05]
14055 [Thread-12-__system] INFO backtype.storm.daemon.executor - Prepared bolt __system:(-1)
14055 [Thread-10-sampleSpout] ERROR backtype.storm.daemon.executor -
java.lang.NullPointerException: null
at sample.SampleSpout.nextTuple(SampleSpout.java:50) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_05]
14058 [Thread-4] INFO backtype.storm.daemon.executor - Loading executor __acker:[1 1]
14059 [Thread-4] INFO backtype.storm.daemon.task - Emitting: __acker __system ["startup"]
14059 [Thread-4] INFO backtype.storm.daemon.executor - Loaded executor tasks __acker:[1 1]
14060 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Preparing bolt __acker:(1)
14062 [Thread-14-__acker] INFO backtype.storm.daemon.executor - Prepared bolt __acker:(1)
14062 [Thread-4] INFO backtype.storm.daemon.executor - Timeouts disabled for executor __acker:[1 1]
14062 [Thread-4] INFO backtype.storm.daemon.executor - Finished loading executor __acker:[1 1]
14063 [Thread-4] INFO backtype.storm.daemon.worker - Launching receive-thread for c6805798-cdfd-421a-b357-8c704a0c53f1:1024
14070 [Thread-15-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Starting receive-thread: [stormId: test2-1-1419530728, port: 1024, thread-id: 0 ]
14077 [Thread-4] INFO backtype.storm.daemon.worker - Worker has topology config {"storm.id" "test2-1-1419530728", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/5e0749e5-7d64-49cc-948f-8798e2262e70", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "storm.meta.serialization.delegate" "backtype.storm.serialization.DefaultSerializationDelegate", "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "test2", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 300, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.environment" nil, "topology.debug" true, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "topology.classpath" nil}
14077 [Thread-4] INFO backtype.storm.daemon.worker - Worker 2b0ea9bd-a0b4-4ec8-b49f-b6dadce50b36 for storm test2-1-1419530728 on c6805798-cdfd-421a-b357-8c704a0c53f1:1024 has finished loading
14106 [Thread-10-sampleSpout] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_05]
最佳答案
你得到一个空指针异常,因为收集器变量是空的。
你需要在open方法中保存传递给你的收集器:
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
然后,当您想要发出元组时,您将拥有对在 open() 方法中传递给您的收集器的引用。
关于hadoop - Storm spout NullPointerException - 异步循环终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27653856/
我们正在以伪模式执行 Storm 拓扑。 Storm 拓扑运行良好,能够连接 Storm UI (8080)。 但是Storm UI 没有显示正在运行的拓扑信息。 也重新启动了 Storm UI 进程
我们有一个相当简单的 Storm 拓扑,让人头疼。 我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进
我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理
我有以下情况: 有许多 bolt 计算不同的值 该值被发送到可视化 bolt 可视化 bolt 打开一个网络套接字并发送值以某种方式可视化 问题是,可视化 bolt 总是相同的,但它为可以作为其输入的
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是
我需要使用 Storm 处理成批的元组。我的最后一个 bolt 必须等到拓扑接收到整个批次,然后才能进行一些处理。为避免混淆 - 对我来说,批处理是一组实时出现的 N 条消息,该术语不必与批处理 (H
我是 Storm 的新手..我遇到了以下错误 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannel
这是一个让我发疯的问题。我的本地 LAN 上运行着一台机器 Storm 实例。我目前正在运行 v0.9.1-incubating发布版本(来自 the Apache Incubator site。问题
我是第一次使用 Storm(从开始使用 Storm 学习),我的项目在运行时失败并出现 ClassNotFoundException: [WARNING] java.lang.ClassNotFoun
如何为 Storm 拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本
我一直在阅读 Storm并尝试使用 Storm-starter 中的示例。 我想我明白了这个概念,它非常适用于许多情况。我有一个我想做的测试项目来了解更多关于这方面的信息,但我想知道 Storm 是否
在我们的 Storm 1.0.2 应用程序中,我们面临内存不足的异常。在调试时,我们发现 Kafka spout 向 Bolt 发出了太多消息。 bolt 的运行能力几乎为 4.0。那么有没有一种方法
看完this和 this我很难理解如何配置我的三叉戟拓扑。 基本上我的 Storm 应用程序正在读取 kafka ,进行一些数据操作,最后写入 Cassandra . 这是我目前构建拓扑的方式: pr
我已经从 https://github.com/apache/incubator-storm 下载了 incubator-storm 代码.现在,我尝试使用以下命令运行 WordCountTopolo
我一直在努力理解 Storm 架构,但我不确定我是否理解正确。我会尽量准确地解释我认为的情况。请解释什么 - 如果 - 我错了,什么是对的。 初步想法: worker http://storm.apa
这是我阅读后想到的一个问题: What is the "task" in Storm parallelism 如果我需要在 bolt 的内部状态中保留一些信息,例如,在经典的单词计数用例中,将 bol
我已经使用 docker compose 安装了 Apache-Storm docker-compose.yml: kafka: image: spotify/kafka ports:
我正在围绕我的 Storm 拓扑构建一个监控服务,并希望能够获取各个时间窗口周围的失败元组数量,类似于 Storm UI 如何在 10m、3h 和 1d 窗口中显示失败元组的数量。 我的监控服务目前是
我已经在我的机器上配置了 Storm。 Zookeeper、Nimbus 和 Supervisor 运行正常。 现在我想向这个 Storm 提交一个拓扑。 我正在尝试使用 Storm jar 。 但我
我在玩 Storm,我想知道 Storm 在哪里指定(如果可能)聚合时的(翻滚/滑动)窗口大小。例如。如果我们想在 Twitter 上找到前一小时的热门话题。我们如何指定一个 bolt 应该每小时返回
我是一名优秀的程序员,十分优秀!