在启动期间从读取文件加载数据,然后处理新文件并从地图中清除旧状态

Load data from reading files during startup and then process new files and clear old state from the map

我正在做一个项目,在启动期间我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有)然后替换地图内存中的任何内容在使用此新数据启动期间的早些时候。基本上每次如果有一个 full state 的新文件,那么我想将我的内存映射对象刷新到这个新文件而不是附加到它。

下面的方法loadAtStartupAndProcessNewChanges在服务器启动期间调用,它读取文件并将数据存储在内存中。它还启动一个 go-routine detectNewFiles 定期检查是否有任何新文件并将其存储在 deltaChan 通道上,稍后由另一个 go-routine processNewFiles 访问以读取新文件再次归档并将数据存储在同一个地图中。如果有任何错误,我们会将其存储在 err 频道中。 loadFiles 是将读取内存中的文件并将其存储在 map.

中的函数
type customerConfig struct {
  deltaChan   chan string
  err         chan error
  wg          sync.WaitGroup
  data        *cmap.ConcurrentMap
}

// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
  path, err := r.GetPath("...", "....")
  if err != nil {
    return err
  }

  r.wg.Add(1)
  go r.detectNewFiles(path)
  err = r.loadFiles(4, path)
  if err != nil {
    return err
  }
  r.wg.Add(1)
  go r.processNewFiles()
  return nil
}

这个方法基本上可以判断是否有任何新文件需要使用,如果有则将其放入 deltaChan 频道,稍后将由 [=20= 使用] go-routine,读取内存中的文件。如果有任何错误,它会将错误添加到错误通道。

func (r *customerConfig) detectNewFiles(rootPath string) {

}

这将读取所有 s3 文件并将其存储在内存中并且 return 错误。在这种方法中,我清除了我的地图以前的状态,以便它可以从新文件中获得新状态。此方法在服务器启动期间调用,并且在我们需要处理来自 processNewFiles go-routine.

的新文件时也会调用
func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.Clear()
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }
  return nil
}

此方法读取文件并添加到 data 并发映射中。

func (r *customerConfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return errs.Wrap(err)
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return errs.Wrap(err)
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      r.data.Set(key, hasInventory)
    }
  }
  return nil
}

此方法将选择 delta channel 上的内容,如果有任何新文件,则它将通过调用 loadFiles 方法开始读取该新文件。如果有任何错误,它会将错误添加到错误通道。

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
  // find new files on delta channel
  // and call "loadFiles" method to read it
  // if there is any error, then it will add it to the error channel.
}

如果 error channel 上有任何错误,那么它将通过以下方法记录这些错误 -

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

问题陈述

以上逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。如您所见,我有一个并发映射,我在 read 方法中填充它,并在 loadFiles 方法中清除整个映射。因为每当增量通道上有新文件时,我不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。

现在,如果 read 方法有任何错误,那么错误就会发生,因为我已经清除了我的 data 地图中的所有数据,这将是空地图,这不是我想要的。基本上,如果有任何错误,那么我想在 data 地图中保留以前的状态。我怎样才能在我上面的当前设计中解决这个问题。

注:我用的是golangconcurrent map

只需分配一张新地图。像这样:

var mu sync.Mutex
before := map[string]string{} // Some map before reading

after := make(map[string]string)

// Read files and fill `after` map

mu.Lock()
before = after
mu.Unlock()

collectedData worker goroutine 的并发写保护添加了 RWMutex

type customerConfig struct {
   ...
   m sync.RWMutex
}

不要在 read 方法中更新 mapread 方法只是 return 数据和错误

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
  // read file data and return with error if any
  var err error
  fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
  if err != nil {
    return (nil, errs.Wrap(err))
  }
  defer xio.CloseIgnoringErrors(fr)

  pr, err := reader.NewParquetReader(fr, nil, 8)
  if err != nil {
    return (nil, errs.Wrap(err))
  }

  if pr.GetNumRows() == 0 {
    spn.Infof("Skipping %s due to 0 rows", file)
    return (nil, errors.New("No Data"))
  }

  var invMods = []CompModel{}
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    var jsonData []CompModel
    err = json.Unmarshal(byteSlice, &jsonData)
    if err != nil {
      return (nil, errs.Wrap(err))
    }
    invMods = append(invMods, jsonData...)
  }
  return invMods, nil
}

然后loadFiles你可以通过read收集数据return 方法,如果没有错误,则清除并更新地图 保持旧数据不变

func (r *customerConfig) loadFiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  // r.data.Clear() <- remove the clear from here
  g, ctx := errgroup.WithContext(context.Background())
  sem := make(chan struct{}, workers)
  collectedData := []CompModel{}

  for _, file := range files {
    select {
    case <-ctx.Done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.Go(func() error {
      defer func() { <-sem }()

      data, err:= r.read(spn, file, bucket)
      if err != nil {
        return err
      }

      r.m.Lock()
      append(collectedData, data...)
      r.m.Unlock()
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return err
  }

  r.data.Clear()
  for i := range collectedData {
    key := strconv.FormatInt(collectedData[i].ProductID, 10) + ":" + strconv.Itoa(int(collectedData[i].Iaz))
    hasInventory := false
    if collectedData[i].Available > 0 {
      hasInventory = true
    }
    r.data.Set(key, hasInventory)
  }

  return nil
}

注意:由于代码不可运行,只是更新了方法供参考,我没有包括用于更新切片的互斥锁,您可能需要处理这种情况。


只需 3 个函数即可实现同样的功能 - 检测、读取、加载、检测将按时间间隔检查新文件并在发现任何文件时推送到增量通道,加载将从增量通道读取文件路径并调用读取方法获取数据和错误然后检查是否没有错误然后清除地图并使用新内容更新否则记录错误,因此您将有 2 个 go 例程和 1 个函数将由加载例程调用

package main

import (
  "fmt"

  "time"
  "os"
  "os/signal"
  "math/rand"
)

func main() {
  fmt.Println(">>>", center("STARTED", 30), "<<<")

  c := &Config{
    InitialPath: "Old Path",
    DetectInterval: 3000,
  }
  c.start()
  fmt.Println(">>>", center("ENDED", 30), "<<<")
}

// 
func center(s string, w int) string {
    return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}

type Config struct {
  deltaCh chan string
  ticker *time.Ticker
  stopSignal chan os.Signal
  InitialPath string
  DetectInterval time.Duration
}

func (c *Config) start() {
  c.stopSignal = make(chan os.Signal, 1)
  signal.Notify(c.stopSignal, os.Interrupt)

  c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
  c.deltaCh = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.InitialPath != "" {
    c.deltaCh <- c.InitialPath
  }
  <- c.stopSignal
  c.ticker.Stop()
}

// Detect New Files
func (c *Config) detect() {
  for {
    select {
      case <- c.stopSignal:
        return
      case <- c.ticker.C:
        fmt.Println(">>>", center("DETECT", 30), "<<<")
        c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
    }
  }
}
// Read Files
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.Println(">>>", center("READ", 30), "<<<")
  fmt.Println(path)
  return data, nil
}
// Load Files
func (c *Config) load() {
  for {
    select {
      case <- c.stopSignal:
        return
      case path := <- c.deltaCh:
        fmt.Println(">>>", center("LOAD", 30), "<<<")
        data, err := read(path)
        if err != nil {
          fmt.Println("Log Error")
        } else {
          fmt.Println("Success", data)
        }
        fmt.Println()
    }
  }
}

注意:示例代码中未包含地图,可以轻松更新以包含地图

不要在 loadFile 方法中清除地图,而是在 read

中执行类似的操作
func (r *customerConfig) read(file string, bucket string) error {
  m := cmap.New() // create a new map
  // ...
  for {
    rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
    if err != nil {
      return errs.Wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteSlice, err := json.Marshal(rows)
    if err != nil {
      return errs.Wrap(err)
    }
    var invMods []CompModel
    err = json.Unmarshal(byteSlice, &invMods)
    if err != nil {
      return errs.Wrap(err)
    }

    for i := range invMods {
      key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
      hasInventory := false
      if invMods[i].Available > 0 {
        hasInventory = true
      }
      m.Set(key, hasInventory)
    }
  }
  r.data = m // Use the new map
  return nil
}

我觉得你的设计太复杂了。它可以更简单地解决,从而提供您想要的所有好处:

  • 并发访问安全
  • 重新加载检测到的更改
  • 访问配置会为您提供最新的、成功加载的配置
  • 最新的配置总是可以立即访问,即使由于检测到的更改加载新配置需要很长时间
  • 如果加载新配置失败,将保留之前的“快照”并保持当前状态
  • 作为奖励,它更简单,甚至不使用第 3 方库

让我们看看如何实现:


有一个 CustomerConfig 结构保存你想要缓存的所有东西(这是“快照”):

type CustomerConfig struct {
    Data map[string]bool

    // Add other props if you need:
    LoadedAt time.Time
}

提供加载您希望缓存的配置的函数。注意:这个函数是无状态的,它不访问/操作包级变量:

func loadConfig() (*CustomerConfig, error) {
    cfg := &CustomerConfig{
        Data:     map[string]bool{},
        LoadedAt: time.Now(),
    }

    // Logic to load files, and populate cfg.Data
    // If an error occurs, return it

    // If loading succeeds, return the config
    return cfg, nil
}

现在让我们来创建我们的“缓存管理器”。缓存管理器存储实际/当前配置(快照),并提供对其的访问。为了安全的并发访问(和更新),我们使用 sync.RWMutex。也有停止管理器的方法(停止并发刷新):

type ConfigCache struct {
    configMu sync.RWMutex
    config   *CustomerConfig
    closeCh  chan struct{}
}

创建缓存加载初始配置。同时启动一个 goroutine,负责定期检查变化。

func NewConfigCache() (*ConfigCache, error) {
    cfg, err := loadConfig()
    if err != nil {
        return nil, fmt.Errorf("loading initial config failed: %w", err)
    }

    cc := &ConfigCache{
        config:  cfg,
        closeCh: make(chan struct{}),
    }

    // launch goroutine to periodically check for changes, and load new configs
    go cc.refresher()

    return cc, nil
}

refresher() 定期检查变化,如果检测到变化,调用 loadConfig() 加载新数据进行缓存,并将其存储为当前/实际配置(同时锁定 configMu).它还监视 closeCh 以在请求时停止:

func (cc *ConfigCache) refresher() {
    ticker := time.NewTicker(1 * time.Minute) // Every minute
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            // Check if there are changes
            changes := false // logic to detect changes
            if !changes {
                continue // No changes, continue
            }

            // Changes! load new config:
            cfg, err := loadConfig()
            if err != nil {
                log.Printf("Failed to load config: %v", err)
                continue // Keep the previous config
            }

            // Apply / store new config
            cc.configMu.Lock()
            cc.config = cfg
            cc.configMu.Unlock()

        case <-cc.closeCh:
            return
        }
    }
}

关闭缓存管理器(刷新 goroutine)就像:

func (cc *ConfigCache) Stop() {
    close(cc.closeCh)
}

最后遗漏的部分是您如何访问当前配置。这是一个简单的 GetConfig() 方法(也使用 configMu,但在 read-only 模式下):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
    cc.configMu.RLock()
    defer cc.configMu.RUnlock()
    return cc.config
}

你可以这样使用:

cc, err := NewConfigCache()
if err != nil {
    // Decide what to do: retry, terminate etc.
}

// Where ever, whenever you need the actual (most recent) config in your app:

cfg := cc.GetConfig()
// Use cfg

在你关闭你的应用程序之前(或者你想停止刷新),你可以调用cc.Stop()