Nestjs @Sse:return rxjs 可观察到的承诺结果
Nestjs @Sse : return result of a promise in rxjs observable
我正在尝试超越 nest doc example 在控制器中实现 @Sse() 的简单步骤,但我直到现在才使用 rxjs,所以我有点困惑。
流量为:
- 客户端发送一个
POST
带有文件负载的请求
- 服务器(希望)发回新创建的
project
和一个道具 status:UPLOADED
- 客户端订阅下面描述的 sse 路由作为参数传递
projectId
它刚从服务器 收到
- 同时服务器
doingSomeStuff
可能需要 10 秒到一分钟。 doingSomeStuff
完成后,数据库中的项目状态从 UPLOADED
更新为 PARSED
我需要 @Sse 修饰函数以 x 时间间隔执行“状态检查”和 return project.status
(当时可能已更新也可能未更新)
我现在的密码:
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
const projId$ = from(this.projectService.find(projectId)).pipe(
map((p) => ({
data: {
status: p.status,
},
})),
);
return interval(1000).pipe(switchMap(() => projId$));
}
我没有在此处放置服务代码,因为它是一个简单的 mongooseModel.findById
包装器。
我的问题是状态 returned 仍然是 UPLOADED
并且永远不会更新。似乎承诺不会在每次滴答声中重新执行。如果我 console.log 在我的服务中,我可以看到我的日志只用初始 project
值打印一次,而我希望在每个滴答时看到一个新日志。
这是一个两步过程。
我们使用 rxjs 中的 from
运算符根据 this.service.findById()
生成的承诺创建一个可观察对象。我们还使用 map
运算符来设置当有人订阅此 observable 时我们需要的对象的格式。
我们想 return 每 x 秒观察一次。 interval(x)
创建一个每 x
毫秒后发出一个值的可观察对象。因此,只要间隔发出值,我们就使用它,然后使用 switchMap
到 projId$
。 switchMap
运算符在外部可观察对象发出值时切换到内部可观察对象。
请注意:由于您的服务器可能需要 10 秒才能完成该操作,因此您应该相应地设置 intervalValue
。在下面的代码片段中,我将其设置为 10,000 毫秒,即 10 秒。
const intervalValue = 10000;
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
return interval(intervalValue).pipe(
switchMap(() => this.projectService.find(projectId)),
map((p) => ({
data: {
status: p.status,
}
})));
}
// OR
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
const projId$ = defer(() => this.service.findById(projectId)).pipe(
map(() => ({
data: {
_: projectId
}
}))
);
return interval(intervalValue).pipe(switchMap(() => projId$));
}
@softmarshmallow
您可以观看模型更改并使用可观察流发送它。
像这样
import { Controller, Param, Sse } from '@nestjs/common'
import { filter, map, Observable, Subject } from 'rxjs'
@Controller('project')
export class ProjectStatusController {
private project$ = new Subject()
// watch model event
// this method should be called when project is changed
onProjectChange(project) {
this.project$.next(project)
}
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
return this.project$.pipe(
filter((project) => project.projectId === projectId),
map((project) => ({
data: {
status: project.status,
},
})),
)
}
}
我正在尝试超越 nest doc example 在控制器中实现 @Sse() 的简单步骤,但我直到现在才使用 rxjs,所以我有点困惑。
流量为:
- 客户端发送一个
POST
带有文件负载的请求 - 服务器(希望)发回新创建的
project
和一个道具status:UPLOADED
- 客户端订阅下面描述的 sse 路由作为参数传递
projectId
它刚从服务器 收到
- 同时服务器
doingSomeStuff
可能需要 10 秒到一分钟。doingSomeStuff
完成后,数据库中的项目状态从UPLOADED
更新为PARSED
我需要 @Sse 修饰函数以 x 时间间隔执行“状态检查”和 return project.status
(当时可能已更新也可能未更新)
我现在的密码:
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
const projId$ = from(this.projectService.find(projectId)).pipe(
map((p) => ({
data: {
status: p.status,
},
})),
);
return interval(1000).pipe(switchMap(() => projId$));
}
我没有在此处放置服务代码,因为它是一个简单的 mongooseModel.findById
包装器。
我的问题是状态 returned 仍然是 UPLOADED
并且永远不会更新。似乎承诺不会在每次滴答声中重新执行。如果我 console.log 在我的服务中,我可以看到我的日志只用初始 project
值打印一次,而我希望在每个滴答时看到一个新日志。
这是一个两步过程。
我们使用 rxjs 中的
from
运算符根据this.service.findById()
生成的承诺创建一个可观察对象。我们还使用map
运算符来设置当有人订阅此 observable 时我们需要的对象的格式。我们想 return 每 x 秒观察一次。
interval(x)
创建一个每x
毫秒后发出一个值的可观察对象。因此,只要间隔发出值,我们就使用它,然后使用switchMap
到projId$
。switchMap
运算符在外部可观察对象发出值时切换到内部可观察对象。
请注意:由于您的服务器可能需要 10 秒才能完成该操作,因此您应该相应地设置 intervalValue
。在下面的代码片段中,我将其设置为 10,000 毫秒,即 10 秒。
const intervalValue = 10000;
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
return interval(intervalValue).pipe(
switchMap(() => this.projectService.find(projectId)),
map((p) => ({
data: {
status: p.status,
}
})));
}
// OR
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable < any > {
const projId$ = defer(() => this.service.findById(projectId)).pipe(
map(() => ({
data: {
_: projectId
}
}))
);
return interval(intervalValue).pipe(switchMap(() => projId$));
}
@softmarshmallow 您可以观看模型更改并使用可观察流发送它。 像这样
import { Controller, Param, Sse } from '@nestjs/common'
import { filter, map, Observable, Subject } from 'rxjs'
@Controller('project')
export class ProjectStatusController {
private project$ = new Subject()
// watch model event
// this method should be called when project is changed
onProjectChange(project) {
this.project$.next(project)
}
@Sse('sse/:projectId')
sse(@Param('projectId') projectId: string): Observable<any> {
return this.project$.pipe(
filter((project) => project.projectId === projectId),
map((project) => ({
data: {
status: project.status,
},
})),
)
}
}