- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试装配一个 Kafka-Storm“Hello World”系统。我安装并运行了 Kafka,当我使用 Kafka 生产者发送数据时,我可以使用 Kafka 控制台消费者读取它。
我从 O'Reilly 的书“Getting Started With Storm”中获取了第 02 章的示例,并将其修改为使用 KafkaSpout 而不是常规的 spout。
当我运行应用程序时,数据已经在 kafka 中挂起,KafkaSpout 的 nextTuple 没有收到任何消息 - 它进入,尝试遍历协调器下的空管理器列表,然后退出。
我的环境是一个相当老的 Cloudera VM,有 Storm 0.9 和 Kafka-Storm-0.9(最新的),以及 Kafka 2.9.2-0.7.0。
这就是我定义 SpoutConfig 和拓扑的方式:
String zookeepers = "localhost:2181";
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
"gtest",
"/kafka", // zookeeper root path for offset storing
"KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);
KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
谁能帮我弄清楚为什么我没有收到任何东西?
谢谢,G.
最佳答案
如果您已经使用了该消息,则不应再读取它,除非您的生产者生成新消息。这是因为 forceStartOffsetTime
在您的代码中使用 -1
调用。
表格storm-contrib文档:
Another very useful config in the spout is the ability to force the spout to rewind to a previous offset. You do forceStartOffsetTime on the spout config, like so:
spoutConfig.forceStartOffsetTime(-2);
它将选择围绕该时间戳写入的最新偏移量开始消费。您可以通过传入 -1 强制 spout 始终从最新的偏移量开始,也可以通过传入 -2 强制它从最早的偏移量开始。
你的制作人长什么样?有一个片段会很有用。您可以将 -1 替换为 -2,看看您是否收到任何东西,如果您的生产者没问题,那么您应该可以消费。
关于java - KafkaSpout 没有收到来自 Kafka 的任何信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17807292/
我能够使用本地集群运行storm Kafka,但无法使用storm Submitter运行,下面是我的拓扑代码 谁能帮我解决这个问题:) package com.org.kafka; import o
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并配置使用BaseRichBolt、一个KafkaSpout和两个 bolt bolt-A、Bolt-B,一旦bolt-
我正在尝试编译和运行 storm-kafka-starter 项目 https://github.com/TheHydroImpulse/storm-kafka-starter KafkaTopolo
我的拓扑结构使用默认的 KafkaSpout 实现。在一些非常受控的测试中,我注意到 spout 失败了元组,即使我的 bolt 都没有失败任何元组,而且我确信所有消息都在我配置的超时内得到了很好的处
我正在尝试装配一个 Kafka-Storm“Hello World”系统。我安装并运行了 Kafka,当我使用 Kafka 生产者发送数据时,我可以使用 Kafka 控制台消费者读取它。 我从 O'R
我需要查看storm通过其KafkaSpout读取的偏移值。这是我传入的配置: SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "some
出于某种原因,当我尝试在 Storm 集群上运行拓扑时出现以下错误: java.lang.NoClassDefFoundError: Could not initialize class org.ap
我正在使用 Kafka-Storm 集成。 Kafka 将数据加载到队列中,Kafka Spout 将拉取数据和进程。我有以下设计。 Kafka -> Queue -> KafkaSpout -> P
我的问题是 Storm KafkaSpout 在一段时间后停止使用来自 Kafka 主题的消息。当在 Storm 中启用调试时,我得到这样的日志文件: 2016-07-05 03:58:26.097
我使用的是 storm 0.9.3。我正在尝试为我的拓扑关闭每个元组的确认。我将 Config.TOPOLOGY_ACKER_EXECUTORS 设置为 0,并将 maxSpoutPending 设置
我开发了 storm 拓扑来从 hortonworks 上的 kafka 代理接收 JSONArray 数据, 我不知道为什么我的 kafkaSpout 不使用 HDP 中 Kafka Brokers
我是一名优秀的程序员,十分优秀!