连续执行多个独立作业
Execute multiple independent jobs continuously
我有一组相互独立的作业。因此,这些作业中的每一个都可以 运行 同时使用 goroutines。请注意,一旦单个作业完成,它应该等待几秒钟然后重新开始(适用于所有作业)并且这将循环进行直到 Go API 服务停止。另请注意,所有这些作业都执行相同的 goroutine(进行 REST 调用)。在 Go 中实现它的最佳模式是什么。请注意,在我的服务关闭之前,我还想等待当前正在执行的作业完成。
如果我没理解错的话,你正在找这样的东西。
此代码将 运行 workers 循环,workers 运行 并行作为一个组,直到您退出程序发送结束信号,但在退出之前等待当前循环完成.
func main() {
srv := server{
workers: 5,
}
srv.Run()
}
// inspired by: https://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/#:~:text=A%20WaitGroup%20allows%20to%20wait,until%20all%20goroutines%20have%20finished.
func work(wg *sync.WaitGroup, i int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(10)
fmt.Printf("Worker %v: Started\n", i)
time.Sleep(time.Duration(n) * time.Second)
fmt.Printf("Worker %v: Finished\n", i)
}
type server struct {
running bool
workers int
}
func (srv *server) Run() {
done := make(chan bool, 1) // this channel
signalCh := make(chan os.Signal, 1) // this channel will get a signal on system call
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalCh
srv.running = false
done <- true
}()
srv.running = true
for srv.running {
var wg sync.WaitGroup
for i := 0; i < srv.workers; i++ {
wg.Add(1)
go work(&wg, i)
}
wg.Wait()
}
<-done
}
如果我没看错,你正在寻找这样的东西
这是一个带有消费者池的服务,可以同时执行作业。当一项工作完成后,它会在一段时间后再次重复,直到您停止服务。
type job struct {
id int
result chan error
}
func newJob(id int) job {
return job{
id: id,
result: make(chan error, 1),
}
}
type service struct {
pending chan job
consumerLimit int
repeatInterval time.Duration
isClosed chan struct{}
shutdown chan chan error
}
func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
s := &service{
pending: make(chan job, pendingChannelSize),
consumerLimit: consumerLimit,
repeatInterval: repeatInterval,
isClosed: make(chan struct{}, consumerLimit),
shutdown: make(chan chan error),
}
for i := 0; i < s.consumerLimit; i++ {
go s.consumer()
}
return s
}
func (s *service) do(ctx context.Context, job job) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.pending <- job:
return <-job.result
case <-s.isClosed:
return errors.New("service has been shut down")
}
}
func (s *service) consumer() {
for {
select {
case j := <-s.pending:
//Simulate working process
time.Sleep(time.Duration(rand.Intn(200)) + 200)
j.result <- nil
fmt.Println(fmt.Sprintf("job %v is done", j.id))
go func() {
//Repeat after a time
time.Sleep(s.repeatInterval)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.do(ctx, newJob(j.id)); err != nil {
fmt.Println(fmt.Errorf("failed to send job to repeat: %v", err))
}
}()
case result := <-s.shutdown:
result <- nil
return
}
}
}
func (s *service) close() error {
result := make(chan error, 1)
for i := 0; i < s.consumerLimit; i++ {
s.shutdown <- result
}
close(s.isClosed)
return <-result
}
func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
service := newService(time.Second, 5, 1000)
//Assign jobs
for i := 1; i < 10; i++ {
go func(i int) {
if err := service.do(context.Background(), newJob(i)); err != nil {
fmt.Println(fmt.Errorf("failed to send job: %v", err))
}
}(i)
}
select {
case <-interrupt:
switch err := service.close(); err {
case nil:
fmt.Println("service has been shutdown successfully")
default:
fmt.Println(fmt.Errorf("failed to graceful shut down service: %w", err))
}
return
}
}
您想实施工作人员池。这是创建工作池的简单方法。您可以根据您的需求自定义worker方法和作业类型。
package main
import (
"fmt"
"sync"
)
type Jobs struct {
ID string
// or anything you want to add
}
func main() {
jobs := make(Jobs)
var wg sync.WaitGroup
numWorker := 16
for i := 0; i < numWorker; i++ {
wg.Add(1)
go func() {
worker(jobs)
wg.Done()
}()
}
tasks := []Jobs{}
// inset your task here
for _, i := range tasks {
jobs <- i
}
close(jobs)
wg.Wait()
}
func worker(jobs chan Jobs) {
for job := range jobs {
// do whatever you want to do
doSomething(job)
}
}
func doSomething(job Jobs) {
fmt.Println(job)
}
我有一组相互独立的作业。因此,这些作业中的每一个都可以 运行 同时使用 goroutines。请注意,一旦单个作业完成,它应该等待几秒钟然后重新开始(适用于所有作业)并且这将循环进行直到 Go API 服务停止。另请注意,所有这些作业都执行相同的 goroutine(进行 REST 调用)。在 Go 中实现它的最佳模式是什么。请注意,在我的服务关闭之前,我还想等待当前正在执行的作业完成。
如果我没理解错的话,你正在找这样的东西。
此代码将 运行 workers 循环,workers 运行 并行作为一个组,直到您退出程序发送结束信号,但在退出之前等待当前循环完成.
func main() {
srv := server{
workers: 5,
}
srv.Run()
}
// inspired by: https://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/#:~:text=A%20WaitGroup%20allows%20to%20wait,until%20all%20goroutines%20have%20finished.
func work(wg *sync.WaitGroup, i int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(10)
fmt.Printf("Worker %v: Started\n", i)
time.Sleep(time.Duration(n) * time.Second)
fmt.Printf("Worker %v: Finished\n", i)
}
type server struct {
running bool
workers int
}
func (srv *server) Run() {
done := make(chan bool, 1) // this channel
signalCh := make(chan os.Signal, 1) // this channel will get a signal on system call
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalCh
srv.running = false
done <- true
}()
srv.running = true
for srv.running {
var wg sync.WaitGroup
for i := 0; i < srv.workers; i++ {
wg.Add(1)
go work(&wg, i)
}
wg.Wait()
}
<-done
}
如果我没看错,你正在寻找这样的东西 这是一个带有消费者池的服务,可以同时执行作业。当一项工作完成后,它会在一段时间后再次重复,直到您停止服务。
type job struct {
id int
result chan error
}
func newJob(id int) job {
return job{
id: id,
result: make(chan error, 1),
}
}
type service struct {
pending chan job
consumerLimit int
repeatInterval time.Duration
isClosed chan struct{}
shutdown chan chan error
}
func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
s := &service{
pending: make(chan job, pendingChannelSize),
consumerLimit: consumerLimit,
repeatInterval: repeatInterval,
isClosed: make(chan struct{}, consumerLimit),
shutdown: make(chan chan error),
}
for i := 0; i < s.consumerLimit; i++ {
go s.consumer()
}
return s
}
func (s *service) do(ctx context.Context, job job) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.pending <- job:
return <-job.result
case <-s.isClosed:
return errors.New("service has been shut down")
}
}
func (s *service) consumer() {
for {
select {
case j := <-s.pending:
//Simulate working process
time.Sleep(time.Duration(rand.Intn(200)) + 200)
j.result <- nil
fmt.Println(fmt.Sprintf("job %v is done", j.id))
go func() {
//Repeat after a time
time.Sleep(s.repeatInterval)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.do(ctx, newJob(j.id)); err != nil {
fmt.Println(fmt.Errorf("failed to send job to repeat: %v", err))
}
}()
case result := <-s.shutdown:
result <- nil
return
}
}
}
func (s *service) close() error {
result := make(chan error, 1)
for i := 0; i < s.consumerLimit; i++ {
s.shutdown <- result
}
close(s.isClosed)
return <-result
}
func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
service := newService(time.Second, 5, 1000)
//Assign jobs
for i := 1; i < 10; i++ {
go func(i int) {
if err := service.do(context.Background(), newJob(i)); err != nil {
fmt.Println(fmt.Errorf("failed to send job: %v", err))
}
}(i)
}
select {
case <-interrupt:
switch err := service.close(); err {
case nil:
fmt.Println("service has been shutdown successfully")
default:
fmt.Println(fmt.Errorf("failed to graceful shut down service: %w", err))
}
return
}
}
您想实施工作人员池。这是创建工作池的简单方法。您可以根据您的需求自定义worker方法和作业类型。
package main
import (
"fmt"
"sync"
)
type Jobs struct {
ID string
// or anything you want to add
}
func main() {
jobs := make(Jobs)
var wg sync.WaitGroup
numWorker := 16
for i := 0; i < numWorker; i++ {
wg.Add(1)
go func() {
worker(jobs)
wg.Done()
}()
}
tasks := []Jobs{}
// inset your task here
for _, i := range tasks {
jobs <- i
}
close(jobs)
wg.Wait()
}
func worker(jobs chan Jobs) {
for job := range jobs {
// do whatever you want to do
doSomething(job)
}
}
func doSomething(job Jobs) {
fmt.Println(job)
}