Ошибка при вставке данных в Cassandra
Я изучаю Apache Kafka-шторм-Кассандра интеграции. Я читаю строку JSON из кластера Kafka, используя Kafka Spout. Затем передаю ее болту, который анализирует JSON и выдает необходимое значение во второй болт, который записывает его в базу данных Cassandra.
Но я получаю эти ошибки.
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: строка 1:154 несоответствующий символ "ожидание"
java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at
backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748) at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at
com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) at bolts.WordCounter.execute(WordCounter.java:103) at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) at
backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633) at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ... 6 more Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at com.datastax.driver.core.Responses$Error.asException(Responses.java:101) at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) at
com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ... 1 more
Ошибка TableAlreadyExists:
com.datastax.driver.core.exceptions.AlreadyExistsException: таблица query.productcount уже существует в com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)
com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85) at
com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) at
com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) at bolts.WordCounter.prepare(WordCounter.java:77) at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43) at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at
com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85) at
com.datastax.driver.core.Responses$Error.asException(Responses.java:105) at
com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) at
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) at
com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ... 1 more Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:70) at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:38) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:168) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66) ... 21 more
Моя основная топология:
public class TopologyQueryCounterMain {
static final Logger logger = Logger.getLogger(TopologyQueryCounterMain.class);
private static final String SPOUT_ID = "QueryCounter";
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
int numSpoutExecutors = 1;
logger.debug("This is SpoutConfig");
KafkaSpout kspout = QueryCounter();
TopologyBuilder builder = new TopologyBuilder();
logger.debug("This is Set Spout");
builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
logger.debug("This is Set bolt");
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping(SPOUT_ID);
builder.setBolt("word-counter", new WordCounter(),1)
.shuffleGrouping("word-normalizer", "stream1");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
logger.debug("This is Submit cluster");
conf.put(Config.NIMBUS_HOST, "192.168.1.229");
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/home/ubuntu/workspace/QueryCounter/target/QueryCounter-0.0.1-SNAPSHOT.jar");
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
if (args != null && args.length > 0) {
StormSubmitter. submitTopology(args[0], conf, builder.createTopology());
}
else
{
cluster.submitTopology("QueryCounter", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("QueryCounter");
logger.debug("This is ShutDown cluster");
cluster.shutdown();
}
}
private static KafkaSpout QueryCounter() {
String zkHostPort = "localhost:2181";
String topic = "RandomQuery";
String zkRoot = "/QueryCounter";
String zkSpoutId = "QueryCounter-spout";
ZkHosts zkHosts = new ZkHosts(zkHostPort);
logger.debug("This is Inside kafka spout cluster");
SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
spoutCfg.scheme=new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
return kafkaSpout;
}
}
Болт нормализатора:
public class WordNormalizer extends BaseBasicBolt {
static final Logger logger = Logger.getLogger(WordNormalizer.class);
public void cleanup() {}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input, BasicOutputCollector collector) {
String feed = input.getString(0);
String searchTerm = null;
String pageNo = null;
boolean sortOrder = true;
boolean category = true;
boolean field = true;
boolean filter = true;
String pc = null;
int ProductCount = 0;
String timestamp = null;
JSONObject obj = null;
try {
obj = new JSONObject(feed);
} catch (JSONException e1) {
// TODO Auto-generated catch block
//e1.printStackTrace();
}
try {
searchTerm = obj.getJSONObject("body").getString("correctedWord");
pageNo = obj.getJSONObject("body").getString("pageNo");
sortOrder = obj.getJSONObject("body").isNull("sortOrder");
category = obj.getJSONObject("body").isNull("category");
field = obj.getJSONObject("body").isNull("field");
filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
ProductCount = Integer.parseInt(pc);
timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().replaceAll("[^\\d]", "");
} catch (JSONException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
searchTerm = searchTerm.trim();
//Condition to eliminate pagination
if(!searchTerm.isEmpty()){
if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)){
searchTerm = searchTerm.toLowerCase();
System.out.println("In Normalizer term : "+searchTerm+","+timestamp+","+ProductCount);
System.out.println("Entire Json : "+feed);
collector.emit("stream1", new Values(searchTerm , timestamp , ProductCount ));
}
}
}
/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("stream1", new Fields("searchTerm" ,"timestamp" ,"ProductCount"));
}
}
CassandraWriter болт:
public class WordCounter extends BaseBasicBolt {
static final Logger logger = Logger.getLogger(WordCounter.class);
Integer id;
String name;
Map<String, Integer> counters;
Cluster cluster ;
Session session ;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup() {
}
public static Session getSessionWithRetry(Cluster cluster, String keyspace) {
while (true) {
try {
return cluster.connect(keyspace);
} catch (NoHostAvailableException e) {
Utils.sleep(1000);
}
}
}
public static Cluster setupCassandraClient() {
return Cluster.builder().addContactPoint("192.168.1.229").build();
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
cluster = setupCassandraClient();
session = WordCounter.getSessionWithRetry(cluster,"query");
String query = "CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
+ "term text , "
+ "ProductCount varint,"
+"timestamp text );";
session.executeAsync(query);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String term = input.getString(0);
String timestamp = input.getString(1);
int ProductCount = input.getInteger(2);
System.out.println("In Counter : " +term+","+ProductCount+","+timestamp);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
String insertIntoTable = "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"
+ " VALUES("+UUID.randomUUID()+","+"\'"+term+"\'"+","+ProductCount+","+"\'"+timestamp+"\'"+");" ;
session.executeAsync(insertIntoTable);
}
}
Пожалуйста, предложите мне изменения, которые мне нужно сделать.
Спасибо в Advnace!
1 ответ
Ошибка указывает на отсутствие '
в одном из ваших запросов. Обычно это проблема, если вы просто объединяете строки. Чтобы избежать подобных проблем, вы должны использовать подготовленные заявления или, по крайней мере, Session.execute(query, params)
( Javadoc)