同时从 S3 下载多个文件并合并它们
Downloading multiple files from S3 concurrently and consolidated them
我正在尝试同时从 S3 下载多个文件,并将它们的内容合并为一个字节 buffer.The 文件为 csv 格式。我的代码似乎在大部分时间(10 次尝试中的 8 次)都有效。但是有些情况下,在我检查了合并缓冲区之后,我得到的比我应该得到的要少(通常不超过 100 行缺失)。预期的记录总数为 4802。
如果 运行 我的代码按顺序没有这个问题 appear.But 我需要为 speed.This 使用 goroutines 是我试图 do.I 有 运行 的主要要求出现没有数据竞争的 go data race inspector,我打印的错误语句从未打印出来。
这是我使用的代码:
var pingsBuffer = aws.NewWriteAtBuffer([]byte{})
//range over the contents of the index file
for _, file := range indexList {
wg.Add(1)
go download(key + string(file), pingsBuffer, &wg)
}
wg.Wait()
和下载功能(也整合了下载的文件)
func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup) {
defer wg.Done()
awsBuffer := aws.NewWriteAtBuffer([]byte{})
input := &s3.GetObjectInput {
Bucket: aws.String(defaultLocationRootBucket),
Key: aws.String(key),
}
n1, downloadError := downloader.Download(awsBuffer, input)
if downloadError != nil {
loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
return
}
lenghts3:= int64(len(buffer.Bytes()))
n2, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3 )
if bufferError != nil {
loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3 with error : %v.", key, bufferError))
}
此代码:
lenghts3:= int64(len(buffer.Bytes()))
是并发问题:两个routine可能同时获取长度,获取相同的起始位置,然后都继续写入相同起始位置的buffer,互相踩脚
由于您已经在内存中检索整个对象而不是流式传输到组合缓冲区,因此您也可以将每个文件的完整内容发送到一个通道上,并让该通道上的接收器将每个结果附加到当它们同步进入时共享字节缓冲区。
我正在尝试同时从 S3 下载多个文件,并将它们的内容合并为一个字节 buffer.The 文件为 csv 格式。我的代码似乎在大部分时间(10 次尝试中的 8 次)都有效。但是有些情况下,在我检查了合并缓冲区之后,我得到的比我应该得到的要少(通常不超过 100 行缺失)。预期的记录总数为 4802。 如果 运行 我的代码按顺序没有这个问题 appear.But 我需要为 speed.This 使用 goroutines 是我试图 do.I 有 运行 的主要要求出现没有数据竞争的 go data race inspector,我打印的错误语句从未打印出来。
这是我使用的代码:
var pingsBuffer = aws.NewWriteAtBuffer([]byte{})
//range over the contents of the index file
for _, file := range indexList {
wg.Add(1)
go download(key + string(file), pingsBuffer, &wg)
}
wg.Wait()
和下载功能(也整合了下载的文件)
func download(key string, buffer *aws.WriteAtBuffer, wg *sync.WaitGroup) {
defer wg.Done()
awsBuffer := aws.NewWriteAtBuffer([]byte{})
input := &s3.GetObjectInput {
Bucket: aws.String(defaultLocationRootBucket),
Key: aws.String(key),
}
n1, downloadError := downloader.Download(awsBuffer, input)
if downloadError != nil {
loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to download from S3, file(%v) with error : %v.", key, downloadError))
return
}
lenghts3:= int64(len(buffer.Bytes()))
n2, bufferError := buffer.WriteAt(awsBuffer.Bytes(), lenghts3 )
if bufferError != nil {
loglib.Log(loglib.LevelError, applicationType, fmt.Sprintf("Failed to write to buffer, the file(%v) downloaded from S3 with error : %v.", key, bufferError))
}
此代码:
lenghts3:= int64(len(buffer.Bytes()))
是并发问题:两个routine可能同时获取长度,获取相同的起始位置,然后都继续写入相同起始位置的buffer,互相踩脚
由于您已经在内存中检索整个对象而不是流式传输到组合缓冲区,因此您也可以将每个文件的完整内容发送到一个通道上,并让该通道上的接收器将每个结果附加到当它们同步进入时共享字节缓冲区。