应该 Spring SseEmitter.complete() 触发 EventSource 重新连接 - 如何关闭连接服务器端
Should Spring SseEmitter.complete() trigger an EventSource reconnect - how to close connection server-side
我正在尝试设置一个 Spring SseEmitter 来发送一系列 运行 作业的状态更新。它似乎在工作,但是:
每当我在 Java 服务器代码中调用 emitter.complete()
时,javascript EventSource
客户端调用已注册的 onerror
函数,然后调用我的 Java端点再次使用新连接。这在 Firefox 和 Chrome.
中都会发生
我或许可以从 Java 发送明确的 "end-of-data" 消息,然后检测到它并在客户端上调用 eventSource.close()
,但是有更好的方法吗?
在那种情况下 emitter.complete()
的目的是什么?
此外,如果我总是不得不在客户端终止连接,那么我猜服务器端的每个连接都会因超时或写入错误而终止,在这种情况下我可能想手动发送每隔几秒返回某种心跳?
如果我不得不做这一切,感觉就像我错过了什么。
我已将以下内容添加到我的 Spring 启动应用程序以触发 SSE 连接 close()
服务器端:
- 创建一个简单的控制器,returns SseEmitter。
- 将后端逻辑包装在单线程执行程序服务中。
- 将您的事件发送到 SseEmitter。
完成时通过 SseEmitter 发送一个完成类型的事件。
@RestController
public class SearchController {
@Autowired
private SearchDelegate searchDelegate;
@GetMapping(value = "/{customerId}/search")
@ResponseStatus(HttpStatus.OK)
@ApiOperation(value = "Search Sources", notes = "Search Sources")
@ApiResponses(value = {
@ApiResponse(code = 201, message = "OK"),
@ApiResponse(code = 401, message = "Unauthorized")
})
@ResponseBody
public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception {
return searchDelegate.route(searchCriteriaDto);
}
}
@Service
public class SearchDelegate {
public static final String SEARCH_EVENT_NAME = "SEARCH";
public static final String COMPLETE_EVENT_NAME = "COMPLETE";
public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}";
@Autowired
private SearchService searchService;
private ExecutorService executor = Executors.newCachedThreadPool();
public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception {
SseEmitter emitter = new SseEmitter();
executor.execute(() -> {
try {
if(!searchCriteriaDto.getCustomerSources().isEmpty()) {
searchCriteriaDto.getCustomerSources().forEach(customerSource -> {
try {
SearchResponse searchResponse = searchService.search(searchCriteriaDto);
emitter.send(SseEmitter.event()
.id(customerSource.getSourceId())
.name(SEARCH_EVENT_NAME)
.data(searchResponse));
} catch (Exception e) {
log.error("Error while executing query for customer {} with source {}, Caused by {}",
customerId, source.getType(), e.getMessage());
}
});
}else {
log.debug("No available customerSources for the specified customer");
}
emitter.send(SseEmitter.event().
id(String.valueOf(System.currentTimeMillis()))
.name(COMPLETE_EVENT_NAME)
.data(COMPLETE_EVENT_DATA));
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}
}
客户端:
- 由于我们在
SseEmitter
上指定了 name
事件,因此将在浏览器上将事件分派给指定事件名称的侦听器;网站源代码应该使用 addEventListener()
来监听命名事件。 (注意:如果没有为消息指定事件名称,则会调用 onmessage
处理程序)
- 在
COMPLETE
事件上调用 EventSource
以释放客户端连接。
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');
sse.addEventListener("SEARCH", function(evt) {
var data = JSON.parse(evt.data);
console.log(data);
});
sse.addEventListener("COMPLETE", function(evt) {
console.log(evt);
sse.close();
});
根据 HTML standard for Server-sent events
Clients will reconnect if the connection is closed; a client can be told to stop reconnecting using the HTTP 204 No Content response code.
所以 Spring 的 SseEmitter 的行为符合预期,complete()
的目的是确保所有事件都已发送,然后关闭连接。
您需要在后续请求中实现 return 204 http 代码的服务器端逻辑(例如,通过检查会话 ID),或者发送一个特殊事件并在收到它后从客户端关闭连接作为
我正在尝试设置一个 Spring SseEmitter 来发送一系列 运行 作业的状态更新。它似乎在工作,但是:
每当我在 Java 服务器代码中调用 emitter.complete()
时,javascript EventSource
客户端调用已注册的 onerror
函数,然后调用我的 Java端点再次使用新连接。这在 Firefox 和 Chrome.
我或许可以从 Java 发送明确的 "end-of-data" 消息,然后检测到它并在客户端上调用 eventSource.close()
,但是有更好的方法吗?
在那种情况下 emitter.complete()
的目的是什么?
此外,如果我总是不得不在客户端终止连接,那么我猜服务器端的每个连接都会因超时或写入错误而终止,在这种情况下我可能想手动发送每隔几秒返回某种心跳?
如果我不得不做这一切,感觉就像我错过了什么。
我已将以下内容添加到我的 Spring 启动应用程序以触发 SSE 连接 close()
服务器端:
- 创建一个简单的控制器,returns SseEmitter。
- 将后端逻辑包装在单线程执行程序服务中。
- 将您的事件发送到 SseEmitter。
完成时通过 SseEmitter 发送一个完成类型的事件。
@RestController public class SearchController { @Autowired private SearchDelegate searchDelegate; @GetMapping(value = "/{customerId}/search") @ResponseStatus(HttpStatus.OK) @ApiOperation(value = "Search Sources", notes = "Search Sources") @ApiResponses(value = { @ApiResponse(code = 201, message = "OK"), @ApiResponse(code = 401, message = "Unauthorized") }) @ResponseBody public SseEmitter search(@ApiParam(name = "searchCriteria", value = "searchCriteria", required = true) @ModelAttribute @Valid final SearchCriteriaDto searchCriteriaDto) throws Exception { return searchDelegate.route(searchCriteriaDto); } } @Service public class SearchDelegate { public static final String SEARCH_EVENT_NAME = "SEARCH"; public static final String COMPLETE_EVENT_NAME = "COMPLETE"; public static final String COMPLETE_EVENT_DATA = "{\"name\": \"COMPLETED_STREAM\"}"; @Autowired private SearchService searchService; private ExecutorService executor = Executors.newCachedThreadPool(); public SseEmitter route(SearchCriteriaDto searchCriteriaDto) throws Exception { SseEmitter emitter = new SseEmitter(); executor.execute(() -> { try { if(!searchCriteriaDto.getCustomerSources().isEmpty()) { searchCriteriaDto.getCustomerSources().forEach(customerSource -> { try { SearchResponse searchResponse = searchService.search(searchCriteriaDto); emitter.send(SseEmitter.event() .id(customerSource.getSourceId()) .name(SEARCH_EVENT_NAME) .data(searchResponse)); } catch (Exception e) { log.error("Error while executing query for customer {} with source {}, Caused by {}", customerId, source.getType(), e.getMessage()); } }); }else { log.debug("No available customerSources for the specified customer"); } emitter.send(SseEmitter.event(). id(String.valueOf(System.currentTimeMillis())) .name(COMPLETE_EVENT_NAME) .data(COMPLETE_EVENT_DATA)); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); } }); return emitter; } }
客户端:
- 由于我们在
SseEmitter
上指定了name
事件,因此将在浏览器上将事件分派给指定事件名称的侦听器;网站源代码应该使用addEventListener()
来监听命名事件。 (注意:如果没有为消息指定事件名称,则会调用onmessage
处理程序) - 在
COMPLETE
事件上调用EventSource
以释放客户端连接。
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
var sse = new EventSource('http://localhost:8080/federation/api/customers/5d96348feb061d13f46aa6ce/search?nativeQuery=true&queryString=*&size=10&customerSources=1,2,3&start=0');
sse.addEventListener("SEARCH", function(evt) {
var data = JSON.parse(evt.data);
console.log(data);
});
sse.addEventListener("COMPLETE", function(evt) {
console.log(evt);
sse.close();
});
根据 HTML standard for Server-sent events
Clients will reconnect if the connection is closed; a client can be told to stop reconnecting using the HTTP 204 No Content response code.
所以 Spring 的 SseEmitter 的行为符合预期,complete()
的目的是确保所有事件都已发送,然后关闭连接。
您需要在后续请求中实现 return 204 http 代码的服务器端逻辑(例如,通过检查会话 ID),或者发送一个特殊事件并在收到它后从客户端关闭连接作为