如何强制中断在预定滴答内花费太长时间的函数执行
How to force an interrupt on function execution that is taking too long within a scheduled tick
我已经编写了这个调度程序,但当输入函数 f 的执行时间超过输入的 recurring 时间间隔时,我无法"kill"它。
如果 f 是一个进程而不是线程,那么我要找的东西可能就是某种定义明确的硬抢占。
f 的定义不受我控制。它代表一个 ETL 作业,需要在批处理执行期间处理来自多个数据库的数据。f 是用 Go 编写的并且工作正常,但我需要以某种方式对它执行时间过长的情况加以控制。
我知道 f 是原子的,所以它要么在执行结束时更改数据库,要么完全不更改。因此,当它花费的时间太长时,"kill"它可以被认为是安全的。
// schedule starts a background goroutine that launches f in a fresh goroutine
// on every tick of a ticker whose period is recurring.
//
// Each run of f is started with "go", so a run that outlasts the interval does
// not delay the next tick. Nothing interrupts such a run either, so slow runs
// may overlap.
//
// The returned channel stops the scheduler: once it is closed (or receives a
// value) the ticker is stopped and the background goroutine returns. Runs of f
// that were already started are not affected.
func schedule(f func(), recurring time.Duration) chan struct{} {
	quit := make(chan struct{})
	ticker := time.NewTicker(recurring)

	loop := func() {
		// Release the ticker when the loop is left through the quit case.
		defer ticker.Stop()
		for {
			select {
			case <-quit:
				fmt.Println("Stopping the scheduler")
				return
			case <-ticker.C:
				fmt.Println("Ticked")
				// Calling f() without "go" would make a run slower than
				// "recurring" postpone the following executions. Ideally the
				// slow run would be "killed" instead; compare the timestamps
				// printed by the test to see the overlap.
				go f()
			}
		}
	}
	go loop()

	return quit
}
为了看看发生了什么,我编写了这个测试:
// TestSlowExecutions runs a task that takes two seconds through a scheduler
// ticking every second, so the printed timestamps show how the runs overlap.
// It makes no assertions; the output is meant to be read by a human.
func TestSlowExecutions(t *testing.T) {
	// logf prints a message prefixed with the current time.
	logf := func(format string, a ...interface{}) (n int, err error) {
		stamp := fmt.Sprintf("[%v] ", time.Now())
		return fmt.Printf("%s%s\n", stamp, fmt.Sprintf(format, a...))
	}

	// taskID asks the external "uuidgen" tool for an identifier so that each
	// run can be told apart in the output
	// (sudo apt-get install uuid-runtime).
	taskID := func() string {
		// The command error is discarded: the ID is only a log label, and on
		// failure whatever output was captured (possibly none) is used.
		raw, _ := exec.Command("uuidgen").Output()
		// Drop the line break(s) from the tool's output.
		return regexp.MustCompile(`\r?\n`).ReplaceAllString(string(raw), "")
	}

	// slowTask simulates a very slow execution.
	slowTask := func() {
		id := taskID()
		logf("Ticked")
		logf("Starting task %s", id)
		time.Sleep(2 * time.Second)
		logf("Finished task %s", id)
	}

	// Schedule the slow task with an interval shorter than its run time.
	stop := schedule(slowTask, 1*time.Second)
	time.Sleep(4 * time.Second)
	close(stop)

	// Keep the test alive long enough to see the "closing" message.
	time.Sleep(4 * time.Second)
}
我与 f() 的作者协商,改为使用带超时的上下文 (https://golang.org/pkg/context/#WithTimeout)。
请参阅下面的可运行示例,并留意 dummyLog 输出的时间戳,这样就能清楚地看出此过程中涉及的所有 goroutine 发生了什么。
代码:
// dummyLog prints one line to standard output: the current time in square
// brackets, a space, and the message built from format and a with
// fmt.Sprintf semantics. It returns what the underlying fmt.Printf returns,
// i.e. the number of bytes written and any write error.
func dummyLog(format string, a ...interface{}) (n int, err error) {
	stamp := fmt.Sprintf("[%v] ", time.Now())
	return fmt.Printf("%s%s\n", stamp, fmt.Sprintf(format, a...))
}
// newContext returns a fresh context derived from context.Background that
// expires after timeoutUpperBound, together with its cancel function. The
// deadline that was set is logged through dummyLog. The caller is expected to
// call the returned cancel function once the context is no longer needed.
func newContext(timeoutUpperBound time.Duration) (context.Context, context.CancelFunc) {
	root := context.Background()
	timed, stop := context.WithTimeout(root, timeoutUpperBound)

	// Report the deadline and whether one is set on the new context.
	when, hasDeadline := timed.Deadline()
	dummyLog("The context deadline is set to %s is it still valid? %v", when, hasDeadline)

	return timed, stop
}
// schedule could be used to schedule arbitrary functions with a recurring interval
func schedule(f func(ctx context.Context), recurring time.Duration) chan struct{} {
ticker := time.NewTicker(recurring)
quit := make(chan struct{})
go func(inFunc func(ctx context.Context)) {
for {
select {
case <-ticker.C:
dummyLog("Ticked in the scheduler")
// simulate the "killing" of "inFunc" when it takes too long
go func(recurring time.Duration) {
inCtx, cancel := newContext(recurring)
defer cancel()
inFunc(inCtx)
}(recurring)
case <-quit:
dummyLog("Stopping the scheduler")
ticker.Stop()
return
}
}
}(f)
return quit
}
在测试环境中执行代码(虽然没有执行断言):
// TestSomething schedules a task with a random duration of up to four seconds
// under a two-second interval, so some runs finish and others are cut short by
// their context. It makes no assertions; the output is meant to be read by a
// human.
func TestSomething(t *testing.T) {
	// taskID asks the external "uuidgen" tool for an identifier so that each
	// run can be told apart in the output
	// (sudo apt-get install uuid-runtime).
	taskID := func() string {
		// The command error is discarded: the ID is only a log label, and on
		// failure whatever output was captured (possibly none) is used.
		raw, _ := exec.Command("uuidgen").Output()
		// Drop the line break(s) from the tool's output.
		return regexp.MustCompile(`\r?\n`).ReplaceAllString(string(raw), "")
	}

	// pick returns a pseudo-random int in [lo, hi) using "math/rand".
	pick := func(lo int, hi int) int {
		return lo + rand.Intn(hi-lo)
	}

	// slowTask simulates some sort of very slow execution, like database
	// queries or network I/O, that gives up when its context is done.
	slowTask := func(ctx context.Context) {
		id := taskID()
		wait := time.Duration(pick(0, 4000)) * time.Millisecond
		dummyLog("Starting task %s taking %s random time", id, wait)

		select {
		case <-time.After(wait):
			dummyLog("Finished task %s", id)
		case <-ctx.Done():
			dummyLog("Killed task %s, reason: '%s'", id, ctx.Err())
		}
	}

	// The scheduling interval doubles as the per-run timeout.
	limit := 2 * time.Second
	stop := schedule(slowTask, limit)
	time.Sleep(6 * limit)
	close(stop)

	// Keep the test alive long enough to see the "closing" message.
	time.Sleep(4 * time.Second)
}
我已经编写了这个调度程序,但当输入函数 f 的执行时间超过输入的 recurring 时间间隔时,我无法"kill"它。
如果 f 是一个进程而不是线程,那么我要找的东西可能就是某种定义明确的硬抢占。
f 的定义不受我控制。它代表一个 ETL 作业,需要在批处理执行期间处理来自多个数据库的数据。f 是用 Go 编写的并且工作正常,但我需要以某种方式对它执行时间过长的情况加以控制。
我知道 f 是原子的,所以它要么在执行结束时更改数据库,要么完全不更改。因此,当它花费的时间太长时,"kill"它可以被认为是安全的。
// schedule starts a background goroutine that launches f in a fresh goroutine
// on every tick of a ticker whose period is recurring.
//
// Each run of f is started with "go", so a run that outlasts the interval does
// not delay the next tick. Nothing interrupts such a run either, so slow runs
// may overlap.
//
// The returned channel stops the scheduler: once it is closed (or receives a
// value) the ticker is stopped and the background goroutine returns. Runs of f
// that were already started are not affected.
func schedule(f func(), recurring time.Duration) chan struct{} {
	quit := make(chan struct{})
	ticker := time.NewTicker(recurring)

	loop := func() {
		// Release the ticker when the loop is left through the quit case.
		defer ticker.Stop()
		for {
			select {
			case <-quit:
				fmt.Println("Stopping the scheduler")
				return
			case <-ticker.C:
				fmt.Println("Ticked")
				// Calling f() without "go" would make a run slower than
				// "recurring" postpone the following executions. Ideally the
				// slow run would be "killed" instead; compare the timestamps
				// printed by the test to see the overlap.
				go f()
			}
		}
	}
	go loop()

	return quit
}
为了看看发生了什么,我编写了这个测试:
// TestSlowExecutions runs a task that takes two seconds through a scheduler
// ticking every second, so the printed timestamps show how the runs overlap.
// It makes no assertions; the output is meant to be read by a human.
func TestSlowExecutions(t *testing.T) {
	// logf prints a message prefixed with the current time.
	logf := func(format string, a ...interface{}) (n int, err error) {
		stamp := fmt.Sprintf("[%v] ", time.Now())
		return fmt.Printf("%s%s\n", stamp, fmt.Sprintf(format, a...))
	}

	// taskID asks the external "uuidgen" tool for an identifier so that each
	// run can be told apart in the output
	// (sudo apt-get install uuid-runtime).
	taskID := func() string {
		// The command error is discarded: the ID is only a log label, and on
		// failure whatever output was captured (possibly none) is used.
		raw, _ := exec.Command("uuidgen").Output()
		// Drop the line break(s) from the tool's output.
		return regexp.MustCompile(`\r?\n`).ReplaceAllString(string(raw), "")
	}

	// slowTask simulates a very slow execution.
	slowTask := func() {
		id := taskID()
		logf("Ticked")
		logf("Starting task %s", id)
		time.Sleep(2 * time.Second)
		logf("Finished task %s", id)
	}

	// Schedule the slow task with an interval shorter than its run time.
	stop := schedule(slowTask, 1*time.Second)
	time.Sleep(4 * time.Second)
	close(stop)

	// Keep the test alive long enough to see the "closing" message.
	time.Sleep(4 * time.Second)
}
我与 f() 的作者协商,改为使用带超时的上下文 (https://golang.org/pkg/context/#WithTimeout)。
请参阅下面的可运行示例,并留意 dummyLog 输出的时间戳,这样就能清楚地看出此过程中涉及的所有 goroutine 发生了什么。
代码:
// dummyLog prints one line to standard output: the current time in square
// brackets, a space, and the message built from format and a with
// fmt.Sprintf semantics. It returns what the underlying fmt.Printf returns,
// i.e. the number of bytes written and any write error.
func dummyLog(format string, a ...interface{}) (n int, err error) {
	stamp := fmt.Sprintf("[%v] ", time.Now())
	return fmt.Printf("%s%s\n", stamp, fmt.Sprintf(format, a...))
}
// newContext returns a fresh context derived from context.Background that
// expires after timeoutUpperBound, together with its cancel function. The
// deadline that was set is logged through dummyLog. The caller is expected to
// call the returned cancel function once the context is no longer needed.
func newContext(timeoutUpperBound time.Duration) (context.Context, context.CancelFunc) {
	root := context.Background()
	timed, stop := context.WithTimeout(root, timeoutUpperBound)

	// Report the deadline and whether one is set on the new context.
	when, hasDeadline := timed.Deadline()
	dummyLog("The context deadline is set to %s is it still valid? %v", when, hasDeadline)

	return timed, stop
}
// schedule could be used to schedule arbitrary functions with a recurring interval
func schedule(f func(ctx context.Context), recurring time.Duration) chan struct{} {
ticker := time.NewTicker(recurring)
quit := make(chan struct{})
go func(inFunc func(ctx context.Context)) {
for {
select {
case <-ticker.C:
dummyLog("Ticked in the scheduler")
// simulate the "killing" of "inFunc" when it takes too long
go func(recurring time.Duration) {
inCtx, cancel := newContext(recurring)
defer cancel()
inFunc(inCtx)
}(recurring)
case <-quit:
dummyLog("Stopping the scheduler")
ticker.Stop()
return
}
}
}(f)
return quit
}
在测试环境中执行代码(虽然没有执行断言):
// TestSomething schedules a task with a random duration of up to four seconds
// under a two-second interval, so some runs finish and others are cut short by
// their context. It makes no assertions; the output is meant to be read by a
// human.
func TestSomething(t *testing.T) {
	// taskID asks the external "uuidgen" tool for an identifier so that each
	// run can be told apart in the output
	// (sudo apt-get install uuid-runtime).
	taskID := func() string {
		// The command error is discarded: the ID is only a log label, and on
		// failure whatever output was captured (possibly none) is used.
		raw, _ := exec.Command("uuidgen").Output()
		// Drop the line break(s) from the tool's output.
		return regexp.MustCompile(`\r?\n`).ReplaceAllString(string(raw), "")
	}

	// pick returns a pseudo-random int in [lo, hi) using "math/rand".
	pick := func(lo int, hi int) int {
		return lo + rand.Intn(hi-lo)
	}

	// slowTask simulates some sort of very slow execution, like database
	// queries or network I/O, that gives up when its context is done.
	slowTask := func(ctx context.Context) {
		id := taskID()
		wait := time.Duration(pick(0, 4000)) * time.Millisecond
		dummyLog("Starting task %s taking %s random time", id, wait)

		select {
		case <-time.After(wait):
			dummyLog("Finished task %s", id)
		case <-ctx.Done():
			dummyLog("Killed task %s, reason: '%s'", id, ctx.Err())
		}
	}

	// The scheduling interval doubles as the per-run timeout.
	limit := 2 * time.Second
	stop := schedule(slowTask, limit)
	time.Sleep(6 * limit)
	close(stop)

	// Keep the test alive long enough to see the "closing" message.
	time.Sleep(4 * time.Second)
}