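I am learning Apache Kafka-Storm-Cassandra integration. I use a Kafka spout to read JSON strings from a Kafka cluster. Each string is passed to a bolt that parses the JSON and emits the required values to a second bolt, which writes them to the Cassandra DB.
But I am running into the following errors.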
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.execute(WordCounter.java:103)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120)
    ... 6 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting '''
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:101)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
TableAlreadyExists error:
com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at bolts.WordCounter.prepare(WordCounter.java:77)
    at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43)
    at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:105)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:70)
    at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:38)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:168)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
    ... 21 more
My main topology:
public class TopologyQueryCounterMain {
    static final Logger logger = Logger.getLogger(TopologyQueryCounterMain.class);
    private static final String SPOUT_ID = "QueryCounter";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        int numSpoutExecutors = 1;
        logger.debug("This is SpoutConfig");
        KafkaSpout kspout = QueryCounter();
        TopologyBuilder builder = new TopologyBuilder();
        logger.debug("This is Set Spout");
        builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
        logger.debug("This is Set bolt");
        builder.setBolt("word-normalizer", new WordNormalizer())
               .shuffleGrouping(SPOUT_ID);
        builder.setBolt("word-counter", new WordCounter(), 1)
               .shuffleGrouping("word-normalizer", "stream1");
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        logger.debug("This is Submit cluster");
        conf.put(Config.NIMBUS_HOST, "192.168.1.229");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
        System.setProperty("storm.jar", "/home/ubuntu/workspace/QueryCounter/target/QueryCounter-0.0.1-SNAPSHOT.jar");
        conf.setNumWorkers(20);
        conf.setMaxSpoutPending(5000);
        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            cluster.submitTopology("QueryCounter", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("QueryCounter");
            logger.debug("This is ShutDown cluster");
            cluster.shutdown();
        }
    }

    private static KafkaSpout QueryCounter() {
        String zkHostPort = "localhost:2181";
        String topic = "RandomQuery";
        String zkRoot = "/QueryCounter";
        String zkSpoutId = "QueryCounter-spout";
        ZkHosts zkHosts = new ZkHosts(zkHostPort);
        logger.debug("This is Inside kafka spout cluster");
        SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
        spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
        return kafkaSpout;
    }
}
Normalizer bolt:
public class WordNormalizer extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordNormalizer.class);

    public void cleanup() {}

    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String feed = input.getString(0);
        String searchTerm = null;
        String pageNo = null;
        boolean sortOrder = true;
        boolean category = true;
        boolean field = true;
        boolean filter = true;
        String pc = null;
        int ProductCount = 0;
        String timestamp = null;
        JSONObject obj = null;
        try {
            obj = new JSONObject(feed);
        } catch (JSONException e1) {
            // TODO Auto-generated catch block
            //e1.printStackTrace();
        }
        try {
            searchTerm = obj.getJSONObject("body").getString("correctedWord");
            pageNo = obj.getJSONObject("body").getString("pageNo");
            sortOrder = obj.getJSONObject("body").isNull("sortOrder");
            category = obj.getJSONObject("body").isNull("category");
            field = obj.getJSONObject("body").isNull("field");
            filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
            pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
            ProductCount = Integer.parseInt(pc);
            timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().replaceAll("[^\\d]", "");
        } catch (JSONException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        searchTerm = searchTerm.trim();
        //Condition to eliminate pagination
        if (!searchTerm.isEmpty()) {
            if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)) {
                searchTerm = searchTerm.toLowerCase();
                System.out.println("In Normalizer term : " + searchTerm + "," + timestamp + "," + ProductCount);
                System.out.println("Entire Json : " + feed);
                collector.emit("stream1", new Values(searchTerm, timestamp, ProductCount));
            }
        }
    }

    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream1", new Fields("searchTerm", "timestamp", "ProductCount"));
    }
}
CassandraWriter bolt:
public class WordCounter extends BaseBasicBolt {
    static final Logger logger = Logger.getLogger(WordCounter.class);
    Integer id;
    String name;
    Map<String, Integer> counters;
    Cluster cluster;
    Session session;

    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
    }

    public static Session getSessionWithRetry(Cluster cluster, String keyspace) {
        while (true) {
            try {
                return cluster.connect(keyspace);
            } catch (NoHostAvailableException e) {
                Utils.sleep(1000);
            }
        }
    }

    public static Cluster setupCassandraClient() {
        return Cluster.builder().addContactPoint("192.168.1.229").build();
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
        cluster = setupCassandraClient();
        session = WordCounter.getSessionWithRetry(cluster, "query");
        String query = "CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
                + "term text , "
                + "ProductCount varint,"
                + "timestamp text );";
        session.executeAsync(query);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String term = input.getString(0);
        String timestamp = input.getString(1);
        int ProductCount = input.getInteger(2);
        System.out.println("In Counter : " + term + "," + ProductCount + "," + timestamp);
        /**
         * If the word doesn't exist in the map we will create
         * this, if not We will add 1
         */
        String insertIntoTable = "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"
                + " VALUES("+UUID.randomUUID()+","+"\'"+term+"\'"+","+ProductCount+","+"\'"+timestamp+"\'"+");" ;
        session.executeAsync(insertIntoTable);
    }
}
Please suggest the changes I need to make.
Thanks in advance!!
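Best answer
The error points to a missing ' in one of your queries. This is a common problem when you build queries by simply concatenating strings. To avoid it, use prepared statements, or at least Session.execute(query, params) (see the javadoc).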
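As a rough illustration of the point above, the sketch below rebuilds the INSERT the way the question's bolt does and counts the single quotes in the result. It runs without Cassandra or the driver. The class name CqlQuotingDemo, the helper names, and the sample values ("men's shoes", 42, "1434350000") are hypothetical and chosen only for the demonstration; an apostrophe in the search term is one plausible way to end up with an unclosed string literal, not a confirmed diagnosis of the asker's data.

```java
import java.util.UUID;

public class CqlQuotingDemo {
    // Parameterized form of the question's INSERT. The values would be sent
    // separately from the statement text, so quotes inside them cannot
    // change how the statement is parsed.
    public static final String PARAMETERIZED_INSERT =
        "INSERT INTO ProductCount (uid, term, ProductCount, timestamp) VALUES (?, ?, ?, ?)";

    // Builds the statement by string concatenation, as the question's bolt does.
    public static String concatenatedInsert(UUID uid, String term, int productCount, String timestamp) {
        return "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"
            + " VALUES(" + uid + ",'" + term + "'," + productCount + ",'" + timestamp + "');";
    }

    // Counts single quotes; an odd count means some string literal is never closed.
    public static int countSingleQuotes(String cql) {
        int count = 0;
        for (int i = 0; i < cql.length(); i++) {
            if (cql.charAt(i) == '\'') {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        UUID uid = UUID.randomUUID();
        // Hypothetical sample values, not taken from the asker's data.
        String fine = concatenatedInsert(uid, "mens shoes", 42, "1434350000");
        String broken = concatenatedInsert(uid, "men's shoes", 42, "1434350000");

        System.out.println("quotes in first statement:  " + countSingleQuotes(fine));
        System.out.println("quotes in second statement: " + countSingleQuotes(broken));
        System.out.println(broken);
    }
}
```

With the DataStax driver, the PARAMETERIZED_INSERT text would be passed to a prepared statement or to Session.execute(query, params), with the UUID, term, product count, and timestamp supplied as the parameters, so no manual quoting is needed.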
Regarding "cassandra - error when inserting data into Cassandra", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/30842605/