Storm 拓扑上的 Redis 错误
Redis Error On a Storm topology
我正在使用 Redis 从我的 Storm 拓扑中提取数据;我一直在阅读相关资料,并找到了一些示例。
现在我正在尝试把它运行起来;稍后我会实现一个从 Redis 读取数据的前端,并用 d3.js 把数据绘制到 HTML 页面的图表上。D3 部分我目前已经针对本地文件实现好了。目前我还没有编写用于表示数据的数学函数,只是创建了一些元组:从文本文件中读取名称并附加“:) :)”符号。这部分也工作正常,但我在 Storm 0.10.0 上使用 storm-redis 时遇到了问题。
我目前的理解是(如有错误请指正):Redis 与 MongoDB 类似,是一种非 SQL 数据库,使用字段来检索键。我有一个实现(implementation),但它无法工作。其中一行有编译错误,我已在该行用注释标出。这是我的代码:
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;
/**
* This is a basic example of a Storm topology.
*/
public class ProvaTopology {
public static class ProvaBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("morts"));
}
}
public class ProvaSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
//Random _rand;
private String fileName;
//private SpoutOutputCollector _collector;
private BufferedReader reader;
private AtomicLong linesRead;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
try {
fileName= (String)"/home/prova.tsv";
reader = new BufferedReader(new FileReader(fileName));
// read and ignore the header if one exists
} catch (Exception e) {
throw new RuntimeException(e);
}
// _rand = new Random();
}
public void nextTuple() {
Utils.sleep(100);
try {
String line = reader.readLine();
if (line != null) {
long id = linesRead.incrementAndGet();
_collector.emit(new Values(line), id);
} else {
System.out.println("Finished reading file, " + linesRead.get() + " lines read");
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
int _rand;*/
//String sentence = sentences[_rand.nextInt(sentences.length)];
//_collector.emit(new Values(sentence));
public void ack(Object id) {
}
public void fail(Object id) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("morts"));
}
}
class MortsStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "Morts";
public void MortsStoreStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("morts");
}
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("somriures");
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(666).build();
RedisStoreMapper storeMapper = new MortsStoreMapper();**//ERROR HERE** Non enclosing instance of type ProvaTopology is accessible. Must qualify the allocation with an enclosing instance of type ProvaTopology.
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setSpout("morts", new TestWordSpout(), 10);//emisor
builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("morts");// de on llig?
builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
//WithProgressBar
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
除了这个错误之外,即使把它解决了,我也不确定我的拓扑是否真的会保存任何内容;而且就算保存了,我也不太清楚该如何从前端检索这些数据。任何帮助都将不胜感激。
提前致谢
以防万一,下面是我在网上的 storm-redis 教程中找到、并且正在使用的 Jedis 连接池配置实现:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.redis.common.config;
import redis.clients.jedis.Protocol;
import java.io.Serializable;
/**
 * Configuration for JedisPool.
 *
 * <p>Holds the connection settings (host, port, timeout, database index and
 * password). All fields are set once in the constructor and never modified.
 */
public class JedisPoolConfig implements Serializable {
    public static final String DEFAULT_HOST = "127.0.0.1";

    private final String host;
    private final int port;
    private final int timeout;
    private final int database;
    private final String password;

    /**
     * Constructor.
     *
     * <p>You can use {@link Builder} to leave some fields at their default value.
     *
     * @param host hostname or IP
     * @param port port
     * @param timeout socket / connection timeout
     * @param password password, if any
     * @param database database index
     */
    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
        this.database = database;
        this.password = password;
    }

    /**
     * Returns host.
     * @return hostname or IP
     */
    public String getHost() {
        return host;
    }

    /**
     * Returns port.
     * @return port
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns timeout.
     * @return socket / connection timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Returns database index.
     * @return database index
     */
    public int getDatabase() {
        return database;
    }

    /**
     * Returns password.
     * @return password, or {@code null} if none was set
     */
    public String getPassword() {
        return password;
    }

    /**
     * Builder for initializing JedisPoolConfig.
     *
     * <p>Host defaults to {@link #DEFAULT_HOST}; port, timeout and database
     * default to the corresponding {@code Protocol} constants; password
     * defaults to {@code null}.
     */
    public static class Builder {
        private String host = DEFAULT_HOST;
        private int port = Protocol.DEFAULT_PORT;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int database = Protocol.DEFAULT_DATABASE;
        private String password;

        /**
         * Sets host.
         * @param host host
         * @return Builder itself
         */
        public Builder setHost(String host) {
            this.host = host;
            return this;
        }

        /**
         * Sets port.
         * @param port port
         * @return Builder itself
         */
        public Builder setPort(int port) {
            this.port = port;
            return this;
        }

        /**
         * Sets timeout.
         * @param timeout timeout
         * @return Builder itself
         */
        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        /**
         * Sets database index.
         * @param database database index
         * @return Builder itself
         */
        public Builder setDatabase(int database) {
            this.database = database;
            return this;
        }

        /**
         * Sets password.
         * @param password password, if any
         * @return Builder itself
         */
        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        /**
         * Builds JedisPoolConfig.
         * @return JedisPoolConfig
         */
        public JedisPoolConfig build() {
            return new JedisPoolConfig(host, port, timeout, password, database);
        }
    }
}
我终于解决了!我把它简化了:创建了一个非常简单的、用于发布内容的 Redis bolt,并监控了 Redis 数据库,确认它可以正常工作。下面是我能正常工作的 bolt:
public class RedisBolt implements IRichBolt {
protected String channel = "Somriures";
// protected String configChannel;
protected OutputCollector collector;
// protected Tuple currentTuple;
// protected Logger log;
protected JedisPool pool;
// protected ConfigListenerThread configListenerThread;
public RedisBolt(){}
public RedisBolt(String channel) {
// log = Logger.getLogger(getClass().getName());
// setupNonSerializableAttributes();
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
pool=new JedisPool("127.0.0.1");
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(100);
config.setTestOnBorrow(true);
}
public void execute(Tuple tuple) {
String current = tuple.getString(0);
if(current != null) {
// for(Object obj: result) {
System.out.println("Publiquem " + current);
publish(current);
System.out.println("emitim " + current);
collector.emit(tuple, new Values(current));
// }
collector.ack(tuple);
}
}
public void cleanup() {
if(pool != null) {
pool.destroy();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(channel));
}
public void publish(String msg) {
Jedis jedis = pool.getResource();
jedis.publish(channel, msg);
pool.returnResource(jedis);
}
protected void setupNonSerializableAttributes() {
}
public Map getComponentConfiguration() {
return null;
}
}
我正在使用 Redis 从我的 Storm 拓扑中提取数据;我一直在阅读相关资料,并找到了一些示例。
现在我正在尝试把它运行起来;稍后我会实现一个从 Redis 读取数据的前端,并用 d3.js 把数据绘制到 HTML 页面的图表上。D3 部分我目前已经针对本地文件实现好了。目前我还没有编写用于表示数据的数学函数,只是创建了一些元组:从文本文件中读取名称并附加“:) :)”符号。这部分也工作正常,但我在 Storm 0.10.0 上使用 storm-redis 时遇到了问题。
我目前的理解是(如有错误请指正):Redis 与 MongoDB 类似,是一种非 SQL 数据库,使用字段来检索键。我有一个实现(implementation),但它无法工作。其中一行有编译错误,我已在该行用注释标出。这是我的代码:
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;
/**
* This is a basic example of a Storm topology.
*/
public class ProvaTopology {
public static class ProvaBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("morts"));
}
}
public class ProvaSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
//Random _rand;
private String fileName;
//private SpoutOutputCollector _collector;
private BufferedReader reader;
private AtomicLong linesRead;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
try {
fileName= (String)"/home/prova.tsv";
reader = new BufferedReader(new FileReader(fileName));
// read and ignore the header if one exists
} catch (Exception e) {
throw new RuntimeException(e);
}
// _rand = new Random();
}
public void nextTuple() {
Utils.sleep(100);
try {
String line = reader.readLine();
if (line != null) {
long id = linesRead.incrementAndGet();
_collector.emit(new Values(line), id);
} else {
System.out.println("Finished reading file, " + linesRead.get() + " lines read");
Thread.sleep(10000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
int _rand;*/
//String sentence = sentences[_rand.nextInt(sentences.length)];
//_collector.emit(new Values(sentence));
public void ack(Object id) {
}
public void fail(Object id) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("morts"));
}
}
class MortsStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "Morts";
public void MortsStoreStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("morts");
}
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("somriures");
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(666).build();
RedisStoreMapper storeMapper = new MortsStoreMapper();**//ERROR HERE** Non enclosing instance of type ProvaTopology is accessible. Must qualify the allocation with an enclosing instance of type ProvaTopology.
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setSpout("morts", new TestWordSpout(), 10);//emisor
builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("morts");// de on llig?
builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// de on llig?
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
//WithProgressBar
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
除了这个错误之外,即使把它解决了,我也不确定我的拓扑是否真的会保存任何内容;而且就算保存了,我也不太清楚该如何从前端检索这些数据。任何帮助都将不胜感激。
提前致谢
以防万一,下面是我在网上的 storm-redis 教程中找到、并且正在使用的 Jedis 连接池配置实现:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.redis.common.config;
import redis.clients.jedis.Protocol;
import java.io.Serializable;
/**
 * Configuration for JedisPool.
 *
 * <p>Holds the connection settings (host, port, timeout, database index and
 * password). All fields are set once in the constructor and never modified.
 */
public class JedisPoolConfig implements Serializable {
    public static final String DEFAULT_HOST = "127.0.0.1";

    private final String host;
    private final int port;
    private final int timeout;
    private final int database;
    private final String password;

    /**
     * Constructor.
     *
     * <p>You can use {@link Builder} to leave some fields at their default value.
     *
     * @param host hostname or IP
     * @param port port
     * @param timeout socket / connection timeout
     * @param password password, if any
     * @param database database index
     */
    public JedisPoolConfig(String host, int port, int timeout, String password, int database) {
        this.host = host;
        this.port = port;
        this.timeout = timeout;
        this.database = database;
        this.password = password;
    }

    /**
     * Returns host.
     * @return hostname or IP
     */
    public String getHost() {
        return host;
    }

    /**
     * Returns port.
     * @return port
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns timeout.
     * @return socket / connection timeout
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Returns database index.
     * @return database index
     */
    public int getDatabase() {
        return database;
    }

    /**
     * Returns password.
     * @return password, or {@code null} if none was set
     */
    public String getPassword() {
        return password;
    }

    /**
     * Builder for initializing JedisPoolConfig.
     *
     * <p>Host defaults to {@link #DEFAULT_HOST}; port, timeout and database
     * default to the corresponding {@code Protocol} constants; password
     * defaults to {@code null}.
     */
    public static class Builder {
        private String host = DEFAULT_HOST;
        private int port = Protocol.DEFAULT_PORT;
        private int timeout = Protocol.DEFAULT_TIMEOUT;
        private int database = Protocol.DEFAULT_DATABASE;
        private String password;

        /**
         * Sets host.
         * @param host host
         * @return Builder itself
         */
        public Builder setHost(String host) {
            this.host = host;
            return this;
        }

        /**
         * Sets port.
         * @param port port
         * @return Builder itself
         */
        public Builder setPort(int port) {
            this.port = port;
            return this;
        }

        /**
         * Sets timeout.
         * @param timeout timeout
         * @return Builder itself
         */
        public Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        /**
         * Sets database index.
         * @param database database index
         * @return Builder itself
         */
        public Builder setDatabase(int database) {
            this.database = database;
            return this;
        }

        /**
         * Sets password.
         * @param password password, if any
         * @return Builder itself
         */
        public Builder setPassword(String password) {
            this.password = password;
            return this;
        }

        /**
         * Builds JedisPoolConfig.
         * @return JedisPoolConfig
         */
        public JedisPoolConfig build() {
            return new JedisPoolConfig(host, port, timeout, password, database);
        }
    }
}
我终于解决了!我把它简化了:创建了一个非常简单的、用于发布内容的 Redis bolt,并监控了 Redis 数据库,确认它可以正常工作。下面是我能正常工作的 bolt:
public class RedisBolt implements IRichBolt {
protected String channel = "Somriures";
// protected String configChannel;
protected OutputCollector collector;
// protected Tuple currentTuple;
// protected Logger log;
protected JedisPool pool;
// protected ConfigListenerThread configListenerThread;
public RedisBolt(){}
public RedisBolt(String channel) {
// log = Logger.getLogger(getClass().getName());
// setupNonSerializableAttributes();
}
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
pool=new JedisPool("127.0.0.1");
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(100);
config.setTestOnBorrow(true);
}
public void execute(Tuple tuple) {
String current = tuple.getString(0);
if(current != null) {
// for(Object obj: result) {
System.out.println("Publiquem " + current);
publish(current);
System.out.println("emitim " + current);
collector.emit(tuple, new Values(current));
// }
collector.ack(tuple);
}
}
public void cleanup() {
if(pool != null) {
pool.destroy();
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(channel));
}
public void publish(String msg) {
Jedis jedis = pool.getResource();
jedis.publish(channel, msg);
pool.returnResource(jedis);
}
protected void setupNonSerializableAttributes() {
}
public Map getComponentConfiguration() {
return null;
}
}