Storm 拓扑上的 Redis 错误

Redis Error On a Storm topology

我正在使用 Redis 从我的 Storm 拓扑 中提取数据,我一直在阅读它,并找到了一些示例。

现在,我正在尝试运行,稍后我将实现一个从redis读取的前端并进行d3.js实现在将数据放在图表上的 html 文件上。我现在已经为本地文件实现了 D3 部分。目前,我没有创建用于表示我的数据的数学函数,我只是创建了从文本文件中读取名称并附加“:) :)”符号的元组。那部分工作也很好,但我在尝试在 Storm 0.10.0.

上使用 storm-redis 时遇到问题

暂时理解为(如有错误请指正) redis是mongoDB、非SQL等数据库使用字段来检索密钥。我有一个 implemerntation,但它不工作。我在一行中编译出错,它被注释掉了。这是我的代码:

package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;

/**
 * This is a basic example of a Storm topology.
 */
public class ProvaTopology {

  public static class ProvaBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "  :-)"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("morts"));
    }


  }
  public class ProvaSpout extends BaseRichSpout {
      SpoutOutputCollector _collector;
      //Random _rand;
      private String fileName;
      //private SpoutOutputCollector _collector;
      private BufferedReader reader;
      private AtomicLong linesRead;

      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            fileName= (String)"/home/prova.tsv";
            reader = new BufferedReader(new FileReader(fileName));
            // read and ignore the header if one exists
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
       // _rand = new Random();
      }

      public void nextTuple() {
        Utils.sleep(100);


      try {
            String line = reader.readLine();
            if (line != null) {
              long id = linesRead.incrementAndGet();
              _collector.emit(new Values(line), id);
            } else {
              System.out.println("Finished reading file, " + linesRead.get() + " lines read");
              Thread.sleep(10000);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
      }
        /*String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        int _rand;*/
        //String sentence = sentences[_rand.nextInt(sentences.length)];
        //_collector.emit(new Values(sentence));


      public void ack(Object id) {
      }

      public void fail(Object id) {
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("morts"));
      }

    }


  class MortsStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        private final String hashKey = "Morts";

        public void MortsStoreStoreMapper() {
            description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }


        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }


        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("morts");
        }


        public String getValueFromTuple(ITuple tuple) {
            return tuple.getStringByField("somriures");
        }
    }

    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost("127.0.0.1").setPort(666).build();
    RedisStoreMapper storeMapper = new MortsStoreMapper();**//ERROR HERE** Non enclosing instance of type ProvaTopology is accessible. Must qualify the allocation with an enclosing instance of type ProvaTopology.
    RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

    builder.setSpout("morts", new TestWordSpout(), 10);//emisor
    builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("morts");// de on llig?
    builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                                   //WithProgressBar
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

尽管存在此错误,但一旦解决,我不确定我的拓扑是否会保存任何内容,而且一旦保存,我对如何使用前端检索它有点迷茫。任何帮助将不胜感激。

提前致谢

这是我在网上找到的一个 jedis 实现,在 Storm-redis 教程中,以及我正在使用的那个,以防万一:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.redis.common.config;

import redis.clients.jedis.Protocol;

import java.io.Serializable;

/**
 * Configuration for JedisPool.
 */
public class JedisPoolConfig implements Serializable {
    public static final String DEFAULT_HOST = "127.0.0.1";

    private String host;
    private int port;
    private int timeout;
    private int database;
    private String password;

    /**
     * Constructor
     * <p/>
     * You can use JedisPoolConfig.Builder() for leaving some fields to apply default value.
     *
     * @param host hostname or IP
     * @param port port
     * @param timeout socket / connection timeout
     * @param database database index
     * @param password password, if any
     */
    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
        this.database = database;
        this.password = password;
    }

    /**
     * Returns host.
     * @return hostname or IP
     */
    public String getHost() {
        return host;
    }

    /**
     * Returns port.
     * @return port
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns timeout.
     * @return socket / connection timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Returns database index.
     * @return database index
     */
    public int getDatabase() {
        return database;
    }

    /**
     * Returns password.
     * @return password
     */
    public String getPassword() {
        return password;
    }

    /**
     * Builder for initializing JedisPoolConfig.
     */
    public static class Builder {
        private String host = DEFAULT_HOST;
        private int port = Protocol.DEFAULT_PORT;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int database = Protocol.DEFAULT_DATABASE;
        private String password;

        /**
         * Sets host.
         * @param host host
         * @return Builder itself
         */
        public Builder setHost(String host) {
            this.host = host;
            return this;
        }

        /**
         * Sets port.
         * @param port port
         * @return Builder itself
         */
        public Builder setPort(int port) {
            this.port = port;
            return this;
        }

        /**
         * Sets timeout.
         * @param timeout timeout
         * @return Builder itself
         */
        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        /**
         * Sets database index.
         * @param database database index
         * @return Builder itself
         */
        public Builder setDatabase(int database) {
            this.database = database;
            return this;
        }

        /**
         * Sets password.
         * @param password password, if any
         * @return Builder itself
         */
        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        /**
         * Builds JedisPoolConfig.
         * @return JedisPoolConfig
         */
        public JedisPoolConfig build() {
            return new JedisPoolConfig(host, port, timeout, password, database);
        }
    }
}

我终于来了!我只是让它变得简单,我创建了一个非常简单的发布内容的 redis bolt,我监控了 redis 数据库,它正在运行。我的工作螺栓:

 public class RedisBolt implements IRichBolt {

        protected String channel = "Somriures";
        //    protected String configChannel;
        protected OutputCollector collector;
        //    protected Tuple currentTuple;
        //    protected Logger log;
        protected JedisPool pool;
        //    protected ConfigListenerThread configListenerThread;

        public RedisBolt(){}
        public RedisBolt(String channel) {

        //  log = Logger.getLogger(getClass().getName());
        //  setupNonSerializableAttributes();
        }

        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        this.collector = collector;
        pool=new JedisPool("127.0.0.1");
          GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(100);
        config.setTestOnBorrow(true);
        }



        public void execute(Tuple tuple) {
        String current = tuple.getString(0);
        if(current != null) {
            //      for(Object obj: result) {
            System.out.println("Publiquem " + current);
            publish(current);
            System.out.println("emitim " + current);
            collector.emit(tuple, new Values(current));
            //      }
            collector.ack(tuple);
        }
        }

        public void cleanup() {
        if(pool != null) {
            pool.destroy();
        }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(channel));
        }

        public void publish(String msg) {
        Jedis jedis = pool.getResource();
        jedis.publish(channel, msg);

        pool.returnResource(jedis);
        }

        protected void setupNonSerializableAttributes() {

        }

        public Map getComponentConfiguration() {
        return null;
        }
    }