将回调转换为可观察的 rxjs

Convert callbacks to observable rxjs

我想创建一个 nestjs 的拦截器,并在此级别上进行上传文件。 我找不到解决方案,如何通过 Observable (https://github.com/fastify/fastify-multipart)

解决此代码

我正在使用 fromPromise,但使用此工具时,我遇到了更多问题。 因为我只能解决一次。我需要对每个文件进行验证,如果验证 returns 解决,则 运行 用于写入。

const mp = req.multipart(handler, function (err) {
    console.log('upload completed')
    reply.code(200).send()
  })
  // handler wil run for each file
  function handler (field, file, filename, encoding, mimetype) {
    // to accumulate the file in memory! Be careful!
    //
    // file.pipe(concat(function (buf) {
    //   console.log('received', filename, 'size', buf.length)
    // }))
    //
    // or

    pump(file, fs.createWriteStream('a-destination'))

    // be careful of permission issues on disk and not overwrite
    // sensitive files that could cause security risks
  }

我使用这个代码,对于我的旧案例来说已经足够了...

const obs1$ = fromPromise(new Promise((res, rej) => {
            const mp = request.multipart((field, file, filename, encoding, mimetype) => {
                const writeStream = fs.createWriteStream('path');
                //here need run onValidation
                pump(file, writeStream);
            }, (err) => {
                //doSuccess
                (err) ? rej(err) : res();
            });
        }));

        return obs1$.pipe(
            catchError(error => {
                throw new BadRequestException(error);
            }),
            switchMap(() => {
                request.body = body;
                return next.handle();
            }), mergeAll());

但是我有一个新案例(我需要在保存文件之前进行验证)并且案例是:next.hander() return 可以通过一些方法观察到:

@Controller('test')
export class TestCtrl {

@Post('upload')
@UseInterceptors(new FileInterceptor())
    uploadFile() {
        return {
            onValidate: (file: IFiles) => {
                console.log(this.tst);
                return Promise.resolve(true);
            },
            onSuccess: () => {
                return Promise.resolve(true);
            }
        };
    }
}

试过类似的东西:

@Injectable()
export class FileInterceptor implements NestInterceptor {

    intercept(context: any, next: CallHandler): Observable<any> {
        const request = context.getRequest();
        const body: Array<any> = [];

        return next.handle().pipe(switchMap((methods: any) => {
            const mp = request.multipart((field, file, filename, encoding, mimetype) => {
                const writeStream = fs.createWriteStream('path');
                body.push({
                    filename, file
                });

                methods.onValidation(file).then(() => {
                    pump(file, writeStream);
                }).catch(() => {
                    writeStream.destroy();
                });
            }, () => {
                //done
                methods.onSuccess(body); // how to return this?
            });
            return null; // ???
        }));

 }
}

我几乎通过 bindNodeCallback 解决了我的问题,然后我编写了自己的 bindNodeCallback 方法,它可以通过双回调解决我的问题。

callbackFunc.apply(context, [...args, handler1, handler2, handler3]);