
Populate concurrent map while iterating parquet files in parallel efficiently

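I am working with parquet files: I read all of them in parallel and populate concurrent maps after iterating over every row in each file. There are 50 files in total, and each file is at most about 60 MB.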

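I need to parallelize my for loop so that it reads all these parquet files in parallel and populates the maps as it goes. The concurrent maps inside the data struct will be read by multiple reader threads at the same time, and written in parallel by multiple writers inside the for loop. I want to make sure that they are safe and that the operations are atomic. I also have getter methods to access those concurrent maps.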


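import (
    "context"
    "encoding/json"
    "sync"

    cmap "github.com/orcaman/concurrent-map"
    pars3 "github.com/xitongsys/parquet-go-source/s3"
    "github.com/xitongsys/parquet-go/reader"
    // errs and xio are helper packages used below; their import paths are not shown.
)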

type Data interface {
    GetCustomerMap() *cmap.ConcurrentMap
    GetCustomerMetricsMap() *cmap.ConcurrentMap
}

type data struct {
    // getter methods below expose these maps
    customers          *cmap.ConcurrentMap
    customerMetrics    *cmap.ConcurrentMap
}

// loadParquet is called periodically by a background thread.
func (r *data) loadParquet(path string, bucket string, key string) error {
    var err error
    var files []string
    files, err = r.s3Client.ListObjects(bucket, key, ".parquet")
    if err != nil {
        return err
    }

    var waitGroup sync.WaitGroup
    // Set number of effective goroutines we want to wait upon
    waitGroup.Add(len(files))

    // How do I parallelize the for loop below so that the maps are populated in a thread-safe way?
    // The same maps will be accessed by multiple reader threads too.
    // Writes to the maps happen from the background thread, but many reader threads read from them.
    for _, file := range files {
        err = func() error {
            fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
            if err != nil {
                return errs.Wrap(err)
            }
            defer xio.CloseIgnoringErrors(fr)

            pr, err := reader.NewParquetReader(fr, nil, 4)
            if err != nil {
                return errs.Wrap(err)
            }

            // Confused about this for loop:
            // do we need to parallelize here too?
            for {
                rows, err := pr.ReadByNumber(100)
                if err != nil {
                    return errs.Wrap(err)
                }

                if len(rows) <= 0 {
                    break
                }

                byteSlice, err := json.Marshal(rows)
                if err != nil {
                    return errs.Wrap(err)
                }

                // Named products (not rows) to avoid redeclaring rows in the same scope.
                var products []ParquetProduct
                err = json.Unmarshal(byteSlice, &products)
                if err != nil {
                    return errs.Wrap(err)
                }

                // Read the products and put them inside the concurrent maps.
                // The maps must be populated atomically and in a thread-safe way
                // under multiple parallel writes inside this for loop,
                // while reader threads perform multiple reads on the same maps.
                for i := range products {
                    // ...
                    // ...
                }
            }
            return nil
        }()

        if err != nil {
            return err
        }

        go task(&waitGroup) // Achieving maximum concurrency
    }
    // Wait until all goroutines have completed execution.
    waitGroup.Wait()
    return nil
}

// GetCustomerMap is called by multiple reader threads to get data out of the map.
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
    return r.customers
}

// GetCustomerMetricsMap is called by multiple reader threads to get data out of the map.
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
    return r.customerMetrics
}

I am using this parquet library to read the files.

Accessing a map concurrently tends to be an anti-pattern in Go, and it generally requires locks to prevent runtime failures such as: Golang fatal error: concurrent map writes.
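For comparison, here is a minimal sketch of the lock-based route, assuming a plain built-in map guarded by a sync.RWMutex. The safeMap type, its methods, fillConcurrently, and the "customer-N" keys are hypothetical names made up for illustration; they come neither from the question nor from any library.

```go
package main

import (
	"fmt"
	"sync"
)

// safeMap is a hypothetical wrapper: a built-in map guarded by an RWMutex.
type safeMap struct {
	mu sync.RWMutex
	m  map[string]int
}

func newSafeMap() *safeMap {
	return &safeMap{m: make(map[string]int)}
}

// Set takes the write lock, so only one goroutine mutates the map at a time.
func (s *safeMap) Set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

// Get takes the read lock, so many readers can proceed in parallel.
func (s *safeMap) Get(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.m[key]
	return v, ok
}

// Len reports the number of entries under the read lock.
func (s *safeMap) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.m)
}

// fillConcurrently writes n distinct keys from n goroutines and waits for all of them.
func fillConcurrently(n int) *safeMap {
	s := newSafeMap()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Set(fmt.Sprintf("customer-%d", i), i)
		}(i)
	}
	wg.Wait()
	return s
}

func main() {
	s := fillConcurrently(50)
	v, ok := s.Get("customer-7")
	fmt.Println(s.Len(), v, ok) // prints "50 7 true"
}
```

Every write here contends for the same lock, which is the cost the channel-based approach tries to avoid.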

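Instead, you can use a chan (channel): have each goroutine that reads a file write its data to that channel, and have a single goroutine listen on the channel and add the data to the map. In this approach the map has only one reader/writer, there are no locks, and no goroutine is blocked when writing its results (as long as the channel is buffered and the buffer has room).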
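A minimal sketch of this channel-based pattern could look like the following. It assumes a hypothetical doWork function that stands in for reading one file, a made-up result type carrying one key/value pair, and a hypothetical collect helper; none of these names come from the question or from the parquet library.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a hypothetical message type: one key/value pair parsed from a file.
type result struct {
	key   string
	value int
}

// doWork stands in for reading one file. Here it only emits a single
// made-up record derived from the file name.
func doWork(file string, out chan<- result) {
	out <- result{key: file, value: len(file)}
}

// collect starts one goroutine per file plus a single collector goroutine,
// which is the only goroutine that touches the map while it is being filled.
func collect(files []string) map[string]int {
	// Buffered, so a send does not block while the buffer has room.
	out := make(chan result, len(files))

	var wg sync.WaitGroup
	for _, f := range files {
		wg.Add(1)
		go func(f string) {
			defer wg.Done()
			doWork(f, out)
		}(f)
	}

	m := make(map[string]int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for r := range out { // ends once out is closed and drained
			m[r.key] = r.value
		}
	}()

	wg.Wait()  // all producers have finished sending
	close(out) // lets the collector's range loop terminate
	<-done     // the collector has finished writing to the map
	return m
}

func main() {
	m := collect([]string{"a.parquet", "bb.parquet", "ccc.parquet"})
	fmt.Println(len(m), m["bb.parquet"]) // prints "3 10"
}
```

In this sketch the map is handed back to the caller only after the collector has finished, so nothing else touches it while it is being loaded.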


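Replace doWork() with the function that reads your files, and have the listener consume an output channel of bytes, files, or whatever type you want, so that you can put the values into your map.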