How to stream the UploadPart S3 operation from an incoming request to AWS S3 using Go?

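Context

My team and I are building a reverse proxy that intercepts all outgoing requests to S3, so that we can audit and control access from different applications.

We have already implemented almost every operation by streaming the file content. For example, to upload in a single operation we use s3manager.Uploader to stream the body of the incoming request (which is an io.Reader) to S3, and for downloads (both single and multipart style) we use the io.Copy primitive to write the response from s3.GetObjectOutput.Body (which is an io.ReadCloser).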

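Problem:

The only operation we still cannot implement with streaming is upload-part (in the context of a multipart upload). The problem is that s3.UploadPartInput requires an aws.ReadSeekCloser, and to pass it the body of the incoming request you have to buffer it somewhere (for example, in memory).

This is what we have so far: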

func (ph *VaultProxyHandler) HandleUploadPart(w http.ResponseWriter, r *http.Request, s3api s3iface.S3API, bucket string, key string, uploadID string, part int64) {
    buf := bytes.NewBuffer(nil)
    
    // this loads the entire body into memory
    if _, err := io.Copy(buf, r.Body); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    payload := buf.Bytes()

    input := &s3.UploadPartInput{
        Bucket:     aws.String(bucket),
        Key:        aws.String(key),
        UploadId:   aws.String(uploadID),
        PartNumber: aws.Int64(part),
        Body:       aws.ReadSeekCloser(bytes.NewReader(payload)),
    }

    output, err := s3api.UploadPart(input)

    // and so on...
}

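Question:

Is there a way to stream the incoming request of an UploadPart to S3? (By streaming I mean not storing the whole body in memory.)

In the end, I found a way to handle the incoming UploadPart in the reverse proxy as a stream: build the request with the AWS SDK and sign it with an unsigned payload.

Here is a basic example: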

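import (
    "io"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
    "github.com/aws/aws-sdk-go/service/s3"
    "github.com/aws/aws-sdk-go/service/s3/s3iface"
)

type AwsService struct {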
    Region   string
    S3Client s3iface.S3API
    Signer   *v4.Signer
}

func NewAwsService(region string, accessKey string, secretKey string, sessionToken string) (*AwsService, error) {
    creds := credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
    awsConfig := aws.NewConfig().
        WithRegion(region).
        WithCredentials(creds).
        WithCredentialsChainVerboseErrors(true)
    sess, err := session.NewSession(awsConfig)
    if err != nil {
        return nil, err
    }
    svc := s3.New(sess)

    signer := v4.NewSigner(creds)
    v4.WithUnsignedPayload(signer)

    return &AwsService{
        Region:   region,
        S3Client: svc,
        Signer:   signer,
    }, nil
}

func (s *AwsService) UploadPart(bucket string, key string, part int, uploadID string, payloadReader io.Reader, contentLength int64) (string, error) {

    input := &s3.UploadPartInput{
        Bucket:        aws.String(bucket),
        Key:           aws.String(key),
        UploadId:      aws.String(uploadID),
        PartNumber:    aws.Int64(int64(part)),
        ContentLength: aws.Int64(contentLength),
        Body:          aws.ReadSeekCloser(payloadReader),
    }

    req, output := s.S3Client.UploadPartRequest(input)

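    // sign with the unsigned-payload signer, so the body does not have to be read to compute its hash
    if _, err := s.Signer.Sign(req.HTTPRequest, req.Body, s3.ServiceName, s.Region, time.Now()); err != nil {
        return "", err
    }
    if err := req.Send(); err != nil {
        return "", err
    }

    // aws.StringValue returns "" instead of panicking when ETag is nil
    return aws.StringValue(output.ETag), nil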
}

Then it can be called from a handler:

func HandleUploadPart(w http.ResponseWriter, r *http.Request) {

    // awsService is assumed to be a package-level *AwsService created with NewAwsService;
    // the region is fixed at that point, so it is not passed to UploadPart here
    query := r.URL.Query()
    bucket := query.Get("bucket")
    key := query.Get("key")
    part, err := strconv.Atoi(query.Get("part"))
    if err != nil {
        // an invalid part number is a client error
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    uploadID := query.Get("upload-id")
    payloadReader := r.Body

    // the content length must be known up front, because the body is never buffered
    contentLength, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }

    etag, err := awsService.UploadPart(bucket, key, part, uploadID, payloadReader, contentLength)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("ETag", etag)
}

Drawbacks:

  • The client must know the content length in advance and send it.
  • The payload cannot be signed.