Nestjs @Sse:return rxjs 可观察到的承诺结果

Nestjs @Sse : return result of a promise in rxjs observable

我正在尝试超越 nest doc example 在控制器中实现 @Sse() 的简单步骤,但我直到现在才使用 rxjs,所以我有点困惑。

流量为:

  1. 客户端发送一个 POST 带有文件负载的请求
  2. 服务器(希望)发回新创建的 project 和一个道具 status:UPLOADED
  3. 客户端订阅下面描述的 sse 路由作为参数传递 projectId 它刚从服务器
  4. 收到
  5. 同时服务器 doingSomeStuff 可能需要 10 秒到一分钟。 doingSomeStuff 完成后,数据库中的项目状态从 UPLOADED 更新为 PARSED

我需要 @Sse 修饰函数以 x 时间间隔执行“状态检查”和 return project.status(当时可能已更新也可能未更新)

我现在的密码:

  @Sse('sse/:projectId')
  sse(@Param('projectId') projectId: string): Observable<any> {
    const projId$ = from(this.projectService.find(projectId)).pipe(
      map((p) => ({
        data: {
          status: p.status,
        },
      })),
    );
    return interval(1000).pipe(switchMap(() => projId$));
  }

我没有在此处放置服务代码,因为它是一个简单的 mongooseModel.findById 包装器。

我的问题是状态 returned 仍然是 UPLOADED 并且永远不会更新。似乎承诺不会在每次滴答声中重新执行。如果我 console.log 在我的服务中,我可以看到我的日志只用初始 project 值打印一次,而我希望在每个滴答时看到一个新日志。

这是一个两步过程。

  1. 我们使用 rxjs 中的 from 运算符根据 this.service.findById() 生成的承诺创建一个可观察对象。我们还使用 map 运算符来设置当有人订阅此 observable 时我们需要的对象的格式。

  2. 我们想 return 每 x 秒观察一次。 interval(x) 创建一个每 x 毫秒后发出一个值的可观察对象。因此,只要间隔发出值,我们就使用它,然后使用 switchMapprojId$switchMap 运算符在外部可观察对象发出值时切换到内部可观察对象。

请注意:由于您的服务器可能需要 10 秒才能完成该操作,因此您应该相应地设置 intervalValue。在下面的代码片段中,我将其设置为 10,000 毫秒,即 10 秒。

const intervalValue = 10000;

@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
  return interval(intervalValue).pipe(
    switchMap(() => this.projectService.find(projectId)),
    map((p) => ({
      data: {
        status: p.status,
      }
    })));
}


// OR

@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
  const projId$ = defer(() => this.service.findById(projectId)).pipe(
    map(() => ({
      data: {
        _: projectId
      }
    }))
  );

  return interval(intervalValue).pipe(switchMap(() => projId$));
}

@softmarshmallow 您可以观看模型更改并使用可观察流发送它。 像这样

import { Controller, Param, Sse } from '@nestjs/common'
import { filter, map, Observable, Subject } from 'rxjs'

@Controller('project')
export class ProjectStatusController {
  private project$ = new Subject()

  // watch model event
  // this method should be called when project is changed
  onProjectChange(project) {
    this.project$.next(project)
  }

  @Sse('sse/:projectId')
  sse(@Param('projectId') projectId: string): Observable<any> {
    return this.project$.pipe(
      filter((project) => project.projectId === projectId),
      map((project) => ({
        data: {
          status: project.status,
        },
      })),
    )
  }
}