如何检查 NATS 请求是否被取消
How to check if a NATS request was cancelled
当用户点击路由时,我发出 NATS 请求,并等待回复:
ctx := r.Context()
reply, err := natsConnection.RequestWithContext(ctx, "Subject", payload)
订阅者将执行资源密集型任务:
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
//do consuming task and cancel if the request was cancelled.
natsConnection.Publish(m.Reply, []byte("Reply"))
})
如果请求被取消,我如何告诉订阅者停止工作。
我想为工作人员收到的每条消息创建一个 context
然后给发件人一个新的 inbox
以等待其中的回复,或者向它发送取消通知:
contextMap := map[string]func(){}
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//create new inbox
inbox := nats.NewInbox()
//subscribe to the new inbox and cancel and clean up the current task is something is recived
newSub, err := natsConnection.Subscribe(inbox, func(m *nats.Msg) {
contextMap[inbox]()
})
contextMap[inbox] = func() {
fmt.Println("recived")
cancel()
delete(contextMap, inbox)
newSub.Unsubscribe()
}
//tell the requester to wait for reply in this inbox
natsConnection.Publish(m.Reply, []byte(inbox))
//do the work and send the reply when finished
tick(ctx)
natsConnection.Publish(inbox, []byte("done"))
})
在请求者端,我发送第一个请求并在指定的 inbox
中等待回复或 context
被取消,在这种情况下我向 inbox
让接收者知道:
ctx := r.Context()
inbox, _ := natsConnection.RequestWithContext(ctx, "Subject", []byte("Payload"))
reply := make(chan *nats.Msg, 1)
natsConnection.ChanSubscribe(string(inbox.Data), reply)
select {
case <-ctx.Done():
fmt.Println("Canceled")
natsConnection.Publish(string(inbox.Data), []byte(""))
return
case r := <-reply:
fmt.Println(string(r.Data))
}
tick
只是一个勾选的函数:
func tick(ctx context.Context) {
ticker := time.NewTicker(time.Millisecond * 500)
for {
select {
case <-ctx.Done():
ticker.Stop()
fmt.Println("context was canceled", ctx.Err())
return
case <-ticker.C:
fmt.Println("Tick")
}
}
}
现在这行得通,但我想知道是否有更简单的方法,或者如果没有,我怎样才能使这段代码更好?
您想通过取消请求来节省什么?您知道不会使用的关于请求或回复传输的工作?
如果响应很密集并且需要几秒钟才能生成,那么是的,您可以发送一条单独的消息来终止请求。然而,大多数架构让响应者继续并丢弃结果。
当用户点击路由时,我发出 NATS 请求,并等待回复:
ctx := r.Context()
reply, err := natsConnection.RequestWithContext(ctx, "Subject", payload)
订阅者将执行资源密集型任务:
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
//do consuming task and cancel if the request was cancelled.
natsConnection.Publish(m.Reply, []byte("Reply"))
})
如果请求被取消,我如何告诉订阅者停止工作。
我想为工作人员收到的每条消息创建一个 context
然后给发件人一个新的 inbox
以等待其中的回复,或者向它发送取消通知:
contextMap := map[string]func(){}
natsConnection.Subscribe("Subject", func(m *nats.Msg) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//create new inbox
inbox := nats.NewInbox()
//subscribe to the new inbox and cancel and clean up the current task is something is recived
newSub, err := natsConnection.Subscribe(inbox, func(m *nats.Msg) {
contextMap[inbox]()
})
contextMap[inbox] = func() {
fmt.Println("recived")
cancel()
delete(contextMap, inbox)
newSub.Unsubscribe()
}
//tell the requester to wait for reply in this inbox
natsConnection.Publish(m.Reply, []byte(inbox))
//do the work and send the reply when finished
tick(ctx)
natsConnection.Publish(inbox, []byte("done"))
})
在请求者端,我发送第一个请求并在指定的 inbox
中等待回复或 context
被取消,在这种情况下我向 inbox
让接收者知道:
ctx := r.Context()
inbox, _ := natsConnection.RequestWithContext(ctx, "Subject", []byte("Payload"))
reply := make(chan *nats.Msg, 1)
natsConnection.ChanSubscribe(string(inbox.Data), reply)
select {
case <-ctx.Done():
fmt.Println("Canceled")
natsConnection.Publish(string(inbox.Data), []byte(""))
return
case r := <-reply:
fmt.Println(string(r.Data))
}
tick
只是一个勾选的函数:
func tick(ctx context.Context) {
ticker := time.NewTicker(time.Millisecond * 500)
for {
select {
case <-ctx.Done():
ticker.Stop()
fmt.Println("context was canceled", ctx.Err())
return
case <-ticker.C:
fmt.Println("Tick")
}
}
}
现在这行得通,但我想知道是否有更简单的方法,或者如果没有,我怎样才能使这段代码更好?
您想通过取消请求来节省什么?您知道不会使用的关于请求或回复传输的工作?
如果响应很密集并且需要几秒钟才能生成,那么是的,您可以发送一条单独的消息来终止请求。然而,大多数架构让响应者继续并丢弃结果。