在有效地并行迭代镶木地板文件的同时填充并发映射

Populate concurrent map while iterating parquet files in parallel efficiently

我正在处理 parquet 文件,我在其中并行读取所有这些文件,并在遍历每个文件中的所有行后填充并发映射。文件总数为 50 个,每个文件大小最大约为 60MB。

我需要并行化我的 for 循环,它并行读取所有这些 parquet 文件,并通过并行读取所有这些镶木地板文件来填充地图。 data 结构中的这些并发映射将由多个 reader 线程同时读取,并由多个写入器在 for 循环内并行写入。我想确保它们是安全的并且操作是原子的。我还有 getter 方法来访问那些并发映射。

下面是我得到的代码,但我不确定这是否是并行化它的正确方法,还是我遗漏了一些非常基础的东西?

import (
//....
pars3 "github.com/xitongsys/parquet-go-source/s3"
"github.com/xitongsys/parquet-go/reader"
cmap "github.com/orcaman/concurrent-map"
//....
)

type Data interface {
    GetCustomerMap() *cmap.ConcurrentMap
    GetCustomerMetricsMap() *cmap.ConcurrentMap
}

type data struct {
    // will have getter methods to access these below map
    customers          *cmap.ConcurrentMap
    customerMetrics    *cmap.ConcurrentMap
}

//loadParquet.. This will be called by background thread periodically
func (r *data) loadParquet(path string, bucket string, key string) error {
    var err error
    var files []string
    files, err = r.s3Client.ListObjects(bucket, key, ".parquet")
    if err != nil {
        return err
    }

    var waitGroup sync.WaitGroup 
    // Set number of effective goroutines we want to wait upon 
    waitGroup.Add(len(files)) 

    // parallelize below for loop in such a way so that I can populate my map in thread safe way?
    // And same map will be accessed by multiple reader threads too.
    // This writes to our map happens from background thread but there are lot of reader threads reading from the map.
    for i, file := range files {
        err = func() error {
            fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
            if err != nil {
                return errs.Wrap(err)
            }
            defer xio.CloseIgnoringErrors(fr)

            pr, err := reader.NewParquetReader(fr, nil, 4)
            if err != nil {
                return errs.Wrap(err)
            }

            // confuse on this for loop?
            // do we need to parallelize here too?
            for {
                rows, err := pr.ReadByNumber(100)
                if err != nil {
                    return errs.Wrap(err)
                }

                if len(rows) <= 0 {
                    break
                }

                byteSlice, err := json.Marshal(rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                var rows []ParquetProduct
                err = json.Unmarshal(byteSlice, &rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                // read rows struct and put inside concurrent map.
                // Need to populate map in such a way so that it is atomic and thread safe
                // from multiple parallel writes inside this for loop 
                // and multiple reads will happen from reader threads on these maps
                for i := range rows {
                  // ...
                  // ...
                  r.customers.Set(.....)
                  r.customerMetrics.Set(....)
                }
            }
            return nil
        }()

        if err != nil {
            return err
        }

        go task(&waitGroup) // Achieving maximum concurrency 
    }
    // Wait until all goroutines have completed execution. 
    waitGroup.Wait()  
    return nil
}

//GetCustomerMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
    return r.customers
}

//GetCustomerMetricsMap.. These will be accessed by multiple reader threads to get data out of map.
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
    return r.customerMetrics
}

我正在使用这个 parquet 库来读取文件。

在 Golang 中并发访问 map 往往是 anti-pattern,并且通常需要使用锁来防止恐慌,例如:Golang fatal error: concurrent map writes.

相反,您可以使用 chan(通道)类型,让每个读取文件的 go 例程将其数据写入该通道,然后让一个 go-routine 监听该通道通道并将数据添加到地图。在这种方法中,映射只有一个 reader/writer,没有锁,并且每个 go-routine 在写入结果时都不会被阻塞(如果通道被缓冲)。

可以在此处看到此模式的示例:https://play.golang.com/p/u-uYDaWiQiD

将 doWork() 替换为您读取文件的函数,并监听字节、文件的输出通道,无论您想要什么类型,这样您就可以将它们放入您的地图中。