如何停止正在侦听 RethinkDB 变更源的 goroutine?
How to stop a goroutine that is listening for RethinkDB changefeeds?
我正在尝试弄清楚如何将 RethinkDB 更改源与 golang 一起使用。
我的具体问题是如何停止监听更改的 goroutine
数据库。例如,请参见下面的函数 getData()
。我 运行 这是一个处理程序
通过调用 go getData(c)
函数。每当数据库更新时,记录是
传递给通道c
,然后传递给处理函数并发送给客户端
使用 SSE 技术。我的问题是:当客户端断开连接时,我知道如何停止和退出
处理函数;然而 运行ning getData()
函数的 goroutine 保持 运行ning。什么可以
我要关闭它吗?基于 Whosebug 上的其他答案,我能想到的一种解决方案是
发送信号关闭另一个通道上的 goroutine 并使用 select 语句来处理这个
信号。比如我可以替换
for cur.Next(&rec) {
c <- rec
}
在下面的函数定义中:
for cur.Next(&rec) {
select {
case <- closesignal:
return
default:
c <- rec
}
}
其中,closesignal
是作为 getData()
的第三个参数给出的另一个通道,并且
当客户端断开连接时,处理程序会在此通道上发送一条消息。
这种方法的问题是:如果特定的 rethinkdb 查询的结果怎么办
从不更新。在这种情况下,将不会进入 for cur.Next(&rec)
循环并且不会使用 closesignal
。
这个 goroutine 会保持 运行ning 吗?如果是这样,我该如何停止这个 goroutine?
getData()
函数
func getData(session *r.Session, c chan interface{}) {
var rec interface{}
changesOpts := r.ChangesOpts{
IncludeInitial: true,
}
cur, err := r.DB(DBNAME).Table("test").Changes(changesOpts).Run(session)
if err != nil {
log.Println(err)
return
}
defer cur.Close()
defer func() {
fmt.Println("exiting getData goroutine()...")
}()
for cur.Next(&rec) {
c <- rec
}
}
您可以通过关闭光标来停止正在侦听更改源的 goroutine。例如,此代码将在关闭前监听 10 秒的更改提要:
go func() {
time.Sleep(10 * time.Second)
cur.Close()
}()
for cur.Next(&rec) {
c <- rec
}
// Loop exits as the cursor has been closed
您应该通过 context 停止更改源。您可以将上下文传递给 RunOpts
。这将立即关闭您的更改源。
我认为处理这种情况的最佳方法是使用 contexts 关闭游标,使用 goroutine。
这里有一些关于上下文的good documentation from the go blog。
这是一个上下文实现的例子:
func AreThereChanges(ctx context.Context) error {
cursor, err := r.Table("something").
Changes().
Run(db.Session)
if err != nil {
//manage error
}
go func(ctx context.Context) {
<-ctx.Done()
cursor.Close()
}(ctx)
var changeFeed map[string]interface{}
for cursor.Next(&changeFeed) {
if changeFeed["old_val"] != nil {
//do something
}
if changeFeed["new_val"] != nil {
//do something
}
}
我正在尝试弄清楚如何将 RethinkDB 更改源与 golang 一起使用。
我的具体问题是如何停止监听更改的 goroutine
数据库。例如,请参见下面的函数 getData()
。我 运行 这是一个处理程序
通过调用 go getData(c)
函数。每当数据库更新时,记录是
传递给通道c
,然后传递给处理函数并发送给客户端
使用 SSE 技术。我的问题是:当客户端断开连接时,我知道如何停止和退出
处理函数;然而 运行ning getData()
函数的 goroutine 保持 运行ning。什么可以
我要关闭它吗?基于 Whosebug 上的其他答案,我能想到的一种解决方案是
发送信号关闭另一个通道上的 goroutine 并使用 select 语句来处理这个
信号。比如我可以替换
for cur.Next(&rec) {
c <- rec
}
在下面的函数定义中:
for cur.Next(&rec) {
select {
case <- closesignal:
return
default:
c <- rec
}
}
其中,closesignal
是作为 getData()
的第三个参数给出的另一个通道,并且
当客户端断开连接时,处理程序会在此通道上发送一条消息。
这种方法的问题是:如果特定的 rethinkdb 查询的结果怎么办
从不更新。在这种情况下,将不会进入 for cur.Next(&rec)
循环并且不会使用 closesignal
。
这个 goroutine 会保持 运行ning 吗?如果是这样,我该如何停止这个 goroutine?
getData()
函数
func getData(session *r.Session, c chan interface{}) {
var rec interface{}
changesOpts := r.ChangesOpts{
IncludeInitial: true,
}
cur, err := r.DB(DBNAME).Table("test").Changes(changesOpts).Run(session)
if err != nil {
log.Println(err)
return
}
defer cur.Close()
defer func() {
fmt.Println("exiting getData goroutine()...")
}()
for cur.Next(&rec) {
c <- rec
}
}
您可以通过关闭光标来停止正在侦听更改源的 goroutine。例如,此代码将在关闭前监听 10 秒的更改提要:
go func() {
time.Sleep(10 * time.Second)
cur.Close()
}()
for cur.Next(&rec) {
c <- rec
}
// Loop exits as the cursor has been closed
您应该通过 context 停止更改源。您可以将上下文传递给 RunOpts
。这将立即关闭您的更改源。
我认为处理这种情况的最佳方法是使用 contexts 关闭游标,使用 goroutine。 这里有一些关于上下文的good documentation from the go blog。
这是一个上下文实现的例子:
func AreThereChanges(ctx context.Context) error {
cursor, err := r.Table("something").
Changes().
Run(db.Session)
if err != nil {
//manage error
}
go func(ctx context.Context) {
<-ctx.Done()
cursor.Close()
}(ctx)
var changeFeed map[string]interface{}
for cursor.Next(&changeFeed) {
if changeFeed["old_val"] != nil {
//do something
}
if changeFeed["new_val"] != nil {
//do something
}
}