Storm中的分布式缓存
Distributed caching in storm
如何在Apache Storm中存储临时数据?
在storm拓扑中,bolt需要访问之前处理过的数据。
Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.
再次 varaiable1
在 10:15 AM
收到 50
那么结果应该是 30 (50-20)
稍后如果 varaiable1 收到 70
那么结果应该是 20 (70-50)
在 10:30
。
如何实现这个功能。
恐怕到今天为止还没有这样的内置功能。
但是您可以使用任何类型的分布式缓存,例如 memcached 或 Redis。这些缓存解决方案真的很容易使用。
简而言之,您想使用风暴中的 运行 元组进行微批处理计算。
首先,您需要 define/find 键入元组集。
使用该键在螺栓之间进行字段分组(不要使用随机分组)。这将保证相关的元组将始终针对相同的键发送到下游螺栓的相同任务。
定义 class 级别集合 List/Map 以维护旧值并在其中添加新值以进行计算,不用担心它们在同一螺栓的不同执行程序实例之间是线程安全的。
有几种方法可以做到这一点,但这取决于您的系统要求、团队技能和基础设施。
您可以使用 Apache Cassandra 来存储事件,并在元组中传递行的键,以便下一个螺栓可以检索它。
如果您的数据本质上是时间序列,那么也许您想看看 OpenTSDB or InfluxDB。
您当然可以退回到软件事务内存之类的东西,但我认为这需要大量的制作。
你可以使用 CacheBuilder 在你的扩展 BaseRichBolt 中记住你的数据(把它放在 prepare 方法中):
// init your cache.
this.cache = CacheBuilder.newBuilder()
.maximumSize(maximumCacheSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
.build();
然后在execute中,你可以使用缓存来查看你是否已经看到那个key entry了。从那里您可以添加您的业务逻辑:
// if we haven't seen it before, we can emit it.
if(this.cache.getIfPresent(key) == null) {
cache.put(key, nearlyEmptyList);
this.collector.emit(input, input.getValues());
}
this.collector.ack(input);
这个问题很适合展示 Apache Spark 在微批处理上的内存计算。但是,您的用例在 Storm 中实现起来很简单。
确保螺栓使用字段分组。它会始终将传入的元组哈希到同一个螺栓,因此我们不会丢失任何元组。
在螺栓的本地缓存中维护一个 Map。该地图将保留“变量”的最后一个已知值。
class CumulativeDiffBolt extends InstrumentedBolt{
Map<String, Integer> lastKnownVariableValue;
@Override
public void prepare(){
this.lastKnownVariableValue = new HashMap<>();
....
@Override
public void instrumentedNextTuple(Tuple tuple, Collector collector){
.... extract variable from tuple
.... extract current value from tuple
Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0)
Integer newValue = currValue - lastValue
lastKnownVariableValue.put(variable, newValue)
emit(new Fields(variable, newValue));
...
}
如何在Apache Storm中存储临时数据?
在storm拓扑中,bolt需要访问之前处理过的数据。
Eg: if the bolt processes varaiable1 with result as 20 at 10:00 AM.
再次 varaiable1
在 10:15 AM
收到 50
那么结果应该是 30 (50-20)
稍后如果 varaiable1 收到 70
那么结果应该是 20 (70-50)
在 10:30
。
如何实现这个功能。
恐怕到今天为止还没有这样的内置功能。 但是您可以使用任何类型的分布式缓存,例如 memcached 或 Redis。这些缓存解决方案真的很容易使用。
简而言之,您想使用风暴中的 运行 元组进行微批处理计算。 首先,您需要 define/find 键入元组集。 使用该键在螺栓之间进行字段分组(不要使用随机分组)。这将保证相关的元组将始终针对相同的键发送到下游螺栓的相同任务。 定义 class 级别集合 List/Map 以维护旧值并在其中添加新值以进行计算,不用担心它们在同一螺栓的不同执行程序实例之间是线程安全的。
有几种方法可以做到这一点,但这取决于您的系统要求、团队技能和基础设施。
您可以使用 Apache Cassandra 来存储事件,并在元组中传递行的键,以便下一个螺栓可以检索它。
如果您的数据本质上是时间序列,那么也许您想看看 OpenTSDB or InfluxDB。
您当然可以退回到软件事务内存之类的东西,但我认为这需要大量的制作。
你可以使用 CacheBuilder 在你的扩展 BaseRichBolt 中记住你的数据(把它放在 prepare 方法中):
// init your cache.
this.cache = CacheBuilder.newBuilder()
.maximumSize(maximumCacheSize)
.expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS)
.build();
然后在execute中,你可以使用缓存来查看你是否已经看到那个key entry了。从那里您可以添加您的业务逻辑:
// if we haven't seen it before, we can emit it.
if(this.cache.getIfPresent(key) == null) {
cache.put(key, nearlyEmptyList);
this.collector.emit(input, input.getValues());
}
this.collector.ack(input);
这个问题很适合展示 Apache Spark 在微批处理上的内存计算。但是,您的用例在 Storm 中实现起来很简单。
确保螺栓使用字段分组。它会始终将传入的元组哈希到同一个螺栓,因此我们不会丢失任何元组。
在螺栓的本地缓存中维护一个 Map
。该地图将保留“变量”的最后一个已知值。
class CumulativeDiffBolt extends InstrumentedBolt{
Map<String, Integer> lastKnownVariableValue;
@Override
public void prepare(){
this.lastKnownVariableValue = new HashMap<>();
....
@Override
public void instrumentedNextTuple(Tuple tuple, Collector collector){
.... extract variable from tuple
.... extract current value from tuple
Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0)
Integer newValue = currValue - lastValue
lastKnownVariableValue.put(variable, newValue)
emit(new Fields(variable, newValue));
...
}