- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是 json 字符串中的 groupid 字段。
比如我有这样的json:
{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234 to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1234 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}
=== Storm 0.9.4。被使用=====
我的源代码如下:
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class KafkaBoltMain {
private static final String SPOUTNAME="TopicSpout";
private static final String ANALYSISBOLT = "AnalysisWorker";
private static final String CLIENTID = "Storm";
private static final String TOPOLOGYNAME = "LocalTopology";
private static class AppAnalysisBolt extends BaseRichBolt {
private static final long serialVersionUID = -6885792881303198646L;
private OutputCollector _collector;
private long groupid=-1L;
private String log="test";
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
List<Object> objs = tuple.getValues();
int i=0;
for(Object obj:objs){
System.out.println(""+i+"th object's value is:"+obj.toString());
i++;
}
// _collector.emit(new Values(groupid,log));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("groupid","log"));
}
}
public static void main(String[] args){
String zookeepers = null;
String topicName = null;
if(args.length == 2 ){
zookeepers = args[0];
topicName = args[1];
}else if(args.length == 1 && args[0].equalsIgnoreCase("help")){
System.out.println("xxxx");
System.exit(0);
}
else{
System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
System.out.println("xxxx");
System.exit(-1);
}
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
topicName,
"",// zookeeper root path for offset storing
CLIENTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(),2)
.fieldsGrouping(SPOUTNAME,new Fields("groupid"));
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
}
}
但是当我提交拓扑时,它给出了以下错误:
12794 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component:
[AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
为什么会出现文件不存在的警告信息?有什么提示吗?
最佳答案
您需要从 json 对象中提取 json 属性,并将两个值(json 对象和字符串 groupId)作为一个双值元组传递。当您将流声明为拓扑规范逻辑的一部分时,您可以将第二个字段命名为“groupId”,并且一切正常。如果您不想修改 Kafka spout,则需要一个中介 bolt ,其唯一目的是将 groupId 从 json 对象中分离出来。中间 bolt 还可以使用定向流(emitDirect() 方法),将目标基于 json 对象中的 groupId。
这就是我不重用 Kafka spout 的原因之一——除了盲目地将数据写入流之外,我通常还想做其他事情,但这既不在这里也不在那里。
关于apache-storm - Storm 场分组示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29998310/
我们正在以伪模式执行 Storm 拓扑。 Storm 拓扑运行良好,能够连接 Storm UI (8080)。 但是Storm UI 没有显示正在运行的拓扑信息。 也重新启动了 Storm UI 进程
我们有一个相当简单的 Storm 拓扑,让人头疼。 我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进
我是 Storm 中 Trident 的新手。我对 TridentState 感到很头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 ID 来完全处理
我有以下情况: 有许多 bolt 计算不同的值 该值被发送到可视化 bolt 可视化 bolt 打开一个网络套接字并发送值以某种方式可视化 问题是,可视化 bolt 总是相同的,但它为可以作为其输入的
我正在使用 Kafka storm,kafka 向 storm 发送/发出 json 字符串,在 storm 中,我想根据 json 中的键/字段将负载分配给几个工作人员。怎么做?在我的例子中,它是
我需要使用 Storm 处理成批的元组。我的最后一个 bolt 必须等到拓扑接收到整个批次,然后才能进行一些处理。为避免混淆 - 对我来说,批处理是一组实时出现的 N 条消息,该术语不必与批处理 (H
我是 Storm 的新手..我遇到了以下错误 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannel
这是一个让我发疯的问题。我的本地 LAN 上运行着一台机器 Storm 实例。我目前正在运行 v0.9.1-incubating发布版本(来自 the Apache Incubator site。问题
我是第一次使用 Storm(从开始使用 Storm 学习),我的项目在运行时失败并出现 ClassNotFoundException: [WARNING] java.lang.ClassNotFoun
如何为 Storm 拓扑提供自定义配置?例如,如果我构建了一个连接到 MySQL 集群的拓扑,并且我希望能够在不重新编译的情况下更改需要连接的服务器,我该怎么做?我的偏好是使用配置文件,但我担心文件本
我一直在阅读 Storm并尝试使用 Storm-starter 中的示例。 我想我明白了这个概念,它非常适用于许多情况。我有一个我想做的测试项目来了解更多关于这方面的信息,但我想知道 Storm 是否
在我们的 Storm 1.0.2 应用程序中,我们面临内存不足的异常。在调试时,我们发现 Kafka spout 向 Bolt 发出了太多消息。 bolt 的运行能力几乎为 4.0。那么有没有一种方法
看完this和 this我很难理解如何配置我的三叉戟拓扑。 基本上我的 Storm 应用程序正在读取 kafka ,进行一些数据操作,最后写入 Cassandra . 这是我目前构建拓扑的方式: pr
我已经从 https://github.com/apache/incubator-storm 下载了 incubator-storm 代码.现在,我尝试使用以下命令运行 WordCountTopolo
我一直在努力理解 Storm 架构,但我不确定我是否理解正确。我会尽量准确地解释我认为的情况。请解释什么 - 如果 - 我错了,什么是对的。 初步想法: worker http://storm.apa
这是我阅读后想到的一个问题: What is the "task" in Storm parallelism 如果我需要在 bolt 的内部状态中保留一些信息,例如,在经典的单词计数用例中,将 bol
我已经使用 docker compose 安装了 Apache-Storm docker-compose.yml: kafka: image: spotify/kafka ports:
我正在围绕我的 Storm 拓扑构建一个监控服务,并希望能够获取各个时间窗口周围的失败元组数量,类似于 Storm UI 如何在 10m、3h 和 1d 窗口中显示失败元组的数量。 我的监控服务目前是
我已经在我的机器上配置了 Storm。 Zookeeper、Nimbus 和 Supervisor 运行正常。 现在我想向这个 Storm 提交一个拓扑。 我正在尝试使用 Storm jar 。 但我
我在玩 Storm,我想知道 Storm 在哪里指定(如果可能)聚合时的(翻滚/滑动)窗口大小。例如。如果我们想在 Twitter 上找到前一小时的热门话题。我们如何指定一个 bolt 应该每小时返回
我是一名优秀的程序员,十分优秀!