如何使用 Redis 和 Flask 连接 Storm 和 D3.js?
How can I connect Storm and D3.js using Redis and Flask?
我完成了 Storm 测试拓扑,在我根据 Html 代码创建 d3 脚本之前,它从文本文件中读取数据。我现在希望它直接从 Storm 拓扑中读取数据(也许是螺栓?)但我不知道该怎么做。我正在使用 Horton Works Sandbox 进行测试,我们将不胜感激。
提前致谢!
我找到了一个redis的风暴包,我现在正在尝试使用。它允许你设置一个用于在redis上写入的螺栓,我已经设置了节点。我现在的问题是 eclipse 找不到 java 代码的导入,pom.xml.I 上的代码已经下载了包。我当前的 java 螺栓和进口是:
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import storm.external.*;// error from here
import storm.external.storm-redis.org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;//to here
..........
class MortsStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("count");
}
}
还有我的pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Storm.practice</groupId>
<artifactId>Storm.Prova</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Storm.Prova</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.1-incubating</version>
</dependency>
<dependency> #error from here...
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>{0.9.1-incubating}</version>
<type>jar</type>
</dependency>#... to here
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>Storm.practice.Storm.Prova.ProvaTopology</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
错误是Eclipse找不到依赖和包
根据您的情况,我认为您将需要一些系统或中间代码来从 Storm 读取数据并推送到 D3。你可以尝试像 WSO2 CEP [1] 这样的东西,它能够连接到 Storm 并使用 websockets 将事件推送到基于 d3 [2] 的仪表板。
在您的场景中,您可以将 Storm bolt 中的逻辑映射到 Siddhi 查询 [3],然后将这些事件从 Storm 获取到 WSO2 CEP。然后你可以创建一个 websocket 发布者来使用服务器的内置 websocket 功能将事件发送到你的 D3 代码。
请注意,这是基于您的要求的可能解决方案之一,您最好利用已集成到 Storm 和 D3 的现有 CEP 系统的功能。
希望对您有所帮助!
[1] http://wso2.com/products/complex-event-processor/
[2]https://docs.wso2.com/display/CEP400/Visualizing+Results+in+the+Analytics+Dashboard
我现在有点晚了,快一年了,但是我正在审核我的帐户,看到了这个问题。
我终于使用了与 Jedis 接口的 Redis,我将其作为 Maven 工件导入。一旦它开始工作并且我能够通过 telnet 使用 Redis Monitor 查看结果,我创建了一个简单的 Node.js 代码,启动它,数据到达客户端,因此到达 d3。我需要 Socket.io 和 Redis.js 来实现这一点,但现在正在工作。
如果有人需要一些细节,请问我,我会很乐意帮助你。
我完成了 Storm 测试拓扑,在我根据 Html 代码创建 d3 脚本之前,它从文本文件中读取数据。我现在希望它直接从 Storm 拓扑中读取数据(也许是螺栓?)但我不知道该怎么做。我正在使用 Horton Works Sandbox 进行测试,我们将不胜感激。
提前致谢!
我找到了一个redis的风暴包,我现在正在尝试使用。它允许你设置一个用于在redis上写入的螺栓,我已经设置了节点。我现在的问题是 eclipse 找不到 java 代码的导入,pom.xml.I 上的代码已经下载了包。我当前的 java 螺栓和进口是:
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.Map;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import storm.external.*;// error from here
import storm.external.storm-redis.org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import redis.clients.jedis.JedisCommands;//to here
..........
class MortsStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";
public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getStringByField("count");
}
}
还有我的pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Storm.practice</groupId>
<artifactId>Storm.Prova</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Storm.Prova</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.1-incubating</version>
</dependency>
<dependency> #error from here...
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>{0.9.1-incubating}</version>
<type>jar</type>
</dependency>#... to here
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>Storm.practice.Storm.Prova.ProvaTopology</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
错误是Eclipse找不到依赖和包
根据您的情况,我认为您将需要一些系统或中间代码来从 Storm 读取数据并推送到 D3。你可以尝试像 WSO2 CEP [1] 这样的东西,它能够连接到 Storm 并使用 websockets 将事件推送到基于 d3 [2] 的仪表板。
在您的场景中,您可以将 Storm bolt 中的逻辑映射到 Siddhi 查询 [3],然后将这些事件从 Storm 获取到 WSO2 CEP。然后你可以创建一个 websocket 发布者来使用服务器的内置 websocket 功能将事件发送到你的 D3 代码。
请注意,这是基于您的要求的可能解决方案之一,您最好利用已集成到 Storm 和 D3 的现有 CEP 系统的功能。
希望对您有所帮助!
[1] http://wso2.com/products/complex-event-processor/
[2]https://docs.wso2.com/display/CEP400/Visualizing+Results+in+the+Analytics+Dashboard
我现在有点晚了,快一年了,但是我正在审核我的帐户,看到了这个问题。
我终于使用了与 Jedis 接口的 Redis,我将其作为 Maven 工件导入。一旦它开始工作并且我能够通过 telnet 使用 Redis Monitor 查看结果,我创建了一个简单的 Node.js 代码,启动它,数据到达客户端,因此到达 d3。我需要 Socket.io 和 Redis.js 来实现这一点,但现在正在工作。
如果有人需要一些细节,请问我,我会很乐意帮助你。