EventSource 停止从服务器接收任何更新

EventSource stop recieving any update from server

我有一个关于 EventSource 的奇怪错误。

我有一个服务器,它通过 EventSource 永久地向 UI 发送一些事件。直到几天前,一切正常。 最近有更新,服务器现在在某个通道上发送新数据。

问题是,出于我尚未发现的原因,有时,EventSource 现在停止工作。

连接仍然打开,请求的状态仍然是挂起且未关闭,控制台上完全没有错误。服务器也仍在将事件流式传输到 UI。但是 EventSource 只是不再更新了。我也试过直接用curl请求命中,但是他没有得到任何更新,直到我手动刷新。

这是我的客户端代码 =>

import { Injectable, HostListener, OnDestroy } from '@angular/core';
import { environment } from 'src/environments/environment';

@Injectable({
  providedIn: 'root'
})
export class SSEService implements OnDestroy {
  private eventSource: EventSource;
  private callbacks: Map<string, (e: any) => void> = new Map<string, (e: any) => void>();

  init(channel: string) {
    if (this.eventSource) { return; }
    console.log('SSE Channel Start');
    this.eventSource = new EventSource(`${environment.SSE_URL}?channel=${channel}`);
  }

  protected callListener(cb: (d) => void): (e) => void {
    const callee = ({data}): void => {
      try {
        const d = JSON.parse(data);
        cb(d.message);
      } catch (e) {
        console.error(e, data);
      }
    };
    return callee;
  }

  private addEventToEventSrc(event: string, callback: any, owner: string): void {
    console.log(`Subscribed to ⇢ ${event} (owned by: ${owner})`);
    const cb = this.callListener(callback);
    this.eventSource.addEventListener(event, cb);
    if (!this.callbacks.get(`${event}_${owner}`)) {
        this.callbacks.set(`${event}_${owner}`, cb);
    }
  }

  subscribe(event: string, callback: any, owner?: string): void {
    if (!this.eventSource) { return; }
    if (!owner) { owner = 'default'; }
    if (!event) { event = 'message'; }
    this.addEventToEventSrc(event, callback, owner);
}

  unsubscribe(event: string, owner?: string): void {
    if (!this.eventSource) { return; }
    if (!owner) { owner = 'default'; }
    if (!event) { event = 'message'; }
    if (this.callbacks.get(`${event}_${owner}`)) {
      console.log(`Unsubscribed to ⇢ ${event} (owned by: ${owner})`);
      this.eventSource.removeEventListener(event, this.callbacks.get(`${event}_${owner}`));
    }
    this.callbacks.delete(`${event}_${owner}`);
  }

  @HostListener('window:beforeunload')
  onBrowserClose() {
    this.clearAll();
  }

  ngOnDestroy() {
    this.clearAll();
  }

  clearAll() {
    if (this.eventSource) {
      console.log('SSE Channel Closed');
      this.eventSource.close();
      this.eventSource = null;
    }
    this.callbacks = new Map<string, (e: any) => void>();
  }
}

我尝试记录收到的数据,尝试捕获...但它从未显示任何错误,它只是停止获取更新。

从服务器发送的数据是我的问题,但服务器不会停止流,因此我的 EventSource 应该崩溃或继续运行。

但现在它正在默默地切断所有更新。

如果您能告诉我原因可能出在哪里,我将不胜感激

事实证明,我的EventSource客户端实现是正确的。

服务器端,我们有 1 个服务器和一个调度程序,服务器在同一个频道上以非常短的延迟向调度程序发送大量更新。 EventSource 将所有这些消息视为单个大消息,并在他认为消息结束之前停止刷新。但这并没有发生,所以它在尝试下载时陷入了开放状态。