RXJS Encrypting large files and sending to AWS S3

I'm fairly new to RxJS and its learning curve is quite steep. I'm streaming files over gRPC, and I want to encrypt them with AES and then store them in an S3 bucket. At the moment I'm accumulating the buffers in memory, which will quickly become a problem once I start uploading large files.

I'd like to know how to use an RxJS stream to pipe the data through encryption and then on to the S3 bucket (i.e. without ever holding the whole file in memory).

Is this possible, or am I misunderstanding something?
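It should be possible, because Node's `crypto.createCipheriv` returns a `Cipher`, which is itself a Transform stream: data can be piped through it chunk by chunk rather than collected with repeated `update()` calls. A minimal sketch (assuming AES-256-CBC and a locally generated key/IV, roughly what a `generateEncryptionKeys()`-style helper would produce):

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
import { Readable, PassThrough } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical parameters -- stand-ins for whatever the service's
// generateEncryptionKeys() returns.
const key = randomBytes(32); // AES-256 key
const iv = randomBytes(16);  // AES block-size IV

// Pipe a readable source through the cipher; chunks flow through one at a
// time, so nothing requires the whole plaintext in memory at once.
async function encryptStream(source: Readable): Promise<Buffer> {
  const cipher = createCipheriv('aes-256-cbc', key, iv); // a Transform stream
  const chunks: Buffer[] = [];
  const sink = new PassThrough();
  sink.on('data', (c: Buffer) => chunks.push(c));
  await pipeline(source, cipher, sink);
  return Buffer.concat(chunks);
}

async function main() {
  const plaintext = Buffer.from('hello large file '.repeat(1000));
  const encrypted = await encryptStream(Readable.from([plaintext]));

  // Round-trip to confirm the streamed encryption is correct.
  const decipher = createDecipheriv('aes-256-cbc', key, iv);
  const decrypted = Buffer.concat([decipher.update(encrypted), decipher.final()]);
  console.log(decrypted.equals(plaintext)); // true
}
main();
```

Here the sink collects into a buffer only for demonstration; in the real pipeline it would be the S3 upload stream instead.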

Here is my current implementation:

  @GrpcStreamMethod()
  upload(data$: Observable<FileUploadRequest>): Observable<FileUploadResponse> {
    let fileBuffer = Buffer.alloc(0); // start empty so concatenation below is safe
    let metaBuffer = Buffer.alloc(0);
    let storageBucket: string;
    let publicKey: string;
    let publicKeyHash: string;

    const response$ = new Subject<FileUploadResponse>();
    let headersDelivered = false;

    const symmetricKeys = this.service.generateEncryptionKeys(); // returns key + iv
    const fileCipher = this.service.getCypher(symmetricKeys); // returns cipher
    const metaCipher = this.service.getCypher(symmetricKeys); // returns cipher

    data$.subscribe({
      next: (data: FileUploadRequest) => {
        const content = data?.file?.content;
        const meta: Meta = data?.metadata;

        // Metadata has been delivered
        if (meta && !headersDelivered) {
          headersDelivered = true;

          publicKey = meta.owner;
          publicKeyHash = this.service.createHash(meta.owner);
          storageBucket = join(FILE_STORE, publicKeyHash);
          const metaBuff = this.service.metaToBuffer(meta);
          metaBuffer = metaCipher.update(metaBuff);
        }

        if (content && !headersDelivered) {
          console.log('SOME ERROR');
        }

        if (content && headersDelivered) {
          // Append each encrypted chunk; plain assignment would drop earlier chunks
          fileBuffer = Buffer.concat([fileBuffer, fileCipher.update(content)]);
        }
      },
      complete: async () => {
        const encryptedSymetricKey = await this.service.encryptSymetricKey(
          symmetricKeys,
          publicKey,
        );

        fileBuffer = Buffer.concat([fileBuffer, fileCipher.final()]);
        metaBuffer = Buffer.concat([metaBuffer, metaCipher.final()]);

        const documentId = uuidv4();
        const fileName = `${documentId}.file`;
        const metaName = `${documentId}.meta`;
        const keys = `${documentId}.access`;

        await this.service.uploadToAws(
          metaBuffer,
          `${publicKeyHash}/${metaName}`,
        );
        await this.service.uploadToAws(
          fileBuffer,
          `${publicKeyHash}/${fileName}`,
        );
        await this.service.uploadToAws(
          JSON.stringify({
            [publicKeyHash]: encryptedSymetricKey,
          }),
          `${publicKeyHash}/${keys}`,
        );

        response$.next({
          status: Status.SUCCESS,
          path: storageBucket,
        });
        response$.complete();
      },
    });

    return response$.asObservable();
  }

The solution I had in mind was to create a single stream and push the data into it.

uploadToAws(key) {
    const pass = new PassThrough();

    return {
      writeStream: pass,
      promise: this.s3.send(
        new PutObjectCommand({
          Bucket: '...',
          Key: key,
          Body: pass,
          ServerSideEncryption: '...',
          ContentLength: 37, // PutObjectCommand errors on a stream Body without a known ContentLength
        }),
      ),
    };
}
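The `{ writeStream, promise }` pattern itself is sound, and it can be exercised without S3 by swapping in a local sink of the same shape. A sketch (the `makeUploadSink` helper is hypothetical, a stand-in for `uploadToAws`; with the real SDK, the `Upload` class from `@aws-sdk/lib-storage` accepts a stream of unknown length and uploads it in multipart chunks, which avoids the `ContentLength` requirement of `PutObjectCommand`):

```typescript
import { PassThrough } from 'stream';

// Local stand-in for uploadToAws(): same { writeStream, promise } shape,
// but the "upload" just collects the bytes so the pattern can be tested.
function makeUploadSink(): { writeStream: PassThrough; promise: Promise<Buffer> } {
  const pass = new PassThrough();
  const chunks: Buffer[] = [];
  const promise = new Promise<Buffer>((resolve, reject) => {
    pass.on('data', (c: Buffer) => chunks.push(c));
    pass.on('end', () => resolve(Buffer.concat(chunks)));
    pass.on('error', reject);
  });
  return { writeStream: pass, promise };
}

async function demo() {
  const { writeStream, promise } = makeUploadSink();
  writeStream.write(Buffer.from('part1-'));
  writeStream.write(Buffer.from('part2'));
  writeStream.end(); // ending the stream resolves the promise
  const body = await promise;
  console.log(body.toString()); // part1-part2
}
demo();
```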

const stream = new Readable({
  read() {
    // no-op: data is pushed in from the subscription below
  },
});

data$.subscribe({
    next: (data) => stream.push(data),
    error: (error) => stream.destroy(error),
    complete: () => stream.push(null), // signal EOF; destroy() would discard buffered data
});

const { writeStream, promise } = uploadToAws(key);
stream.pipe(cipher).pipe(writeStream); // cipher from crypto.createCipheriv is a Transform stream
await promise;
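Putting the pieces together, the Observable-to-Readable bridge plus the cipher Transform gives the full streaming pipeline. A runnable sketch (a minimal hand-rolled observer type stands in for an RxJS `Observable<Buffer>` so the example has no rxjs dependency; with rxjs, `data$.subscribe({...})` behaves the same way):

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
import { Readable } from 'stream';

// Minimal stand-in for rxjs subscription callbacks.
type Observer = {
  next: (chunk: Buffer) => void;
  error: (err: Error) => void;
  complete: () => void;
};

// Bridge a push-based source (an Observable) into a Node Readable.
function observableToReadable(subscribe: (o: Observer) => void): Readable {
  const stream = new Readable({ read() {} }); // push-driven; read() is a no-op
  subscribe({
    next: (chunk) => stream.push(chunk),
    error: (err) => stream.destroy(err),
    complete: () => stream.push(null), // EOF, not destroy()
  });
  return stream;
}

const key = randomBytes(32);
const iv = randomBytes(16);

async function main() {
  const source = observableToReadable((o) => {
    o.next(Buffer.from('chunk-1 '));
    o.next(Buffer.from('chunk-2'));
    o.complete();
  });

  // The cipher Transform sits between the gRPC source and the upload sink,
  // so only one chunk at a time is in flight.
  const cipher = createCipheriv('aes-256-cbc', key, iv);
  const chunks: Buffer[] = [];
  for await (const c of source.pipe(cipher)) chunks.push(c as Buffer);
  const encrypted = Buffer.concat(chunks);

  // Decrypt to verify the pipeline; in production `source.pipe(cipher)`
  // would be piped into the upload writeStream instead.
  const d = createDecipheriv('aes-256-cbc', key, iv);
  console.log(Buffer.concat([d.update(encrypted), d.final()]).toString()); // prints "chunk-1 chunk-2"
}
main();
```

Error handling is worth keeping symmetric: `stream.destroy(err)` on the Observable's error path propagates the failure down the pipe, so the upload promise rejects instead of hanging.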