gpt4 book ai didi

java - 关闭和杀死拓扑方法的区别以及如何杀死拓扑

转载 作者:太空宇宙 更新时间:2023-11-04 07:06:28 25 4
gpt4 key购买 nike

我们在本地模式下运行字数统计拓扑。当我们使用 shutdown 方法时,我们会收到“java.net.connectException”之类的错误,整个进程将关闭。当我们使用 cluster.killtopology 方法时,进程无法完全终止。同时使用killtopology 方法和 shutdown 方法时,有时会执行拓扑,有时会显示错误“java.net.connectException”,整个进程将关闭。 您能告诉一下会出现什么问题吗?

这是我们的代码:

主要程序:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile","E:\\words.txt");

conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordcount", conf, builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}

Spout 程序:

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.*;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader implements IRichSpout {
private SpoutOutputCollector collector;
Map<String, Object> count;

private FileReader fileReader;
private boolean completed = false;
private TopologyContext context;

public boolean isDistributed() {return false;}
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}

/**
* The only thing that the methods will do It is emit each
* file line
*/
public void nextTuple() {
/**
* The nextuple it is called forever, so if we have been readed the file
* we will wait and then return
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//Read all lines
while((str = reader.readLine()) != null){
/**
* By each line emmit a new value with the line as a their
*/
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}

/**
* We will create the file and get the collector object
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");
}
this.collector = collector;
}

/**
* Declare the output field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("str"));
}

public void deactivate(){}
public void activate(){}
public Map<String, Object> getComponentConfiguration(){return count;}
}

bolt 标准化程序:::

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer implements IRichBolt {
private OutputCollector collector;
Map<String, Object> count;

public void cleanup() {}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
//Emit the word
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
// Acknowledge the tuple
collector.ack(input);
}

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

public Map<String, Object> getComponentConfiguration(){return count;}
}

bolt 计数器程序::

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;

public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
Map<String, Object> count;
private OutputCollector collector;

/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup() {
System.out.println("-- Word Counter ["+name+"-"+id+"] --");
for(Map.Entry<String, Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}

/**
* On each word We will count
*/
@Override
public void execute(Tuple input) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
//Set the tuple as Acknowledge
collector.ack(input);
}

/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("str", "c"));
}

public void deactivate(){}

public Map<String, Object> getComponentConfiguration(){return count;}

}

请指导我们解决问题。

最佳答案

cluster.killTopology("WordCount");您只杀死拓扑的方法,集群将继续工作。随着cluster.shutdown();您可以通过该集群中正在运行的拓扑来杀死整个集群。

我不认为杀死拓扑是你的问题的原因。最好附加应用程序的日志。尝试检查这个treat 。可能是 Storm 使用的端口是随机使用的,这会导致您的问题。

关于java - 关闭和杀死拓扑方法的区别以及如何杀死拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21299513/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com