Angular 8 - 处理 SSE 重新连接错误
Angular 8 - handling SSE reconnect on error
我正在研究 Angular 8(使用 Electron 6 和 Ionic 4)项目,现在我们正处于评估阶段,我们正在决定是否用 SSE(服务器发送的事件)或 Web 套接字替换轮询。我的工作是研究SSE。
我创建了生成随机数的小型快递应用程序,一切正常。唯一让我烦恼的是在服务器错误时重新连接的正确方法。
我的实现是这样的:
private createSseSource(): Observable<MessageEvent> {
return Observable.create(observer => {
this.eventSource = new EventSource(SSE_URL);
this.eventSource.onmessage = (event) => {
this.zone.run(() => observer.next(event));
};
this.eventSource.onopen = (event) => {
console.log('connection open');
};
this.eventSource.onerror = (error) => {
console.log('looks like the best thing to do is to do nothing');
// this.zone.run(() => observer.error(error));
// this.closeSseConnection();
// this.reconnectOnError();
};
});
}
我试图在这个 answer 之后实现 reconnectOnError()
功能,但我就是无法让它工作。然后我放弃了 reconnectOnError()
功能,这似乎是一件更好的事情。不要尝试关闭和重新连接,也不要将错误传播到可观察对象。请坐等,当服务器再次 运行 时,它将自动重新连接。
问题是,这真的是最好的做法吗? 重要的是,FE 应用程序与其自己的服务器通信,而其他服务器无法访问应用程序实例(内置设备)。
我发现我的问题受到了一些关注,所以我决定 post 我的解决方案。回答我的问题:“省略重新连接功能真的是最好的做法吗?”我不知道 :)。但是这个解决方案对我有用,并且在生产中得到证明,它提供了如何在某种程度上实际控制 SSE 重新连接的方法。
这是我所做的:
- 重写了
createSseSource
函数所以 return 类型是 void
- 不是 returning observable,来自 SSE 的数据被馈送到 subjects/NgRx actions
- 添加了 public
openSseChannel
和私有 reconnectOnError
函数以更好地控制
- 添加了私有函数
processSseEvent
来处理自定义消息类型
由于我在这个项目上使用 NgRx,每个 SSE 消息都会发送相应的操作,但这可以替换为 ReplaySubject
并公开为 observable
.
// Public function, initializes connection, returns true if successful
openSseChannel(): boolean {
this.createSseEventSource();
return !!this.eventSource;
}
// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
// Close event source if current instance of SSE service has some
if (this.eventSource) {
this.closeSseConnection();
this.eventSource = null;
}
// Open new channel, create new EventSource
this.eventSource = new EventSource(this.sseChannelUrl);
// Process default event
this.eventSource.onmessage = (event: MessageEvent) => {
this.zone.run(() => this.processSseEvent(event));
};
// Add custom events
Object.keys(SSE_EVENTS).forEach(key => {
this.eventSource.addEventListener(SSE_EVENTS[key], event => {
this.zone.run(() => this.processSseEvent(event));
});
});
// Process connection opened
this.eventSource.onopen = () => {
this.reconnectFrequencySec = 1;
};
// Process error
this.eventSource.onerror = (error: any) => {
this.reconnectOnError();
};
}
// Processes custom event types
private processSseEvent(sseEvent: MessageEvent): void {
const parsed = sseEvent.data ? JSON.parse(sseEvent.data) : {};
switch (sseEvent.type) {
case SSE_EVENTS.STATUS: {
this.store.dispatch(StatusActions.setStatus({ status: parsed }));
// or
// this.someReplaySubject.next(parsed);
break;
}
// Add others if neccessary
default: {
console.error('Unknown event:', sseEvent.type);
break;
}
}
}
// Handles reconnect attempts when the connection fails for some reason.
// const SSE_RECONNECT_UPPER_LIMIT = 64;
private reconnectOnError(): void {
const self = this;
this.closeSseConnection();
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = setTimeout(() => {
self.openSseChannel();
self.reconnectFrequencySec *= 2;
if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
}
}, this.reconnectFrequencySec * 1000);
}
由于 SSE 事件被馈送到 subject/actions,因此连接丢失并不重要,因为至少最后一个事件保留在主题或存储中。然后可以静默尝试重新连接,并且在发送新数据时进行无缝处理。
我正在研究 Angular 8(使用 Electron 6 和 Ionic 4)项目,现在我们正处于评估阶段,我们正在决定是否用 SSE(服务器发送的事件)或 Web 套接字替换轮询。我的工作是研究SSE。
我创建了生成随机数的小型快递应用程序,一切正常。唯一让我烦恼的是在服务器错误时重新连接的正确方法。
我的实现是这样的:
private createSseSource(): Observable<MessageEvent> {
return Observable.create(observer => {
this.eventSource = new EventSource(SSE_URL);
this.eventSource.onmessage = (event) => {
this.zone.run(() => observer.next(event));
};
this.eventSource.onopen = (event) => {
console.log('connection open');
};
this.eventSource.onerror = (error) => {
console.log('looks like the best thing to do is to do nothing');
// this.zone.run(() => observer.error(error));
// this.closeSseConnection();
// this.reconnectOnError();
};
});
}
我试图在这个 answer 之后实现 reconnectOnError()
功能,但我就是无法让它工作。然后我放弃了 reconnectOnError()
功能,这似乎是一件更好的事情。不要尝试关闭和重新连接,也不要将错误传播到可观察对象。请坐等,当服务器再次 运行 时,它将自动重新连接。
问题是,这真的是最好的做法吗? 重要的是,FE 应用程序与其自己的服务器通信,而其他服务器无法访问应用程序实例(内置设备)。
我发现我的问题受到了一些关注,所以我决定 post 我的解决方案。回答我的问题:“省略重新连接功能真的是最好的做法吗?”我不知道 :)。但是这个解决方案对我有用,并且在生产中得到证明,它提供了如何在某种程度上实际控制 SSE 重新连接的方法。
这是我所做的:
- 重写了
createSseSource
函数所以 return 类型是void
- 不是 returning observable,来自 SSE 的数据被馈送到 subjects/NgRx actions
- 添加了 public
openSseChannel
和私有reconnectOnError
函数以更好地控制 - 添加了私有函数
processSseEvent
来处理自定义消息类型
由于我在这个项目上使用 NgRx,每个 SSE 消息都会发送相应的操作,但这可以替换为 ReplaySubject
并公开为 observable
.
// Public function, initializes connection, returns true if successful
openSseChannel(): boolean {
this.createSseEventSource();
return !!this.eventSource;
}
// Creates SSE event source, handles SSE events
protected createSseEventSource(): void {
// Close event source if current instance of SSE service has some
if (this.eventSource) {
this.closeSseConnection();
this.eventSource = null;
}
// Open new channel, create new EventSource
this.eventSource = new EventSource(this.sseChannelUrl);
// Process default event
this.eventSource.onmessage = (event: MessageEvent) => {
this.zone.run(() => this.processSseEvent(event));
};
// Add custom events
Object.keys(SSE_EVENTS).forEach(key => {
this.eventSource.addEventListener(SSE_EVENTS[key], event => {
this.zone.run(() => this.processSseEvent(event));
});
});
// Process connection opened
this.eventSource.onopen = () => {
this.reconnectFrequencySec = 1;
};
// Process error
this.eventSource.onerror = (error: any) => {
this.reconnectOnError();
};
}
// Processes custom event types
private processSseEvent(sseEvent: MessageEvent): void {
const parsed = sseEvent.data ? JSON.parse(sseEvent.data) : {};
switch (sseEvent.type) {
case SSE_EVENTS.STATUS: {
this.store.dispatch(StatusActions.setStatus({ status: parsed }));
// or
// this.someReplaySubject.next(parsed);
break;
}
// Add others if neccessary
default: {
console.error('Unknown event:', sseEvent.type);
break;
}
}
}
// Handles reconnect attempts when the connection fails for some reason.
// const SSE_RECONNECT_UPPER_LIMIT = 64;
private reconnectOnError(): void {
const self = this;
this.closeSseConnection();
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = setTimeout(() => {
self.openSseChannel();
self.reconnectFrequencySec *= 2;
if (self.reconnectFrequencySec >= SSE_RECONNECT_UPPER_LIMIT) {
self.reconnectFrequencySec = SSE_RECONNECT_UPPER_LIMIT;
}
}, this.reconnectFrequencySec * 1000);
}
由于 SSE 事件被馈送到 subject/actions,因此连接丢失并不重要,因为至少最后一个事件保留在主题或存储中。然后可以静默尝试重新连接,并且在发送新数据时进行无缝处理。