nestjs 中的 Rxjs - 可观察到的订阅错误

Rxjs in nestjs - Observable subscription error

我是试用 rxjs 和 nestjs 的新手。我目前正在尝试完成的用例是出于教育目的。所以我想使用 "fs" 模块读取一个 json 文件(在文件为空或无法读取的情况下抛出一个可观察到的错误)。现在我通过异步读取文件来创建一个可观察对象,在主题中设置观察者,然后在 控制器 中订阅主题。这是我在服务中的代码

@Injectable()
export class NewProviderService {
    private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
    // this is the variable that should be exposed. make the subject as private
    // this allows the service to be the sole propertier to modify the stream and
    // not the controller or components
    serviceSubject$:  Observable<HttpResponseModel[]>;

    private serviceErrorSubject: BehaviorSubject<any>;
    serviceErrorSubject$: Observable<any>;
    filePath: string;
    httpResponseObjectArray: HttpResponseModel[];
    constructor() {
        this.serviceSubject = new BehaviorSubject<HttpResponseModel[]>([]);
        this.serviceSubject$ = this.serviceSubject.asObservable();

        this.serviceErrorSubject = new BehaviorSubject<any>(null);
        this.serviceErrorSubject$ = this.serviceErrorSubject.asObservable();

        this.filePath = path.resolve(__dirname, './../../shared/assets/httpTest.json');
    }
     readFileFromJson() {
          return new Promise((resolve, reject) => {
            fs.exists(this.filePath.toString(), exists => {
                if (exists) {
                    fs.readFile(this.filePath.toString(), 'utf-8' , (err, data) => {
                        if (err) {
                            logger.info('error in reading file', err);
                            return reject('Error in reading the file' + err.message);
                        }

                        logger.info('file read without parsing fg', data.length);
                        if ((data.length !== 0) && !isNullOrUndefined(data) && data !== null) {
                            // this.httpResponseObjectArray = JSON.parse(data).HttpTestResponse;
                            // logger.info('array obj is:', this.httpResponseObjectArray);
                            logger.info('file read after parsing new', JSON.parse(data));
                            return resolve(JSON.parse(data).HttpTestResponse);
                        } else {
                            return reject(new FileExceptionHandler('no data in file'));
                        }
                    });
                } else {
                    return reject(new FileExceptionHandler('file cannot be read at the moment'));
                }
            });
          });
    }


        getData() {
                 from(this.readFileFromJson()).pipe(map(data => {
                        logger.info('data in obs', data);
                        this.httpResponseObjectArray = data as HttpResponseModel[];
                        return this.httpResponseObjectArray;
                 }), catchError(error => {
                     return Observable.throw(error);
                 }))
                 .subscribe(actualData => {
                    this.serviceSubject.next(actualData);
                }, err => {
                    logger.info('err in sub', typeof err, err);
                    this.serviceErrorSubject.next(err);
                });
            }

现在这是控制器class

@Get('/getJsonData')
public async getJsonData(@Req() requestAnimationFrame,@Req() req, @Res() res) {

  await this.newService.getData();
  this.newService.serviceSubject$.subscribe(data => {
    logger.info('data subscribed', data, _.isEmpty(data));
    if (!isNullOrUndefined(data) && !_.isEmpty(data)) {
      logger.info('coming in');
      res.status(HttpStatus.OK).send(data);
      res.end();
    }
  });
}

我遇到的问题是我第一次可以获取文件详细信息并且订阅被调用一次 > 它工作正常。在随后的请求中

Error [ERR_HTTP_HEADERS_SENT]: Cannot set headers after they are sent to the client
    at ServerResponse.setHeader (_http_outgoing.js:470:11)
    at ServerResponse.header (C:\personal\Node\test-nest.js\prj-sample\node_modules\express\lib\response.js:767:10)
    at Ser

端点 /getJsonData 导致错误。有人可以帮帮我吗。我相信第一次通话后订阅没有正常进行,但不确定如何结束以及如何解决

我建议 return 直接向控制器 Promise。在这里,您不需要 Observable。对于订阅者,您另外将 Promise 的值发送到您的 serviceSubject.

async getData() {
  try {
    const data = await this.readFileFromJson();
    this.serviceSubject.next(data as HttpResponseModel[]);
    return data;
  } catch (error) {
    // handle error
  }
}

在您的控制器中,您可以 return Promise:

@Get('/getJsonData')
public async getJsonData() {
  return this.newService.getData();
}

我认为尝试将冷 observable(我创建的那个)转换为 hot/warm observable 可能有助于插入到单个源并发出并完成其执行并将最后发出的数据维护到任何克隆值。因此,我使用 publishLast()、refCount() 运算符将冷 observable 转换为暖 observable,并且我可以实现单个订阅和 observable 的执行完成。这是我为工作所做的更改。

这是服务 class 我所做的更改

 getData() {
        return from(this.readFileFromJson()).pipe(map(data => {
                logger.info('data in obs', data);
                this.httpResponseObjectArray = data as HttpResponseModel[];
                return this.httpResponseObjectArray;
         }), publishLast(), refCount()
          , catchError(error => {
             return Observable.throw(error);
         }));
        //  .subscribe(actualData => {
        //     this.serviceSubject.next(actualData);
        // }, err => {
        //     logger.info('err in sub', typeof err, err);
        //     this.serviceErrorSubject.next(err);
        // });
    }

这是我在控制器中所做的更改

public async getJsonData(@Req() req, @Res() res) {
    let jsonData: HttpResponseModel[];
    await this.newService.getData().subscribe(data => {
      logger.info('dddd', data);
      res.send(data);
    });
}

也欢迎任何允许先将可观察对象订阅到主题,然后再在控制器中订阅该主题的答案。

我发现了一个很好的 post 关于热 vs 冷 observable 以及如何使 observable 订阅单一来源并将冷转换为 hot/warm observable - https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html

问题是您在控制器中订阅了 serviceSubject。每次发出新值时,它都会尝试发送响应。这第一次有效,但第二次它会告诉你它不能再次发送相同的响应;该请求已被处理。

您可以使用管道 first() 运算符在第一个值之后完成 Observable:

@Get('/getJsonData')
public async getJsonData() {
  await this.newService.getData();
  return this.newService.serviceSubject$.pipe(first())
}

您希望 Observable 共享(热),以便每个订阅者始终获得相同的最新值。这正是 BehaviourSubject 所做的。因此,当您公开公开它时,您不应该将 Subject 转换为 Observable,因为您将失去这种期望的行为。相反,您可以将 Subject 转换为 Observable,这样在内部它仍然是一个主题,但它不会公开 next() 方法来公开发出新值:

private serviceSubject: BehaviorSubject<HttpResponseModel[]>;
get serviceSubject$(): Observable<HttpResponseModel[]> {
   return this.serviceSubject;
}