连续执行多个独立作业

Execute multiple independent jobs continuously

我有一组相互独立的作业。因此,这些作业中的每一个都可以 运行 同时使用 goroutines。请注意,一旦单个作业完成,它应该等待几秒钟然后重新开始(适用于所有作业)并且这将循环进行直到 Go API 服务停止。另请注意,所有这些作业都执行相同的 goroutine(进行 REST 调用)。在 Go 中实现它的最佳模式是什么。请注意,在我的服务关闭之前,我还想等待当前正在执行的作业完成。

如果我没理解错的话,你正在找这样的东西。

此代码将 运行 workers 循环,workers 运行 并行作为一个组,直到您退出程序发送结束信号,但在退出之前等待当前循环完成.

func main() {
    srv := server{
        workers: 5,
    }
    srv.Run()
}

// inspired by: https://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/#:~:text=A%20WaitGroup%20allows%20to%20wait,until%20all%20goroutines%20have%20finished.
func work(wg *sync.WaitGroup, i int) {
    defer wg.Done()

    rand.Seed(time.Now().UnixNano())
    n := rand.Intn(10)

    fmt.Printf("Worker %v: Started\n", i)
    time.Sleep(time.Duration(n) * time.Second)
    fmt.Printf("Worker %v: Finished\n", i)

}

type server struct {
    running bool
    workers int
}

func (srv *server) Run() {

    done := make(chan bool, 1) // this channel

    signalCh := make(chan os.Signal, 1) // this channel will get a signal on system call
    signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-signalCh
        srv.running = false
        done <- true
    }()

    srv.running = true
    for srv.running {
        var wg sync.WaitGroup
        for i := 0; i < srv.workers; i++ {
            wg.Add(1)
            go work(&wg, i)
        }
        wg.Wait()
    }
    <-done
}

如果我没看错,你正在寻找这样的东西 这是一个带有消费者池的服务,可以同时执行作业。当一项工作完成后,它会在一段时间后再次重复,直到您停止服务。

type job struct {
    id     int
    result chan error
}

func newJob(id int) job {
    return job{
        id:     id,
        result: make(chan error, 1),
    }
}

type service struct {
    pending chan job

    consumerLimit  int
    repeatInterval time.Duration

    isClosed chan struct{}
    shutdown chan chan error
}

func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
    s := &service{
        pending:        make(chan job, pendingChannelSize),
        consumerLimit:  consumerLimit,
        repeatInterval: repeatInterval,
        isClosed:       make(chan struct{}, consumerLimit),
        shutdown:       make(chan chan error),
    }

    for i := 0; i < s.consumerLimit; i++ {
        go s.consumer()
    }

    return s
}

func (s *service) do(ctx context.Context, job job) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case s.pending <- job:
        return <-job.result
    case <-s.isClosed:
        return errors.New("service has been shut down")
    }
}

func (s *service) consumer() {
    for {
        select {
        case j := <-s.pending:
            //Simulate working process
            time.Sleep(time.Duration(rand.Intn(200)) + 200)
            j.result <- nil
            fmt.Println(fmt.Sprintf("job %v is done", j.id))

            go func() {
                //Repeat after a time
                time.Sleep(s.repeatInterval)
                ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
                defer cancel()
                if err := s.do(ctx, newJob(j.id)); err != nil {
                    fmt.Println(fmt.Errorf("failed to send job to repeat: %v", err))
                }
            }()
        case result := <-s.shutdown:
            result <- nil
            return
        }
    }
}

func (s *service) close() error {
    result := make(chan error, 1)
    for i := 0; i < s.consumerLimit; i++ {
        s.shutdown <- result
    }
    close(s.isClosed)
    return <-result
}

func main() {
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    service := newService(time.Second, 5, 1000)

    //Assign jobs
    for i := 1; i < 10; i++ {
        go func(i int) {
            if err := service.do(context.Background(), newJob(i)); err != nil {
                fmt.Println(fmt.Errorf("failed to send job: %v", err))
            }
        }(i)
    }

    select {
    case <-interrupt:
        switch err := service.close(); err {
        case nil:
            fmt.Println("service has been shutdown successfully")
        default:
            fmt.Println(fmt.Errorf("failed to graceful shut down service: %w", err))
        }
        return
    }
}

您想实施工作人员池。这是创建工作池的简单方法。您可以根据您的需求自定义worker方法和作业类型。

package main

import (
    "fmt"
    "sync"
)

type Jobs struct {
    ID string
    // or anything you want to add
}

func main() {

    jobs := make(Jobs)

    var wg sync.WaitGroup
    numWorker := 16
    for i := 0; i < numWorker; i++ {
        wg.Add(1)
        go func() {
            worker(jobs)
            wg.Done()
        }()
    }

    tasks := []Jobs{}
    // inset your task here
    for _, i := range tasks {
        jobs <- i
    }
    close(jobs)
    wg.Wait()
}

func worker(jobs chan Jobs) {

    for job := range jobs {

        // do whatever you want to do
        doSomething(job)

    }
}

func doSomething(job Jobs) {
    fmt.Println(job)
}