将数据插入 Cassandra 时出错

Error While Inserting Data into Cassandra

我正在学习 Apache Kafka-storm-cassandra 集成。我正在使用 Kafka Spout.Then 从 Kafka 集群读取 JSON 字符串,将其传递给解析 JSON 的螺栓,并将所需的值发送到第二个螺栓,后者将其写入 Cassandra DB。

但我遇到了这些错误。

java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '' expecting '''

java.lang.RuntimeException: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) at 
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) at 
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) at 
backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748) at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) 
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:35) at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at 
com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) at bolts.WordCounter.execute(WordCounter.java:103) at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) at 
backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633) at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:401) at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58) at 
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:120) ... 6 more Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 mismatched character '<EOF>' expecting ''' at com.datastax.driver.core.Responses$Error.asException(Responses.java:101) at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) at 
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) at 
com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at 
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) at 
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at 
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ... 1 more

TableAlreadyExists 错误:

com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85)

com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85) at 
com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289) at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205) at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52) at 
com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36) at bolts.WordCounter.prepare(WordCounter.java:77) at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43) at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at 
com.datastax.driver.core.exceptions.AlreadyExistsException.copy(AlreadyExistsException.java:85) at 
com.datastax.driver.core.Responses$Error.asException(Responses.java:105) at 

com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140) at 
com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:293) at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:455) at 
com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:734) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) ... 1 more Caused by: com.datastax.driver.core.exceptions.AlreadyExistsException: Table query.productcount already exists at com.datastax.driver.core.Responses$Error.decode(Responses.java:70) at com.datastax.driver.core.Responses$Error.decode(Responses.java:38) at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:168) at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66) ... 21 more

我的主要拓扑:

public class TopologyQueryCounterMain {


static final Logger logger = Logger.getLogger(TopologyQueryCounterMain.class);


private static final String SPOUT_ID = "QueryCounter";


public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

    int numSpoutExecutors = 1;
    logger.debug("This is SpoutConfig");
    KafkaSpout kspout = QueryCounter();
    TopologyBuilder builder = new TopologyBuilder();
    logger.debug("This is Set Spout");
    builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
    logger.debug("This is Set bolt");
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping(SPOUT_ID);
    builder.setBolt("word-counter", new WordCounter(),1)
        .shuffleGrouping("word-normalizer", "stream1");


    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    logger.debug("This is Submit cluster");
    conf.put(Config.NIMBUS_HOST, "192.168.1.229");
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
     System.setProperty("storm.jar", "/home/ubuntu/workspace/QueryCounter/target/QueryCounter-0.0.1-SNAPSHOT.jar");
    conf.setNumWorkers(20);
    conf.setMaxSpoutPending(5000);

    if (args != null && args.length > 0) {
        StormSubmitter. submitTopology(args[0], conf, builder.createTopology());
    }

    else
    {   
        cluster.submitTopology("QueryCounter", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("QueryCounter");
        logger.debug("This is ShutDown cluster");
        cluster.shutdown();
    }
}


private static KafkaSpout QueryCounter() {
    String zkHostPort = "localhost:2181";
    String topic = "RandomQuery";

    String zkRoot = "/QueryCounter";
    String zkSpoutId = "QueryCounter-spout";
    ZkHosts zkHosts = new ZkHosts(zkHostPort);

    logger.debug("This is Inside kafka spout cluster");
    SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
    spoutCfg.scheme=new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
    return kafkaSpout;
  }

}

标准化螺栓:

public class WordNormalizer extends BaseBasicBolt {
static final Logger logger = Logger.getLogger(WordNormalizer.class);
public void cleanup() {}

/**
 * The bolt will receive the line from the
 * words file and process it to Normalize this line
 * 
 * The normalize will be put the words in lower case
 * and split the line to get all words in this 
 */
public void execute(Tuple input, BasicOutputCollector collector) {
String feed = input.getString(0);

    String searchTerm = null;
    String pageNo = null;
    boolean sortOrder = true;
    boolean category = true;
    boolean field = true;
    boolean filter = true;
    String pc = null;
    int ProductCount = 0;
    String timestamp = null;

    JSONObject obj = null;
    try {
        obj = new JSONObject(feed);
    } catch (JSONException e1) {
        // TODO Auto-generated catch block
        //e1.printStackTrace();

    }

    try {
           searchTerm = obj.getJSONObject("body").getString("correctedWord");

           pageNo = obj.getJSONObject("body").getString("pageNo");
           sortOrder = obj.getJSONObject("body").isNull("sortOrder");
           category = obj.getJSONObject("body").isNull("category");
           field = obj.getJSONObject("body").isNull("field");
           filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
           pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\d]", "");
           ProductCount = Integer.parseInt(pc);
           timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().replaceAll("[^\d]", "");
    } catch (JSONException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();

    }

    searchTerm = searchTerm.trim();

    //Condition to eliminate pagination
     if(!searchTerm.isEmpty()){
         if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)){
             searchTerm = searchTerm.toLowerCase();

            System.out.println("In Normalizer term : "+searchTerm+","+timestamp+","+ProductCount);
            System.out.println("Entire Json : "+feed);

             collector.emit("stream1", new Values(searchTerm , timestamp , ProductCount ));

            }
     }


    }



/**
 * The bolt will only emit the field "word" 
 */
public void declareOutputFields(OutputFieldsDeclarer declarer) {

    declarer.declareStream("stream1", new Fields("searchTerm" ,"timestamp" ,"ProductCount"));

}
}

CassandraWriter 螺栓:

public class WordCounter extends BaseBasicBolt {
static final Logger logger = Logger.getLogger(WordCounter.class);
Integer id;
String name;
Map<String, Integer> counters;
Cluster cluster ;
Session session ;

/**
 * At the end of the spout (when the cluster is shutdown
 * We will show the word counters
 */
@Override
public void cleanup() {

}
 public static Session getSessionWithRetry(Cluster cluster, String keyspace) {
        while (true) {
            try {
                return cluster.connect(keyspace);
            } catch (NoHostAvailableException e) {

                Utils.sleep(1000);
            }
        }

    }
public static Cluster setupCassandraClient() {
    return Cluster.builder().addContactPoint("192.168.1.229").build();
}
/**
 * On create 
 */
@Override
public void prepare(Map stormConf, TopologyContext context) {
    this.counters = new HashMap<String, Integer>();
    this.name = context.getThisComponentId();
    this.id = context.getThisTaskId();
    cluster = setupCassandraClient();
    session = WordCounter.getSessionWithRetry(cluster,"query");

    String query = "CREATE TABLE IF NOT EXISTS ProductCount(uid uuid PRIMARY KEY, "
            + "term text , "
            + "ProductCount varint," 
            +"timestamp text );";

    session.executeAsync(query);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}


@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    String term = input.getString(0);
    String timestamp = input.getString(1);
    int ProductCount = input.getInteger(2);

    System.out.println("In Counter : " +term+","+ProductCount+","+timestamp);
    /**
     * If the word dosn't exist in the map we will create
     * this, if not We will add 1 
     */



String insertIntoTable = "INSERT INTO ProductCount (uid, term, ProductCount, timestamp)"

     + " VALUES("+UUID.randomUUID()+","+"\'"+term+"\'"+","+ProductCount+","+"\'"+timestamp+"\'"+");" ;
    session.executeAsync(insertIntoTable);



}
}

请建议我需要做的更改。

感谢 Advnace !!

错误指向您的一个查询中缺少 '。如果您只是连接字符串,这通常是问题所在。为避免此类问题,您应该使用 prepared statements or at least Session.execute(query, params) (javadoc)