在 Spring Boot App 中使用 Redis Stream 通过 HTTP 长轮询阻止 HTTP 响应
Using Redis Stream to Block HTTP response via HTTP long polling in Spring Boot App
我有一个 spring 引导 Web 应用程序,具有更新名为 StudioLinking
的实体的功能。该实体描述了两个 IoT 设备之间的临时、可变、描述性逻辑 link,我的 Web 应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但 StudioLinking
实体会保留在数据库中以用于报告目的。 StudioLinking
使用 Spring Data/Hibernate 以常规方式存储到基于 SQL 的数据存储中。这个 StudioLinking 实体会不时地使用来自 Rest API 的新信息进行更新。当 link 更新时,设备需要响应(更改颜色、音量等)。现在,这是通过每 5 秒轮询一次来处理的,但这会造成人类用户将更新输入系统和物联网设备实际更新之间的延迟。它可能短至 1 毫秒,也可能长达 5 秒!显然增加轮询频率是不可持续的,而且绝大多数时间根本没有更新!
因此,我正在尝试使用 HTTP 长轮询在同一个应用程序上开发另一个 Rest API,当给定的 StudioLinking 实体更新或超时后,它将 return。听众不支持 WebSocket 或类似的东西,让我只能使用长轮询。长轮询会留下竞争条件,您必须考虑这样一种可能性,即对于连续的消息,一条消息可能会在 HTTP 请求之间“丢失”(当连接正在关闭和打开时,可能会出现新的“更新”如果我使用 Pub/Sub).
则不会被“注意到”
重要的是要注意,这个“订阅更新”API 应该只 return StudioLinking
的最新和当前版本,但应该只在有是实际更新,或者自上次签入后是否发生了更新。 “订阅更新”客户端最初会 POST 一个 API 请求来设置一个新的监听会话并传递它,以便服务器知道他们是谁。因为多个设备可能需要监视对同一 StudioLinking
实体的更新。我相信我可以通过在 redis XREAD 中使用单独命名的消费者来完成此操作。 (在后面的问题中记住这一点)
经过数小时的研究,我相信实现这一目标的方法是使用 redis 流。
我在 Spring Data Redis 中找到了这两个 link 关于 Redis 流的信息:
https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/
https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281
我也读过关于长轮询的 link,这两个 links 在长轮询期间都有一个睡眠定时器,用于演示目的,但显然我想做一些有用的事情.
https://www.baeldung.com/spring-deferred-result
这两个 link 都非常有帮助。现在我很清楚如何将更新发布到 Redis Stream -(这是未经测试的“伪代码”,但我预计在实施时不会有任何问题)
// In my StudioLinking Entity
@PostUpdate
public void postToRedis() {
StudioLinking link = this;
ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
.ofObject(link)
.withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
this.redisTemplate
.opsForStream()
.add(record)
.subscribe(System.out::println);
atomicInteger.incrementAndGet();
}
但是当谈到订阅所述流时,我感到平淡无奇:所以基本上我想在这里做的 - 请原谅被屠杀的伪代码,它仅用于创意目的。我很清楚代码绝不代表语言和框架的实际行为:)
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
LOG.info("Received async-deferredresult request");
DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);
deferredResult.onTimeout(() ->
deferredResult.setErrorResult(
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body("IT WAS NOT UPDATED!")));
ForkJoinPool.commonPool().submit(() -> {
//----------------------------------------------
// Made up stuff... here is where I want to subscribe to a stream and block!
//----------------------------------------------
LOG.info("Processing in separate thread");
try {
// Subscribe to Redis Stream, get any updates that happened between long-polls
// then block until/if a new message comes over the stream
var subscription = listenerContainer.receiveAutoAck(
Consumer.from(studioLinkingID, updateList),
StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
} catch (InterruptedException e) {
}
output.setResult("IT WAS UPDATED!");
});
LOG.info("servlet thread freed");
return output;
}
那么对于我将如何处理这个问题有一个很好的解释吗?我认为答案在 https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html 中,但我不是一个足够大的 Spring 高级用户,无法真正理解 Java 文档中的术语(Spring 文档非常好,但是Java文档是用密集的技术语言编写的,我很欣赏这种语言,但还不太理解)。
我的实施还有两个障碍:
- 我对 spring 的准确理解还没有达到 100%。我还没有达到真正完全理解为什么所有这些豆子都在四处漂浮的那一刻。我认为这是为什么我在这里没有得到东西的关键......Redis 的配置在 Spring 以太中浮动,我不知道如何调用它。我真的需要继续调查这个(对我来说这是 spring 的一个巨大障碍)。
- 这些
StudioLinking
是短暂的,所以我也需要做一些清理工作。一旦我把整个事情都搞定了,我会在稍后实施这个,我知道这将是需要的。
为什么不使用阻塞轮询机制?无需使用 spring-data-redis 的花哨东西。只需使用 5 秒的简单阻塞读取,因此此调用可能需要大约 6 秒左右。您可以减少或增加阻塞超时。
class LinkStatus {
private final boolean updated;
LinkStatus(boolean updated) {
this.updated = updated;
}
}
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public LinkStatus subscribeToUpdates(
@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
StreamOperations<String, String, String> op = redisTemplate.opsForStream();
Consumer consumer = Consumer.from("test-group", "test-consumer");
// auto ack block stream read with size 1 with timeout of 5 seconds
StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
List<MapRecord<String, String, String>> records =
op.read(consumer, readOptions, StreamOffset.latest("test-stream"));
return new LinkStatus(!CollectionUtils.isEmpty(records));
}
我有一个 spring 引导 Web 应用程序,具有更新名为 StudioLinking
的实体的功能。该实体描述了两个 IoT 设备之间的临时、可变、描述性逻辑 link,我的 Web 应用程序是它们的云服务。这些设备之间的链接本质上是短暂的,但 StudioLinking
实体会保留在数据库中以用于报告目的。 StudioLinking
使用 Spring Data/Hibernate 以常规方式存储到基于 SQL 的数据存储中。这个 StudioLinking 实体会不时地使用来自 Rest API 的新信息进行更新。当 link 更新时,设备需要响应(更改颜色、音量等)。现在,这是通过每 5 秒轮询一次来处理的,但这会造成人类用户将更新输入系统和物联网设备实际更新之间的延迟。它可能短至 1 毫秒,也可能长达 5 秒!显然增加轮询频率是不可持续的,而且绝大多数时间根本没有更新!
因此,我正在尝试使用 HTTP 长轮询在同一个应用程序上开发另一个 Rest API,当给定的 StudioLinking 实体更新或超时后,它将 return。听众不支持 WebSocket 或类似的东西,让我只能使用长轮询。长轮询会留下竞争条件,您必须考虑这样一种可能性,即对于连续的消息,一条消息可能会在 HTTP 请求之间“丢失”(当连接正在关闭和打开时,可能会出现新的“更新”如果我使用 Pub/Sub).
则不会被“注意到”重要的是要注意,这个“订阅更新”API 应该只 return StudioLinking
的最新和当前版本,但应该只在有是实际更新,或者自上次签入后是否发生了更新。 “订阅更新”客户端最初会 POST 一个 API 请求来设置一个新的监听会话并传递它,以便服务器知道他们是谁。因为多个设备可能需要监视对同一 StudioLinking
实体的更新。我相信我可以通过在 redis XREAD 中使用单独命名的消费者来完成此操作。 (在后面的问题中记住这一点)
经过数小时的研究,我相信实现这一目标的方法是使用 redis 流。
我在 Spring Data Redis 中找到了这两个 link 关于 Redis 流的信息:
https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/ https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281
我也读过关于长轮询的 link,这两个 links 在长轮询期间都有一个睡眠定时器,用于演示目的,但显然我想做一些有用的事情.
https://www.baeldung.com/spring-deferred-result
这两个 link 都非常有帮助。现在我很清楚如何将更新发布到 Redis Stream -(这是未经测试的“伪代码”,但我预计在实施时不会有任何问题)
// In my StudioLinking Entity
@PostUpdate
public void postToRedis() {
StudioLinking link = this;
ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
.ofObject(link)
.withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
this.redisTemplate
.opsForStream()
.add(record)
.subscribe(System.out::println);
atomicInteger.incrementAndGet();
}
但是当谈到订阅所述流时,我感到平淡无奇:所以基本上我想在这里做的 - 请原谅被屠杀的伪代码,它仅用于创意目的。我很清楚代码绝不代表语言和框架的实际行为:)
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
LOG.info("Received async-deferredresult request");
DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);
deferredResult.onTimeout(() ->
deferredResult.setErrorResult(
ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body("IT WAS NOT UPDATED!")));
ForkJoinPool.commonPool().submit(() -> {
//----------------------------------------------
// Made up stuff... here is where I want to subscribe to a stream and block!
//----------------------------------------------
LOG.info("Processing in separate thread");
try {
// Subscribe to Redis Stream, get any updates that happened between long-polls
// then block until/if a new message comes over the stream
var subscription = listenerContainer.receiveAutoAck(
Consumer.from(studioLinkingID, updateList),
StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
} catch (InterruptedException e) {
}
output.setResult("IT WAS UPDATED!");
});
LOG.info("servlet thread freed");
return output;
}
那么对于我将如何处理这个问题有一个很好的解释吗?我认为答案在 https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html 中,但我不是一个足够大的 Spring 高级用户,无法真正理解 Java 文档中的术语(Spring 文档非常好,但是Java文档是用密集的技术语言编写的,我很欣赏这种语言,但还不太理解)。
我的实施还有两个障碍:
- 我对 spring 的准确理解还没有达到 100%。我还没有达到真正完全理解为什么所有这些豆子都在四处漂浮的那一刻。我认为这是为什么我在这里没有得到东西的关键......Redis 的配置在 Spring 以太中浮动,我不知道如何调用它。我真的需要继续调查这个(对我来说这是 spring 的一个巨大障碍)。
- 这些
StudioLinking
是短暂的,所以我也需要做一些清理工作。一旦我把整个事情都搞定了,我会在稍后实施这个,我知道这将是需要的。
为什么不使用阻塞轮询机制?无需使用 spring-data-redis 的花哨东西。只需使用 5 秒的简单阻塞读取,因此此调用可能需要大约 6 秒左右。您可以减少或增加阻塞超时。
class LinkStatus {
private final boolean updated;
LinkStatus(boolean updated) {
this.updated = updated;
}
}
// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public LinkStatus subscribeToUpdates(
@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
StreamOperations<String, String, String> op = redisTemplate.opsForStream();
Consumer consumer = Consumer.from("test-group", "test-consumer");
// auto ack block stream read with size 1 with timeout of 5 seconds
StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
List<MapRecord<String, String, String>> records =
op.read(consumer, readOptions, StreamOffset.latest("test-stream"));
return new LinkStatus(!CollectionUtils.isEmpty(records));
}