在 kafka 流中使用 Redis 池是否安全?
Is it safe to use Redis pool inside kafka streams?
所以,问题是关于在过滤器、映射等流函数中使用外部状态存储的安全性
这样可以吗:
JedisPool pool = ...;
KStream stream = ...;
stream.map((k, v) -> {
JedisClient client = pool.getResource();
....
client.close();
});
...
KafkaStreams streams = ...;
是否会因为在多个流式任务中使用单个池而导致错误?
在 apache flink 中,我可以使用 Rich*Function<>
,在 open
方法中,我只能将连接池配置到任何存储一次。在 apache spark 中,我还可以配置全局连接。我是否需要使用 kafka 流来做同样的事情?
不建议将 Redis 用于 Spring Cloud Stream 的生产 - 活页夹功能不全,可能会丢失消息。
等同于 Rich*Function
的方法是使用 transform()
而不是 map()
,这样您就可以 init()
和 close()
一个 Transformer
.
即使您可能希望 try-catch
确保执行 close()
,您的方法也应该有效。但是,这不是推荐的模式。
根据您的用例,将数据从 Redis 加载到 Kafka 主题(不确定是否有 Redis 连接器)并将数据加载到 KTable 可能更好。而不是 map()
或 transform()
你会做一个 stream-table join.
所以,问题是关于在过滤器、映射等流函数中使用外部状态存储的安全性
这样可以吗:
JedisPool pool = ...;
KStream stream = ...;
stream.map((k, v) -> {
JedisClient client = pool.getResource();
....
client.close();
});
...
KafkaStreams streams = ...;
是否会因为在多个流式任务中使用单个池而导致错误?
在 apache flink 中,我可以使用 Rich*Function<>
,在 open
方法中,我只能将连接池配置到任何存储一次。在 apache spark 中,我还可以配置全局连接。我是否需要使用 kafka 流来做同样的事情?
不建议将 Redis 用于 Spring Cloud Stream 的生产 - 活页夹功能不全,可能会丢失消息。
等同于 Rich*Function
的方法是使用 transform()
而不是 map()
,这样您就可以 init()
和 close()
一个 Transformer
.
即使您可能希望 try-catch
确保执行 close()
,您的方法也应该有效。但是,这不是推荐的模式。
根据您的用例,将数据从 Redis 加载到 Kafka 主题(不确定是否有 Redis 连接器)并将数据加载到 KTable 可能更好。而不是 map()
或 transform()
你会做一个 stream-table join.