从 Redis 读取数据到 Flink
Read data from Redis to Flink
一直在寻找一个连接器来从Redis读取数据到Flink。 Flink 的文档包含对写入 Redis 的连接器的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在Using Apache Flink for data streaming中,Fabian提到可以从Redis中读取数据。可以使用什么连接器?
有一些关于为 Apache Flink 提供流式 redis 源连接器的讨论(请参阅 FLINK-3033),但没有可用的。然而,实现一个应该不难。
目前Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction class.
来实现
public class RedisSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
//open redis connection
}
@Override
public void invoke(String map) throws Exception {
//sink data to redis
}
@Override
public void close() throws Exception {
super.close();
}
}
我们运行正在生产中,大致如下所示
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这真的取决于你需要从 redis 中获取什么。以上可用于从 list/queue、transform/push 中获取消息,然后将其从队列中删除。
Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并向下游推送消息。
让您的 Flink 程序使用 Jedis 与 Redis 对话的挑战之一是将适当的库放入您提交给 Flink 的 JAR 文件中。如果没有这个,您将获得指示某些 类 未定义的调用堆栈。这是我创建的用于将 Redis 及其依赖组件 apache commons-pool2 移动到我的 JAR 中的 Maven pom.xml 的片段。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>unpack</id>
<!-- executed just before the package phase -->
<!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/commons/**</includes>
</artifactItem>
<artifactItem>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>redis/clients/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
一直在寻找一个连接器来从Redis读取数据到Flink。 Flink 的文档包含对写入 Redis 的连接器的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在Using Apache Flink for data streaming中,Fabian提到可以从Redis中读取数据。可以使用什么连接器?
有一些关于为 Apache Flink 提供流式 redis 源连接器的讨论(请参阅 FLINK-3033),但没有可用的。然而,实现一个应该不难。
目前Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction class.
来实现public class RedisSink extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
//open redis connection
}
@Override
public void invoke(String map) throws Exception {
//sink data to redis
}
@Override
public void close() throws Exception {
super.close();
}
}
我们运行正在生产中,大致如下所示
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这真的取决于你需要从 redis 中获取什么。以上可用于从 list/queue、transform/push 中获取消息,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、获取 SourceConext 并向下游推送消息。
让您的 Flink 程序使用 Jedis 与 Redis 对话的挑战之一是将适当的库放入您提交给 Flink 的 JAR 文件中。如果没有这个,您将获得指示某些 类 未定义的调用堆栈。这是我创建的用于将 Redis 及其依赖组件 apache commons-pool2 移动到我的 JAR 中的 Maven pom.xml 的片段。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>unpack</id>
<!-- executed just before the package phase -->
<!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>org/apache/commons/**</includes>
</artifactItem>
<artifactItem>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>redis/clients/**</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>