gpt4 book ai didi

apache-storm - 使用 accumulo 运行 Storm 连接时出错

转载 作者:行者123 更新时间:2023-12-04 20:42:00 28 4
gpt4 key购买 nike

我有如下 Storm bolt ,

 package storm.bolt;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class AccumuloBolt implements IRichBolt {

private static final long serialVersionUID = 1L;
private OutputCollector collector;
private ZooKeeperInstance instance;
private Connector connector;
private BatchWriter bw;
private Text colf;
private MultiTableBatchWriter mtbw;
private final String instanceName;
private final String zooServers;
private final String userName;
private final String password;
Map<String, Integer> counters;
/**
* @param zooServers The host on which Zookeeper is running.
* @param userName for which Accumula username.
* @param password The Acumula passowrd
* written to.
* String instanceName = "myistance";
* String zooServers = "192.168.1.81:2181";
* String userName = "root";
* String password = "aryadevi";
*/
public AccumuloBolt(String instanceName, String zooServers, String userName,
String password) {
this.instanceName = instanceName;
this.zooServers = zooServers;
this.userName = userName;
this.password = password;
}
public void prepare( Map stormConf, TopologyContext context, OutputCollector collector) {

this.collector = collector;
try {
//this.instance = new ZooKeeperInstance(instanceName, zooServers);
this.instance = new ZooKeeperInstance("myistance", "192.168.1.81:2181");
//this.connector= instance.getConnector(userName, password);
this.connector= instance.getConnector("root", "aryadevi");
this.mtbw=connector.createMultiTableBatchWriter(200000l, 300, 4);
this.bw=null;


} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void execute(Tuple input) {
if (shouldActOnInput(input)) {
try{
if (!this.connector.tableOperations().exists("new2"))
this.connector.tableOperations().create("new2");
this.bw = this.mtbw.getBatchWriter("new2");
this.colf=new Text("colfam");
System.out.println("writing ...");
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}


}catch (Exception e) {
throw new RuntimeException(e);
}


//DBObject updateObj = getDBObjectForInput(input);
//this.bw.addMutation(m);

} else {
collector.ack(input);
}
}
public void cleanup() {
try{

for(Map.Entry<String, Integer> entry : counters.entrySet()){
Mutation m = new Mutation(new Text(String.format("row_%d",entry.getKey() )));
m.put(this.colf, new Text(String.format("colqual_%d", entry.getKey())), new Value((String.format("value_%d", entry.getValue())).getBytes()));
System.out.println(entry.getKey()+": "+entry.getValue());
bw.addMutation(m);
}


this.mtbw.close();
}catch (Exception e) {
throw new RuntimeException(e);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub

}
public boolean shouldActOnInput(Tuple input) {
return true;
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}



}

我只是使用 "mvn compile" 编译这个 Storm 项目，并使用 "mvn package" 创建一个包，
然后使用以下命令来运行 Storm：
storm jar target/storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.TwitterStorm
运行此命令后出现如下错误：
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46217 [Thread-8-count] ERROR backtype.storm.util - Async loop died!
java.lang.ExceptionInInitializerError: null
at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
... 8 common frames omitted
46218 [Thread-10-count] ERROR backtype.storm.daemon.executor -
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46218 [Thread-8-count] ERROR backtype.storm.daemon.executor -
java.lang.ExceptionInInitializerError: null
at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
... 8 common frames omitted
46218 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor count:[4 4]
46219 [Thread-6] INFO backtype.storm.daemon.task - Emitting: count __system ["startup"]
46220 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks count:[4 4]
46224 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor count:[4 4]
46224 [Thread-12-count] INFO backtype.storm.daemon.executor - Preparing bolt count:(4)
46225 [Thread-12-count] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46226 [Thread-12-count] ERROR backtype.storm.daemon.executor -
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46321 [Thread-10-count] INFO backtype.storm.util - Halting process: ("Worker died")
46321 [Thread-8-count] INFO backtype.storm.util - Halting process: ("Worker died")

最佳答案

看起来这个 Storm JIRA 工单中有一个相关的讨论：
https://issues.apache.org/jira/browse/STORM-122

我认为 Accumulo 有一个 slf4j-log4j12 依赖项,而 Storm 使用的是不兼容的 log4j-over-slf4j。该讨论似乎建议从 Accumulo 依赖项中排除日志依赖项,如 slf4j-log4j12 和 log4j。我不知道这是否会奏效,但值得一试。

关于apache-storm - 使用 accumulo 运行 Storm 连接时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24603227/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com