使用 Redis 的 SSE 端点的奇怪行为
Strange behavior with SSE endpoint using Redis
如果客户端在 Redis 中,我需要将一些数据推送到客户端,但客户端每 5 秒就会重新连接到 SSE 端点。
后端代码:
@RestController
@RequestMapping("/reactive-task")
public class TaskRedisController {
private final TaskRedisRepository taskRedisRepository;
TaskRedisController(TaskRedisRepository taskRedisRepository){
this.taskRedisRepository = taskRedisRepository;
}
@CrossOrigin
@GetMapping(value = "/get/{id}")
public Flux<ServerSentEvent<Task>> getSseStream2(@PathVariable("id") String id) {
return taskRedisRepository.findByTaskId(id)
.map(task -> ServerSentEvent.<Task>builder().data(task).build());
}
}
@Repository
public class TaskRedisRepository {
public Flux<Task> findByTaskId(String id) {
return template.keys("task:" + id).flatMap(template.opsForValue()::get);
}
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@Entity
public class Task {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
private Long id;
@Column(length = 25)
private String result;
}
客户端使用JS消费:
var evt = new EventSource("http://localhost:8080/reactive-task/get/98"); evt.onmessage = function(event) {
console.log(event);
};
谁能给我指出正确的方向?
如有任何建议,我们将不胜感激。
更新:我需要在 Redis 中存储一段时间(5-10 分钟)的数据。
更新:我在 MongoDB 上写了类似的代码,它工作正常。
在这种情况下,taskRedisRepository.findByTaskId(id)
可能正在发送一个有限的 Flux
- 意味着几个元素和一个完成流的完整信号。
Spring WebFlux 会将 onComplete
信号解释为流的结尾并将其关闭。浏览器 SSE 客户端的默认行为是在连接终止的情况下立即重新连接到终结点。
如果您希望保持持久连接并且仅在添加新元素时收到通知,您需要直接将其作为数据存储区的一项功能加以利用。对于 Redis,此模式由 pub/sub 功能 (see reference documentation) 支持。
总而言之,我认为您看到的是预期的行为,因为在这种情况下,您的数据存储不会生成无限流来通知您添加到 collection 的新元素,而是生成有限的流在给定时间 collection 中存在的元素。
如果客户端在 Redis 中,我需要将一些数据推送到客户端,但客户端每 5 秒就会重新连接到 SSE 端点。
后端代码:
@RestController
@RequestMapping("/reactive-task")
public class TaskRedisController {
private final TaskRedisRepository taskRedisRepository;
TaskRedisController(TaskRedisRepository taskRedisRepository){
this.taskRedisRepository = taskRedisRepository;
}
@CrossOrigin
@GetMapping(value = "/get/{id}")
public Flux<ServerSentEvent<Task>> getSseStream2(@PathVariable("id") String id) {
return taskRedisRepository.findByTaskId(id)
.map(task -> ServerSentEvent.<Task>builder().data(task).build());
}
}
@Repository
public class TaskRedisRepository {
public Flux<Task> findByTaskId(String id) {
return template.keys("task:" + id).flatMap(template.opsForValue()::get);
}
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@EqualsAndHashCode
@Entity
public class Task {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
private Long id;
@Column(length = 25)
private String result;
}
客户端使用JS消费:
var evt = new EventSource("http://localhost:8080/reactive-task/get/98"); evt.onmessage = function(event) {
console.log(event);
};
谁能给我指出正确的方向?
如有任何建议,我们将不胜感激。
更新:我需要在 Redis 中存储一段时间(5-10 分钟)的数据。 更新:我在 MongoDB 上写了类似的代码,它工作正常。
在这种情况下,taskRedisRepository.findByTaskId(id)
可能正在发送一个有限的 Flux
- 意味着几个元素和一个完成流的完整信号。
Spring WebFlux 会将 onComplete
信号解释为流的结尾并将其关闭。浏览器 SSE 客户端的默认行为是在连接终止的情况下立即重新连接到终结点。
如果您希望保持持久连接并且仅在添加新元素时收到通知,您需要直接将其作为数据存储区的一项功能加以利用。对于 Redis,此模式由 pub/sub 功能 (see reference documentation) 支持。
总而言之,我认为您看到的是预期的行为,因为在这种情况下,您的数据存储不会生成无限流来通知您添加到 collection 的新元素,而是生成有限的流在给定时间 collection 中存在的元素。