- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
一般:我是一名学生,想要在 Storm/Kafka/Flink/MS Azure SA/Spark 上运行一些性能测试(WordCount)。我想使用 Kafka Broker 作为输入源。
我使用了 Storm-Starter 项目中的 WordCount 示例,并将 Kafka 添加为 spout:
public class WordCountKafkaTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) {
String zkIp = "localhost";
String topicName = "perfTest";
List<String> nimbus_seeds = new ArrayList<String>();
nimbus_seeds.add("localhost");
String zookeeperHost = zkIp +":2181";
ZkHosts zkHosts = new ZkHosts(zookeeperHost);
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, "/" + topicName, topicName);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaPerfTestSpout", kafkaSpout, 8);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("kafkaPerfTestSpout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config config = new Config();
config.setMaxTaskParallelism(5);
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
config.put(Config.NIMBUS_SEEDS, nimbus_seeds);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));
try {
StormSubmitter.submitTopology("my-kafka-topology", config, builder.createTopology());
} catch (Exception e) {
throw new IllegalStateException("Couldn't initialize the topology", e);
}
}
}
通过运行拓扑,我收到一些错误消息。喷嘴说:
java.lang.ExceptionInInitializerError at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:89) at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26) at kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:35) at kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:47) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60) at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60) at kafka.utils.Pool.getAndMaybePut(Pool.scala:59) at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:64) at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:44) at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34) at org.apache.storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60) at org.apache.storm.kafka.PartitionManager.(PartitionManager.java:74) at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalStateException: Shutdown in progress at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66) at java.lang.Runtime.addShutdownHook(Runtime.java:211) at com.yammer.metrics.Metrics.(Metrics.java:21) ... 19 more
在对开 bolt 处:
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) at org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850) at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:150) at org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731) at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ... 6 more Caused by: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: at org.apache.storm.task.ShellBolt.die(ShellBolt.java:295) at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:70) at org.apache.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:398) ... 1 more Caused by: java.io.IOException: Broken pipe at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:326) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:297) at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141) at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229) at java.io.BufferedWriter.flush(BufferedWriter.java:254) at org.apache.storm.multilang.JsonSerializer.writeString(JsonSerializer.java:99) at org.apache.storm.multilang.JsonSerializer.writeMessage(JsonSerializer.java:93) at org.apache.storm.multilang.JsonSerializer.writeBoltMsg(JsonSerializer.java:78) at org.apache.storm.utils.ShellProcess.writeBoltMsg(ShellProcess.java:127) at org.apache.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:387) ... 1 more
我使用 kafka-console- Producer 生成一些消息。我希望有一个人可以帮助我。我是编程 Storm 中的菜鸟...
最佳答案
删除“config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);”完成了工作!
关于java - Storm Kafka-Spout 无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41938022/
我们正在以伪模式执行 Storm 拓扑。 Storm 拓扑运行良好,能够连接 Storm UI (8080)。 但是Storm UI 没有显示正在运行的拓扑信息。 也重新启动了 Storm UI 进程
我们有一个相当简单的 Storm 拓扑,让人头疼。 我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进
我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理
我有以下情况: 有许多 bolt 计算不同的值 该值被发送到可视化 bolt 可视化 bolt 打开一个网络套接字并发送值以某种方式可视化 问题是,可视化 bolt 总是相同的,但它为可以作为其输入的
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是
我需要使用 Storm 处理成批的元组。我的最后一个 bolt 必须等到拓扑接收到整个批次,然后才能进行一些处理。为避免混淆 - 对我来说,批处理是一组实时出现的 N 条消息,该术语不必与批处理 (H
我是 Storm 的新手..我遇到了以下错误 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannel
这是一个让我发疯的问题。我的本地 LAN 上运行着一台机器 Storm 实例。我目前正在运行 v0.9.1-incubating发布版本(来自 the Apache Incubator site。问题
我是第一次使用 Storm(从开始使用 Storm 学习),我的项目在运行时失败并出现 ClassNotFoundException: [WARNING] java.lang.ClassNotFoun
如何为 Storm 拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本
我一直在阅读 Storm并尝试使用 Storm-starter 中的示例。 我想我明白了这个概念,它非常适用于许多情况。我有一个我想做的测试项目来了解更多关于这方面的信息,但我想知道 Storm 是否
在我们的 Storm 1.0.2 应用程序中,我们面临内存不足的异常。在调试时,我们发现 Kafka spout 向 Bolt 发出了太多消息。 bolt 的运行能力几乎为 4.0。那么有没有一种方法
看完this和 this我很难理解如何配置我的三叉戟拓扑。 基本上我的 Storm 应用程序正在读取 kafka ,进行一些数据操作,最后写入 Cassandra . 这是我目前构建拓扑的方式: pr
我已经从 https://github.com/apache/incubator-storm 下载了 incubator-storm 代码.现在,我尝试使用以下命令运行 WordCountTopolo
我一直在努力理解 Storm 架构,但我不确定我是否理解正确。我会尽量准确地解释我认为的情况。请解释什么 - 如果 - 我错了,什么是对的。 初步想法: worker http://storm.apa
这是我阅读后想到的一个问题: What is the "task" in Storm parallelism 如果我需要在 bolt 的内部状态中保留一些信息,例如,在经典的单词计数用例中,将 bol
我已经使用 docker compose 安装了 Apache-Storm docker-compose.yml: kafka: image: spotify/kafka ports:
我正在围绕我的 Storm 拓扑构建一个监控服务,并希望能够获取各个时间窗口周围的失败元组数量,类似于 Storm UI 如何在 10m、3h 和 1d 窗口中显示失败元组的数量。 我的监控服务目前是
我已经在我的机器上配置了 Storm。 Zookeeper、Nimbus 和 Supervisor 运行正常。 现在我想向这个 Storm 提交一个拓扑。 我正在尝试使用 Storm jar 。 但我
我在玩 Storm,我想知道 Storm 在哪里指定(如果可能)聚合时的(翻滚/滑动)窗口大小。例如。如果我们想在 Twitter 上找到前一小时的热门话题。我们如何指定一个 bolt 应该每小时返回
我是一名优秀的程序员,十分优秀!