Using Storm with Redis as data source
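I have a Storm topology that needs to stream its data from a Redis instance. I tried running the topology against a single Redis instance, but it doesn't seem to read anything from Redis: when I check, the returned queue is empty. I'm using Storm version 0.9.3.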
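Here is my RedisQueueSpout, a Storm spout that plugs your topology into Redis using a specified pattern (aka key) and looks for input data each time Storm polls it. The spout emits a single field (ID: message) to whichever bolt follows it.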
package storm.starter.spout;

import java.util.List;
import java.util.Map;

import redis.clients.jedis.Jedis;
import storm.starter.JedisQueue; // JedisQueue lives in storm.starter, not storm.starter.spout

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RedisQueueSpout extends BaseRichSpout {
    static final long serialVersionUID = 737015318988609460L;

    private SpoutOutputCollector _collector;
    private final String host;
    private final int port;
    private final String pattern;
    private transient JedisQueue jq;

    public RedisQueueSpout(String host, int port, String pattern) {
        this.host = host;
        this.port = port;
        this.pattern = pattern;
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        Jedis newJedis = new Jedis(host, port);
        newJedis.connect();
        this.jq = new JedisQueue(newJedis, pattern);
    }

    public void close() {}

    public void nextTuple() {
        List<String> ret = this.jq.dequeue();
        if (ret == null) {
            Utils.sleep(5L);
        } else {
            System.out.println(ret);
            _collector.emit(new Values(ret));
        }
    }

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("name"));
    }
}
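Here is my JedisQueue, an implementation of a standard queue data structure backed by Redis. Note that the dequeue method, somewhat unconventionally, returns a List<String>, because that is what the underlying Jedis call returns; this is attributed to Redis's ability to store many values under a single key.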
package storm.starter;

import java.util.List;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;

public class JedisQueue {
    private transient Jedis jedis;
    private final String pattern;

    public JedisQueue(Jedis jedis, String pattern) {
        this.jedis = jedis;
        this.pattern = pattern;
    }

    public void clear() {
        this.jedis.del(this.pattern);
    }

    public boolean isEmpty() {
        return (this.size() == 0);
    }

    public int size() {
        return new Integer(this.jedis.llen(this.pattern).toString());
    }

    public List<String> toArray() {
        return this.jedis.lrange(this.pattern, 0, -1);
    }

    public void enqueue(String... elems) {
        this.jedis.rpush(this.pattern, elems);
    }

    public List<String> dequeue() {
        List<String> out = null;
        try {
            out = this.jedis.blpop(0, this.pattern);
        } catch (JedisDataException e) {
            // It wasn't a list of strings
        }
        return out;
    }
}
The code is taken from Storm-jedis; for more information you can check the link.
Here is my topology:
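A note on the List<String> that dequeue() returns: for BLPOP, Redis replies with a two-element array, the name of the key the element was popped from followed by the popped element itself, so Jedis's blpop hands back a (key, value) pair rather than several values stored under one key. When a non-zero timeout expires with nothing to pop, the reply is nil, which the sketch below treats as null. This is a minimal stdlib-only sketch of unpacking such a reply; the class and method names are hypothetical and the sample reply is made up.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: unpacks a BLPOP-style reply of the form [key, value].
final class BlpopReply {
    private BlpopReply() {}

    /** Returns the popped value, or null if the reply is missing or incomplete (e.g. a timeout). */
    static String valueOf(List<String> reply) {
        if (reply == null || reply.size() < 2) {
            return null;
        }
        return reply.get(1); // index 0 is the key name, index 1 is the popped element
    }

    /** Returns the key the element was popped from, or null for an empty reply. */
    static String keyOf(List<String> reply) {
        return (reply == null || reply.isEmpty()) ? null : reply.get(0);
    }

    public static void main(String[] args) {
        // Made-up reply, shaped like what BLPOP returns for a list named "names:queue".
        List<String> reply = Arrays.asList("names:queue", "Mary");
        System.out.println(keyOf(reply) + " -> " + valueOf(reply)); // names:queue -> Mary
        System.out.println(valueOf(null));                          // null (timeout case)
    }
}
```

In the spout, emitting new Values(BlpopReply.valueOf(ret)) instead of new Values(ret) would put only the popped string into the "name" field rather than the whole pair.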
package storm.starter;

import org.tomdz.storm.esper.EsperBolt;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.starter.spout.RedisQueueSpout;

public class NameCountTopology {
    public static void main(String[] args) throws Exception {
        String host = "10.0.0.251";
        int port = 6379;
        String pattern = "Name:*";

        TopologyBuilder builder = new TopologyBuilder();
        EsperBolt bolt = new EsperBolt.Builder()
                .inputs().aliasComponent("spout").toEventType("names")
                .outputs().onDefaultStream().emit("nps")
                .statements().add("select count(*) as nps from names.win:time_batch(1 sec)")
                .build();

        builder.setSpout("spout", new RedisQueueSpout(host, port, pattern), 1);
        builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("name-count-topology", conf, builder.createTopology());
            Utils.sleep(300000);
            cluster.killTopology("name-count-topology");
            cluster.shutdown();
        }
    }
}
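The topology passes Name:* as the queue key. In Redis, * only has glob meaning for pattern-aware commands such as KEYS and SCAN (with MATCH); list commands such as BLPOP, RPUSH and LLEN treat their argument as a literal key name, so the spout waits on a key that is literally called "Name:*". The stdlib-only sketch below approximates Redis glob matching (it supports only * and ?; character classes and escapes are deliberately not handled) and contrasts it with a literal lookup; the key set mirrors the Name:1/Name:2 keys from the question plus a made-up Other:1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Approximates Redis glob matching (as used by KEYS / SCAN MATCH) for '*' and '?' only.
final class GlobKeys {
    private GlobKeys() {}

    /** Converts a simple glob into a regex; every character other than '*' and '?' is literal. */
    static Pattern toRegex(String glob) {
        StringBuilder re = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                re.append(".*");
            } else if (c == '?') {
                re.append('.');
            } else {
                re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.toString(), Pattern.DOTALL);
    }

    /** Returns the keys a pattern-aware command would select, in input order. */
    static List<String> matching(String glob, List<String> keys) {
        Pattern p = toRegex(glob);
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            if (p.matcher(k).matches()) { // matches() anchors at both ends, like a glob
                out.add(k);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("Name:1", "Name:2", "Other:1");
        // Pattern-aware lookup (what KEYS/SCAN would do).
        System.out.println(matching("Name:*", keys)); // [Name:1, Name:2]
        // Literal lookup (what BLPOP does): no key is literally named "Name:*".
        System.out.println(keys.contains("Name:*"));  // false
    }
}
```

On the Jedis side, keys(String pattern) returns the matching key names (SCAN is the non-blocking alternative on large keyspaces); the spout would still have to read each returned key with a command suited to its type.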
My Redis key-values are stored with HMSET in the following format:
HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604
...
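These keys are Redis hashes, while JedisQueue reads with list commands (BLPOP, LLEN, LRANGE). Pointing a list command at a hash key such as Name:1 fails with a WRONGTYPE error, which dequeue() would silently swallow in its catch (JedisDataException e) block. One way to make this data queue-friendly is for the producer to also push each record onto a single list as one string element. The sketch below shows a hypothetical field=value;... encoding of one HMSET record and its decoding on the consumer side; the format is an assumption (JSON would work just as well) and is not something the question's code uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical record codec: turns one hash (as written by HMSET) into a single
// list element and back. Assumes field names and values never contain ';' or '='.
final class RecordCodec {
    private RecordCodec() {}

    static String encode(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    static Map<String, String> decode(String element) {
        Map<String, String> fields = new LinkedHashMap<>(); // keeps the original field order
        if (element.isEmpty()) {
            return fields;
        }
        for (String pair : element.split(";")) {
            int eq = pair.indexOf('=');
            fields.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return fields;
    }

    public static void main(String[] args) {
        // Same fields as: HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
        Map<String, String> mary = new LinkedHashMap<>();
        mary.put("NAME", "Mary");
        mary.put("YEAR", "1880");
        mary.put("GENDER", "F");
        mary.put("COUNT", "7065");
        String element = encode(mary);                    // what a producer would RPUSH
        System.out.println(element);                      // NAME=Mary;YEAR=1880;GENDER=F;COUNT=7065
        System.out.println(decode(element).get("NAME"));  // Mary
    }
}
```

With Jedis, the producer could call rpush(key, element) on an exact list key of your choosing (for example a hypothetical names:queue), and the spout would then BLPOP that same exact key.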
Here is the log from my supervisor node:
2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3)
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3)
2016-05-04 07:37:56 STDIO [INFO] Queue is empty...
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI 'org.tomdz.storm.esper.EsperBolt@44d83ea0' version 4.3.0
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2)
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@70f9b3ee> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19940834> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
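And the log keeps repeating like this.
Here is my UI after running the topology:
[Screenshot: Storm UI]
Now my question is: why isn't the spout working? It emits nothing and does not seem to fetch anything from Redis.
PS: I have checked the host and port, and I can get data from Redis, so I don't think there is a problem with the connection to Redis.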
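- HMSET is for hashes, and BLPOP is for lists. They are not compatible.
- BLPOP does not take a pattern. It requires exact key names. For details, see http://redis.io/commands/blpop.
- Since a spout executes nextTuple(), ack() and fail() from a single thread, a BLPOP with a long (or infinite) timeout also blocks the whole spout until a message is available to pop.
Hope this helps.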
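The third point can be illustrated without Redis. In the sketch below, a java.util.concurrent.LinkedBlockingQueue stands in for the Redis list, and nextTuple() waits at most a short, bounded time for an element instead of indefinitely, so control returns to the executor loop (which also delivers ack and fail calls) even when the queue is empty. The class name, the 50 ms timeout, and the emitted list standing in for the collector are assumptions. With Jedis, the analogous choices would be lpop(key), which returns immediately, or blpop with a small timeout (in seconds) instead of 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a spout whose nextTuple() must return promptly.
// The BlockingQueue plays the role of the Redis list; "emitted" records what would be emitted.
final class BoundedWaitSpout {
    private final BlockingQueue<String> source;
    final List<String> emitted = new ArrayList<>();

    BoundedWaitSpout(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Waits at most 50 ms for one element (cf. BLPOP with a small timeout, or LPOP). */
    void nextTuple() {
        try {
            // source.take() would be the analogue of blpop(0, key): it blocks forever when empty.
            String value = source.poll(50, TimeUnit.MILLISECONDS);
            if (value != null) {
                emitted.add(value); // in Storm: _collector.emit(new Values(value));
            }
            // On null we just return; the executor will call nextTuple() again later.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status and return
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        BoundedWaitSpout spout = new BoundedWaitSpout(q);
        long start = System.nanoTime();
        spout.nextTuple(); // empty queue: returns after roughly 50 ms instead of hanging
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        q.add("Mary");
        spout.nextTuple(); // element available: returns right away with it
        System.out.println(spout.emitted + " after waiting ~" + waitedMs + " ms on the empty queue");
    }
}
```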