RxJS: Encrypting large files and sending them to AWS S3
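I'm fairly new to RxJS, and its learning curve is quite steep. I'm streaming files over gRPC, and I want to encrypt them with AES and then store them in an S3 bucket. At the moment I hold the buffers in memory, which will quickly become a problem once I start uploading large files.
I'd like to know how to take the RxJS stream, pipe it through the encryption, and then pipe it on to the S3 bucket (i.e. without holding the whole file in memory).
Is this possible, or have I misunderstood something?
Here is my current implementation: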
@GrpcStreamMethod()
upload(data$: Observable<FileUploadRequest>): Observable<FileUploadResponse> {
  let fileBuffer = Buffer.alloc(0);
  let metaBuffer: Buffer;
  let storageBucket: string;
  let publicKey: string;
  let publicKeyHash: string;
  const response$ = new Subject<FileUploadResponse>();
  let headersDeliverd = false;
  const symetricKeys = this.service.generateEncryptionKeys(); // returns key + iv
  const fileCipher = this.service.getCypher(symetricKeys); // returns cipher
  const metaCipher = this.service.getCypher(symetricKeys); // returns cipher
  data$.subscribe({
    next: (data: FileUploadRequest) => {
      const content = data?.file?.content;
      const meta: Meta = data?.metadata;
      // Metadata has been delivered
      if (meta && !headersDeliverd) {
        headersDeliverd = true;
        publicKey = meta.owner;
        publicKeyHash = this.service.createHash(meta.owner);
        storageBucket = join(FILE_STORE, publicKeyHash);
        const metaBuff = this.service.metaToBuffer(meta);
        metaBuffer = metaCipher.update(metaBuff);
      }
      if (content && !headersDeliverd) {
        console.log('SOME ERROR');
      }
      if (content && headersDeliverd) {
        // Append each encrypted chunk; plain assignment would keep only the last one
        fileBuffer = Buffer.concat([fileBuffer, fileCipher.update(content)]);
      }
    },
    complete: async () => {
      const encryptedSymetricKey = await this.service.encryptSymetricKey(
        symetricKeys,
        publicKey,
      );
      fileBuffer = Buffer.concat([fileBuffer, fileCipher.final()]);
      metaBuffer = Buffer.concat([metaBuffer, metaCipher.final()]);
      const documentId = uuidv4();
      const fileName = `${documentId}.file`;
      const metaName = `${documentId}.meta`;
      const keys = `${documentId}.access`;
      await this.service.uploadToAws(
        metaBuffer,
        `${publicKeyHash}/${metaName}`,
      );
      await this.service.uploadToAws(
        fileBuffer,
        `${publicKeyHash}/${fileName}`,
      );
      await this.service.uploadToAws(
        JSON.stringify({
          [publicKeyHash]: encryptedSymetricKey,
        }),
        `${publicKeyHash}/${keys}`,
      );
      response$.next({
        status: Status.SUCCESS,
        path: storageBucket,
      });
      return response$.complete();
    },
  });
  return response$.asObservable();
}
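For the encryption step on its own, here is a minimal sketch of how the chunks could be encrypted without concatenating them. It assumes `getCypher` wraps Node's `crypto.createCipheriv`, whose return value is a Transform stream. The helper name `encryptStream`, the `aes-256-cbc` algorithm, and the `sink` parameter are placeholders for illustration, not part of the code above.

```typescript
import { createCipheriv } from 'node:crypto';
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical helper: encrypts `source` chunk by chunk into `sink`.
// `sink` stands in for whatever writable ends up feeding S3.
async function encryptStream(
  source: Readable,
  sink: Writable,
  key: Buffer, // 32 bytes for aes-256-cbc (assumed algorithm)
  iv: Buffer, // 16 bytes
): Promise<void> {
  // The cipher object is a Transform stream, so it can sit inside a pipeline.
  const cipher = createCipheriv('aes-256-cbc', key, iv);
  // pipeline() handles backpressure and forwards errors from any stage.
  await pipeline(source, cipher, sink);
}
```

With this shape only the chunks currently in flight are held in memory, and the cipher's final block is flushed automatically when the source ends.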
The solution I came up with is to create a stream and push the data into it.
uploadToAws(key) {
  const pass = new PassThrough();
  return {
    writeStream: pass,
    promise: this.s3.send(
      new PutObjectCommand({
        Bucket: '...',
        Key: key,
        Body: pass,
        ServerSideEncryption: '...',
        ContentLength: 37, // the request errors if ContentLength is omitted
      }),
    ),
  };
}
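const stream = new Readable({
  // No-op: chunks are pushed from the subscription below
  read() {},
});
data$.subscribe({
  next: (data) => {
    // Push only the binary content; the stream is not in object mode
    const content = data?.file?.content;
    if (content) stream.push(content);
  },
  error: (error) => stream.destroy(error),
  // push(null) ends the stream; destroy() here would abort the pipe
  complete: () => stream.push(null),
});
// `key` is the S3 object key; encryptFunc is assumed to be a Transform such as the cipher
const { writeStream, promise } = uploadToAws(key);
stream.pipe(encryptFunc).pipe(writeStream);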
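To make the bridging idea concrete, here is a rough sketch of turning an observable of chunks into a Node `Readable`. `ObservableLike` is a hypothetical stand-in for the small part of the RxJS `Observable` interface that is needed, so that the sketch runs without RxJS installed. `observableToReadable` is likewise a made-up name.

```typescript
import { Readable } from 'node:stream';

// Hypothetical minimal stand-in for an RxJS Observable: only subscribe()
// with an observer object is assumed.
interface ObservableLike<T> {
  subscribe(observer: {
    next: (value: T) => void;
    error: (err: unknown) => void;
    complete: () => void;
  }): unknown;
}

// Hypothetical helper: exposes the chunks of an observable as a Readable.
function observableToReadable(chunks$: ObservableLike<Buffer>): Readable {
  // read() is a no-op because data is pushed from the subscription instead.
  const stream = new Readable({ read() {} });
  chunks$.subscribe({
    next: (chunk) => {
      stream.push(chunk);
    },
    // Destroying with the error lets downstream stages see the failure.
    error: (err) => {
      stream.destroy(err instanceof Error ? err : new Error(String(err)));
    },
    // push(null) signals a clean end-of-stream.
    complete: () => {
      stream.push(null);
    },
  });
  return stream;
}
```

Note that this sketch ignores the return value of `push()`, so if the consumer is slower than the producer the chunks queue up inside the `Readable`. On the S3 side, the `Upload` helper from `@aws-sdk/lib-storage` accepts a stream as `Body` and uploads it in parts, so it should not need `ContentLength` up front; that part is not shown here.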