- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我终于想到我有一个写在redis数据库上的拓扑。我有一个要打印的 bolt ,还有一个要插入 redis 的 bolt 。但是当我尝试启动拓扑时,它出现了这个错误:
...5333 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
5376 [main] INFO b.s.d.supervisor - Starting supervisor with id 1917ef54-0f16-47b8-86ea-b6722aa33c68 at host amnor-A88XPLUS
5405 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:107) ~[storm-core-0.10.0.jar:0.10.0]
at Storm.practice.Storm.Prova.ProvaTopology.main(ProvaTopology.java:383) ~[classes/:?]
Caused by: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_91]
at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0]
... 2 more
我认为它可能是 Spout,但我已经尝试使用 Storm Examples 上可用的示例 Spout 并且发生了相同的情况。我的代码只是在读取的名称中添加笑脸,例如 (John :) :)),我只是想真正将流存储到 redis 数据库,它只是一个从文件中读取名称的小测试拓扑。之后,我正在为我大学的一个大数据项目做一个严肃的拓扑。这是我的代码(有很多未使用的导入,但那是因为我尝试了不同的方法来写入数据库):
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
/**
* This is a basic example of a Storm topology.
*/
public class ProvaTopology {
public static class ProvaBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Morts"));
}
}
public class ProvaSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
//Random _rand;
private String fileName;
//private SpoutOutputCollector _collector;
private BufferedReader reader;
private AtomicLong linesRead;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
try {
fileName= (String)"/home/prova.txt";
reader = new BufferedReader(new FileReader(fileName));
// read and ignore the header if one exists
} catch (Exception e) {
throw new RuntimeException(e);
}
// _rand = new Random();
}
public void nextTuple() {
Utils.sleep(100);
try {
String line = reader.readLine();
if (line != null) {
long id = linesRead.incrementAndGet();
System.out.println("Finished reading line, " + line);
_collector.emit(new Values((String)line));
} else {
System.out.println("Finished reading file, " + linesRead.get() + " lines read");
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void ack(Object id) {
}
public void fail(Object id) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Morts"));
}
}
public class RedisBolt implements IRichBolt {
protected String channel = "Somriures";
// protected String configChannel;
protected OutputCollector collector;
// protected Tuple currentTuple;
// protected Logger log;
protected JedisPool pool;
// protected ConfigListenerThread configListenerThread;
public RedisBolt(){}
public RedisBolt(String channel) {
// log = Logger.getLogger(getClass().getName());
// setupNonSerializableAttributes();
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
pool = new JedisPool("localhost");
}
public void execute(Tuple tuple) {
String current = tuple.getString(0);
if(current != null) {
// for(Object obj: result) {
publish(current);
collector.emit(tuple, new Values(current));
// }
collector.ack(tuple);
}
}
public void cleanup() {
if(pool != null) {
pool.destroy();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(channel));
}
public void publish(String msg) {
Jedis jedis = pool.getResource();
jedis.publish(channel, msg);
pool.returnResource(jedis);
}
protected void setupNonSerializableAttributes() {
}
public Map getComponentConfiguration() {
return null;
}
}
public class PrinterBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
ProvaTopology Pt = new ProvaTopology();
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(666).build();
builder.setSpout("Morts", Pt.new ProvaSpout(), 10);//emisorTestWordSpout
builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts");// de on llig?
builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal");// de on llig?
builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
// builder.setBolt("StoreM", (storeMapperS));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(5);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
//WithProgressBar
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
提前致谢
最佳答案
这里的异常(exception)非常明显。如果您只是查看有关 java.io.NotSerializableException
的文档,您会发现正在打印的消息是不可序列化的类。要修复,只需让您的 Topology 类实现 Serializable
:
public class ProvaTopology implements Serializable {
...
}
这是必需的,以便 Storm 可以序列化您的 Topology 并将其发送到 Nimbus 执行。由于您的 Bolt 和 Spout 扩展或实现了 Storm 提供的类或接口(interface),您不必担心将它们标记为可序列化,因为这些父类和接口(interface)已经这样做了。
关于java - Storm java.io.NotSerializableException : when running topology,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35070386/
我们正在以伪模式执行 Storm 拓扑。 Storm 拓扑运行良好,能够连接 Storm UI (8080)。 但是Storm UI 没有显示正在运行的拓扑信息。 也重新启动了 Storm UI 进程
我们有一个相当简单的 Storm 拓扑,让人头疼。 我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进
我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理
我有以下情况: 有许多 bolt 计算不同的值 该值被发送到可视化 bolt 可视化 bolt 打开一个网络套接字并发送值以某种方式可视化 问题是,可视化 bolt 总是相同的,但它为可以作为其输入的
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是
我需要使用 Storm 处理成批的元组。我的最后一个 bolt 必须等到拓扑接收到整个批次,然后才能进行一些处理。为避免混淆 - 对我来说,批处理是一组实时出现的 N 条消息,该术语不必与批处理 (H
我是 Storm 的新手..我遇到了以下错误 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannel
这是一个让我发疯的问题。我的本地 LAN 上运行着一台机器 Storm 实例。我目前正在运行 v0.9.1-incubating发布版本(来自 the Apache Incubator site。问题
我是第一次使用 Storm(从开始使用 Storm 学习),我的项目在运行时失败并出现 ClassNotFoundException: [WARNING] java.lang.ClassNotFoun
如何为 Storm 拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本
我一直在阅读 Storm并尝试使用 Storm-starter 中的示例。 我想我明白了这个概念,它非常适用于许多情况。我有一个我想做的测试项目来了解更多关于这方面的信息,但我想知道 Storm 是否
在我们的 Storm 1.0.2 应用程序中,我们面临内存不足的异常。在调试时,我们发现 Kafka spout 向 Bolt 发出了太多消息。 bolt 的运行能力几乎为 4.0。那么有没有一种方法
看完this和 this我很难理解如何配置我的三叉戟拓扑。 基本上我的 Storm 应用程序正在读取 kafka ,进行一些数据操作,最后写入 Cassandra . 这是我目前构建拓扑的方式: pr
我已经从 https://github.com/apache/incubator-storm 下载了 incubator-storm 代码.现在,我尝试使用以下命令运行 WordCountTopolo
我一直在努力理解 Storm 架构,但我不确定我是否理解正确。我会尽量准确地解释我认为的情况。请解释什么 - 如果 - 我错了,什么是对的。 初步想法: worker http://storm.apa
这是我阅读后想到的一个问题: What is the "task" in Storm parallelism 如果我需要在 bolt 的内部状态中保留一些信息,例如,在经典的单词计数用例中,将 bol
我已经使用 docker compose 安装了 Apache-Storm docker-compose.yml: kafka: image: spotify/kafka ports:
我正在围绕我的 Storm 拓扑构建一个监控服务,并希望能够获取各个时间窗口周围的失败元组数量,类似于 Storm UI 如何在 10m、3h 和 1d 窗口中显示失败元组的数量。 我的监控服务目前是
我已经在我的机器上配置了 Storm。 Zookeeper、Nimbus 和 Supervisor 运行正常。 现在我想向这个 Storm 提交一个拓扑。 我正在尝试使用 Storm jar 。 但我
我在玩 Storm,我想知道 Storm 在哪里指定(如果可能)聚合时的(翻滚/滑动)窗口大小。例如。如果我们想在 Twitter 上找到前一小时的热门话题。我们如何指定一个 bolt 应该每小时返回
我是一名优秀的程序员,十分优秀!