gpt4 book ai didi

java - 我的 Storm 拓扑既不工作(不生成输出)也不失败(不生成错误或异常)

转载 作者:太空宇宙 更新时间:2023-11-04 13:39:14 25 4
gpt4 key购买 nike

我有一个拓扑,其中我试图计算由 SimulatorSpout (不是真正的流)生成的单词出现次数,然后写入 MySQL 数据库表,表方案非常简单:

Field  |  Type        |  ...
ID | int(11) | Auto_icr
word | varchar(50) |
count | int(11) |

但是我面临着奇怪的问题(正如我之前提到的)我成功地将拓扑提交到由 4 个主管组成的 Storm 集群,并且我可以在 Storm Web UI 中看到拓扑的流程(没有异常(exception))但是当我检查 MySQL 表时,令我惊讶的是,该表是空的......

欢迎任何意见、建议...

这里是喷嘴和 bolt :

public class MySQLConnection {

private static Connection conn = null;
private static String dbUrl = "jdbc:mysql://192.168.0.2:3306/test?";
private static String dbClass = "com.mysql.jdbc.Driver";

public static Connection getConnection() throws SQLException, ClassNotFoundException {

Class.forName(dbClass);
conn = DriverManager.getConnection(dbUrl, "root", "qwe123");
return conn;
}
}

============================== SentenceSpout ================================

public class SentenceSpout extends BaseRichSpout{

private static final long serialVersionUID = 1L;
private boolean _completed = false;

private SpoutOutputCollector _collector;
private String [] sentences = {
"Obama delivered a powerfull speech against USA",
"I like cold beverages",
"RT http://www.turkeyairline.com Turkish Airlines has delayed some flights",
"don't have a cow man...",
"i don't think i like fleas"
};

private int index = 0;


public void open (Map config, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}

public void nextTuple () {

_collector.emit(new Values(sentences[index]));
index++;
if (index >= sentences.length) {
index = 0;

Utils.waitForSeconds(1);
}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}

public void ack(Object msgId) {
System.out.println("OK: " + msgId);
}

public void close() {}

public void fail(Object msgId) {
System.out.println("FAIL: " + msgId);
}
}

============================== SplitSentenceBolt ================================

public class SplitSentenceBolt extends BaseRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector _collector;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
_collector = collector;
}

public void execute (Tuple tuple) {

String sentence = tuple.getStringByField("sentence");
String httpRegex = "((https?|ftp|telnet|gopher|file)):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*";
sentence = sentence.replaceAll(httpRegex, "").replaceAll("RT", "").replaceAll("[.|,]", "");
String[] words = sentence.split(" ");
for (String word : words) {
if (!word.isEmpty())
_collector.emit(new Values(word.trim()));
}
_collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

============================WordCountBolt==================================

public class WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private HashMap<String , Integer> counts = null;
private OutputCollector _collector;
private ResultSet resSet = null;
private Statement stmt = null;
private Connection _conn = null;

private String path = "/home/hduser/logOfStormTops/logger.txt";
String rLine = null;

public void prepare (Map config, TopologyContext context, OutputCollector collector) {
counts = new HashMap<String, Integer>();
_collector = collector;
}

public void execute (Tuple tuple) {

int insertResult = 0;
int updateResult = 0;

String word = tuple.getStringByField("word");
//----------------------------------------------------
if (!counts.containsKey(word)) {

counts.put(word, 1);
try {
insertResult = wordInsertIfNoExist(word);
if (insertResult == 1) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
} else {
//-----------------------------------------------
counts.put(word, counts.get(word) + 1);

try {
// writing to db
updateResult = updateCountOfExistingWord(word);
if (updateResult == 1) {
_collector.ack(tuple);
} else {
_collector.fail(tuple);
}
// Writing to file
BufferedWriter buffer = new BufferedWriter(new FileWriter(path));
buffer.write("[ " + word + " : " + counts.get("word") + " ]");
buffer.newLine();
buffer.flush();
buffer.close();

} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("{word-" + word + " : count-" + counts.get(word) + "}");
}

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

// *****************************************************

public int wordInsertIfNoExist(String word) throws ClassNotFoundException, SQLException {

String query = "SELECT word FROM wordcount WHERE word=\"" + word + "\"";
String insert = "INSERT INTO wordcount (word, count) VALUES (\"" + word + "\", 1)";
_conn = MySQLConnection.getConnection();
stmt = _conn.createStatement();
resSet = stmt.executeQuery(query);
int res = 0;
if (!resSet.next()) {

res = stmt.executeUpdate(insert);

} else {
System.out.println("Yangi qiymatni kirityotrganda nimadir sodir bo'ldi");
}
resSet.close();
stmt.close();
_conn.close();
return res;
}

public int updateCountOfExistingWord(String word) throws ClassNotFoundException, SQLException {

String update = "UPDATE wordcount SET count=count+1 WHERE word=\"" + word + "\"";
_conn = MySQLConnection.getConnection();
stmt = _conn.createStatement();
int result = stmt.executeUpdate(update);

//System.out.println(word + "'s count has been updated (incremented)");

resSet.close();
stmt.close();
_conn.close();
return result;
}
}

=========================WordCountTopology ===============================

public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String TOPOLOGY_NAME = "NewWordCountTopology";

@SuppressWarnings("static-access")
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

builder.setBolt(SPLIT_BOLT_ID, splitBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);

builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

Config config = new Config();
config.setMaxSpoutPending(100);
config.setDebug(true);

StormSubmitter submitter = new StormSubmitter();

submitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

}
}

最佳答案

这是因为抛出异常时没有调用_collector.ack(tuple)。当待处理的元组太多时,spout将停止发送新的元组。尝试抛出 RuntimeException 而不是 printStackTrace。

关于java - 我的 Storm 拓扑既不工作(不生成输出)也不失败(不生成错误或异常),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31378092/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com