While running my Storm topology I get this error. The topology runs perfectly for 5 minutes without any error and then fails. I am using Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS set to 300 sec, i.e. 5 minutes.
This is my input stream:
{"_id":{"$oid":"556809dbe4b0ef41436f7515"},"body":{"ProductCount":NumberInt(1),"category":null,"CorrectedWord":"bbtp", "field":null,"filter":{},"fromAutocomplete":false,"loggedIn":false,"pageNo":"1","pageSize":"64","percentageMatch":NumberInt(100), "searchTerm":"bbtp","sortOrder":null,"suggestedWords":[]},"envelope":{"IP":"115.115.115.98","actionType":"search","sessionId":"10536088910863418864","timestamp":{"$date":"2015-05-29T06:40:00.000Z"}}}
Here is the complete error:
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112)
    at com.inferlytics.InferlyticsStormConsumer.bolt.QueryNormalizer.execute(QueryNormalizer.java:40)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:404)
    at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    ... 6 more
My topology:
public class TopologyMain {
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
.getLogger(TopologyMain.class);
private static final String SPOUT_ID = "Feed-Emitter";
/**
* @param args
* @throws AlreadyAliveException
* @throws InvalidTopologyException
*/
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
int numSpoutExecutors = 1;
LOG.info("This is SpoutConfig");
KafkaSpout kspout = QueryCounter();
TopologyBuilder builder = new TopologyBuilder();
LOG.info("This is Set Spout");
builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
LOG.info("This is Query-Normalizer bolt");
builder.setBolt("Query-normalizer", new QueryNormalizer())
.shuffleGrouping(SPOUT_ID);
LOG.info("This is Query-ProductCount bolt");
builder.setBolt("Query-ProductCount", new QueryProductCount(),1)
.shuffleGrouping("Query-normalizer", "stream1");
LOG.info("This is Query-SearchTerm bolt");
builder.setBolt("Query-SearchTerm", new QuerySearchTermCount(),1)
.shuffleGrouping("Query-normalizer", "stream2");
LOG.info("This is tick-tuple bolt");
builder.setBolt("Tick-Tuple", new TickTuple(),1)
.shuffleGrouping("Query-normalizer", "stream3");
/*
* Storm Constants
* */
String NIMBUS_HOST = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.NIMBUS_HOST );
String NIMBUS_THRIFT_PORT = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.NIMBUS_THRIFT_PORT );
String TOPOLOGY_TICK_TUPLE_FREQ_SECS = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.TOPOLOGY_TICK_TUPLE_FREQ_SECS );
String STORM_JAR = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.STORM_JAR );
String SET_NUM_WORKERS = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.SET_NUM_WORKERS );
String SET_MAX_SPOUT_PENDING = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
ApplicationConstants.SET_MAX_SPOUT_PENDING );
final int setNumWorkers = Integer.parseInt(SET_NUM_WORKERS);
final int setMaxSpoutPending = Integer.parseInt(SET_MAX_SPOUT_PENDING);
final int nimbus_thirft_port = Integer.parseInt(NIMBUS_THRIFT_PORT);
final int topology_tick_tuple_freq_secs = Integer.parseInt(TOPOLOGY_TICK_TUPLE_FREQ_SECS);
/*
* Storm Configurations
*/
LOG.trace("Setting Configuration");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
conf.put(Config.NIMBUS_HOST, NIMBUS_HOST);
conf.put(Config.NIMBUS_THRIFT_PORT, nimbus_thirft_port);
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, topology_tick_tuple_freq_secs);
System.setProperty("storm.jar",STORM_JAR );
conf.setNumWorkers(setNumWorkers);
conf.setMaxSpoutPending(setMaxSpoutPending);
if (args != null && args.length > 0) {
LOG.trace("Storm Topology Submitted On Cluster");
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else
{
LOG.trace("Storm Topology Submitted On Local");
cluster.submitTopology("Query", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("Query");
LOG.trace("This is ShutDown cluster");
cluster.shutdown();
}
LOG.trace("Method: main finished.");
}
private static KafkaSpout QueryCounter() {
//Build a kafka spout
/*
* Kafka Constants
*/
final String topic = FilePropertyManager.getProperty( ApplicationConstants.KAFKA_CONSTANTS_FILE,
ApplicationConstants.TOPIC );
String zkHostPort = FilePropertyManager.getProperty( ApplicationConstants.KAFKA_CONSTANTS_FILE,
ApplicationConstants.ZOOKEEPER_CONNECTION_STRING );
String zkRoot = "/Feed-Emitter";
String zkSpoutId = "Feed-Emitter-spout";
ZkHosts zkHosts = new ZkHosts(zkHostPort);
LOG.trace("This is Inside kafka spout ");
SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
LOG.trace("Returning From kafka spout ");
return kafkaSpout;
}
}
My QueryNormalizer bolt:
public class QueryNormalizer extends BaseBasicBolt {
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
.getLogger(QueryNormalizer.class);
public void cleanup() {}
/**
* The bolt receives a line from the
* feed file and processes it to normalize the line.
*
* Normalization lower-cases the terms
* and splits the line to get all terms.
*/
public void execute(Tuple input, BasicOutputCollector collector) {
LOG.trace("Method in QueryNormalizer: execute called.");
String feed = input.getString(0);
String searchTerm = null;
String pageNo = null;
boolean sortOrder = true;
boolean category = true;
boolean field = true;
boolean filter = true;
String pc = null;
int ProductCount = 0;
String timestamp = null;
String year = null;
String month = null;
String day = null;
String hour = null;
Calendar calendar = Calendar.getInstance();
int dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
int weekOfYear = calendar.get(Calendar.WEEK_OF_YEAR);
JSONObject obj = null;
try {
obj = new JSONObject(feed);
} catch (JSONException e1) {
LOG.error( "Json Exception in Query Normalizer", e1 );
}
try {
searchTerm = obj.getJSONObject("body").getString("CorrectedWord");
pageNo = obj.getJSONObject("body").getString("pageNo");
sortOrder = obj.getJSONObject("body").isNull("sortOrder");
category = obj.getJSONObject("body").isNull("category");
field = obj.getJSONObject("body").isNull("field");
filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
ProductCount = Integer.parseInt(pc);
timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(10,29);
year = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(10, 14);
month = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(15, 17);
day = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(18, 20);
hour = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(21, 23);
} catch (JSONException e) {
LOG.error( "Parsing Value Exception in Query Normalizer", e );
}
searchTerm = searchTerm.trim();
//Condition to eliminate pagination
if(!searchTerm.isEmpty()){
if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)){
searchTerm = searchTerm.toLowerCase();
System.out.println("In QueryProductCount execute: "+searchTerm+","+year+","+month+","+day+","+hour+","+dayOfYear+","+weekOfYear+","+ProductCount);
System.out.println("Entire Json : "+feed);
System.out.println("In QuerySearchCount execute : "+searchTerm+","+year+","+month+","+day+","+hour);
LOG.trace("In QueryNormalizer execute : "+searchTerm+","+year+","+month+","+day+","+hour+","+dayOfYear+","+weekOfYear+","+ProductCount);
LOG.trace("In QueryNormalizer execute : "+searchTerm+","+year+","+month+","+day+","+hour);
collector.emit("stream1", new Values(searchTerm , year , month , day , hour , dayOfYear , weekOfYear , ProductCount ));
collector.emit("stream2", new Values(searchTerm , year , month , day , hour ));
collector.emit("stream3", new Values());
}
LOG.trace("Method in QueryNormalizer: execute finished.");
}
}
/**
* The bolt will only emit the specified streams in collector
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("stream1", new Fields("searchTerm" ,"year" ,"month" ,"day" ,"hour" ,"dayOfYear" ,"weekOfYear" ,"ProductCount"));
declarer.declareStream("stream2", new Fields("searchTerm" ,"year" ,"month" ,"day" ,"hour"));
declarer.declareStream("stream3", new Fields());
}
}
In the QueryNormalizer class, the error points to this line:
String feed = input.getString(0);
public void execute(Tuple input, BasicOutputCollector collector) {
LOG.trace("Method in QueryNormalizer: execute called.");
String feed = input.getString(0);
String searchTerm = null;
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112)
    at com.inferlytics.InferlyticsStormConsumer.bolt.QueryNormalizer.execute(QueryNormalizer.java:40)
Edit:
After removing Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS from the configuration, the code works fine. But I still have to implement the tick tuple. How should I implement it?
I guess there is some problem with my TickTuple class. Is this the correct way to implement it?
TickTuple:
public class TickTuple extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
.getLogger(TickTuple.class);
private static final String KEYSPACE = FilePropertyManager.getProperty( ApplicationConstants.CASSANDRA_CONSTANTS_FILE,
ApplicationConstants.KEYSPACE );
private static final String MONGO_DB = FilePropertyManager.getProperty( ApplicationConstants.MONGO_CONSTANTS_FILE,
ApplicationConstants.MONGO_DBE );
private static final String TABLE_CASSANDRA_TOP_QUERY = FilePropertyManager.getProperty( ApplicationConstants.CASSANDRA_CONSTANTS_FILE,
ApplicationConstants.TABLE_CASSANDRA_TOP_QUERY );
private static final String MONGO_COLLECTION_E = FilePropertyManager.getProperty( ApplicationConstants.MONGO_CONSTANTS_FILE,
ApplicationConstants.MONGO_COLLECTION_E );
public void cleanup() {
}
protected static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
if (isTickTuple(input)) {
CassExport.cassExp(KEYSPACE, TABLE_CASSANDRA_TOP_QUERY, MONGO_DB, MONGO_COLLECTION_E);
TruncateCassandraTable.truncateData(TABLE_CASSANDRA_TOP_QUERY);
LOG.trace("In Truncate");
return;
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
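Besides setting TOPOLOGY_TICK_TUPLE_FREQ_SECS topology-wide, Storm lets a single bolt request ticks for itself by overriding getComponentConfiguration(). The sketch below mimics the map such an override would return, without depending on the Storm jars; the string key is what Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS resolves to in Storm 0.9.x (an assumption — verify against the Config class of your Storm version).

```java
import java.util.HashMap;
import java.util.Map;

public class TickConfigSketch {
    // In Storm 0.9.x, Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS resolves to this
    // string key (assumption -- check your Storm version's Config class).
    static final String TICK_FREQ_KEY = "topology.tick.tuple.freq.secs";

    // Mirrors what a bolt's getComponentConfiguration() override would return,
    // so that only this bolt receives a tick tuple every freqSecs seconds.
    static Map<String, Object> componentConfiguration(int freqSecs) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TICK_FREQ_KEY, freqSecs);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = componentConfiguration(300);
        System.out.println(conf.get(TICK_FREQ_KEY)); // prints 300
    }
}
```

Scoping the tick frequency to the one bolt that needs it (TickTuple here) avoids sending ticks to every bolt in the topology, which is what triggered the getString(0) cast failure in QueryNormalizer.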
Can anyone suggest the changes required in the code?
Best Answer
Now I see it: you have data tuples and tick tuples in the same input stream. For data tuples the first field is of type String, but for tick tuples it is of type Long. Thus, input.getString(0) runs into the ClassCastException for the first tick tuple that arrives.
You need to update your bolt code like this:
Object field1 = input.getValue(0);
if (field1 instanceof Long) {
Long tick = (Long)field1;
// process tick tuple further
} else {
String feed = (String)field1;
// process data tuple as you did already
}
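The type dispatch the answer describes can be exercised in isolation. The sketch below is a plain-Java stand-in with no Storm dependency; handle() and its return values are hypothetical names that play the role of the branching on input.getValue(0), not the Storm API itself.

```java
public class TupleDispatchSketch {
    // Stand-in for branching on input.getValue(0): tick tuples carry a
    // Long in field 0, data tuples carry the raw JSON feed as a String.
    static String handle(Object field0) {
        if (field0 instanceof Long) {
            return "tick:" + field0;                          // tick-tuple branch
        }
        return "data:" + ((String) field0).toLowerCase();     // data-tuple branch
    }

    public static void main(String[] args) {
        System.out.println(handle(5L));     // prints tick:5
        System.out.println(handle("BBTP")); // prints data:bbtp
    }
}
```

Inside the real bolt, the tick branch would run the Cassandra export/truncate work currently in TickTuple, and the data branch would run the existing JSON normalization.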
Regarding java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31139119/