pq.NewListener/Listen 带有 pq 的块:"listen" 处或附近的语法错误

pq.NewListener/Listen blocks with pq: syntax error at or near "listen"

我有 minio/s3 对象存储,其中包含 lambda 通知到 cockroachdb(postgres 数据库)。我正在尝试使用以下 golang 代码监视这些事件。

package main

import (
    "database/sql"
    "encoding/json"
    "fmt"
    "github.com/lib/pq"
    "time"
)

const (
    //crdbConnectStr = "dbname=alerts user=crdbuser1 host=localhost port=26257 sslmode=disable connect_timeout=5"
    crdbConnectStr = "postgres://crdbuser1@localhost:26257/alerts?sslmode=disable"
    dbDriver       = "postgres"
)

func monitorEvents() {

    _, err := sql.Open(dbDriver, crdbConnectStr)
    if err != nil {
        fmt.Printf("connection open to crdb failed - %v\n", err.Error())
    }

    fmt.Printf("sql open on crdb OK\n")

    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Printf("NewListener - event : %v, err - %v\n", ev, err.Error())
        }
    }

    minReconnect := 2 * time.Second
    maxReconnect := 20 * time.Second
    listener := pq.NewListener(crdbConnectStr, minReconnect, maxReconnect, reportProblem)
    err = listener.Listen("monitor")
    if err != nil {
        fmt.Printf("Listen error - %v\n", err.Error())
        return
    }

    fmt.Printf("begin monitoring events in CRDB\n")

    for {
        waitForAlertEvents(listener)
    }
}

// Record holds json data from object.
type Record struct {
    Data struct {
        Value struct {
            Records []struct {
                S3 struct {
                    Bucket struct {
                        Name string `json:"name"`
                    } `json:"bucket"`
                    Object struct {
                        Key string `json:"key"`
                    } `json:"object"`
                } `json:"s3"`
            } `json:"Records"`
        } `json:"value"`
    } `json:"data"`
}

func waitForAlertEvents(l *pq.Listener) {

    for {
        select {
        case n := <-l.Notify:
            fmt.Printf("Received data from channel [%v]\n", n.Channel)
            // Prepare notification payload for pretty print
            fmt.Println(n.Extra)
            record := Record{}

            jerr := json.Unmarshal([]byte(n.Extra), &record)
            if jerr != nil {
                fmt.Println("Error processing JSON: ", jerr)
                return
            }

            bucket := record.Data.Value.Records[0].S3.Bucket.Name
            object := record.Data.Value.Records[0].S3.Object.Key
            fmt.Printf("received event on bucket: %v, object: %v\n", bucket, object)

            return

        case <-time.After(60 * time.Second):
            fmt.Println("Received no events for 90 seconds, checking connection")
            go func() {
                l.Ping()
            }()
            return
        }
    }
}

func main() {
    monitorAlerts()
}

当我 运行 这个程序时,我看到以下错误并且卡住了。

[root]# ./alerts 
sql open on crdb OK
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"
NewListener - event : 3, err - pq: syntax error at or near "listen"

手动连接到 cockroachdb 正常。

[root]# cockroach sql --insecure --user=crdbuser1

crdbuser1@:26257/defaultdb> show databases;                                                                                                               database_name  
+---------------+
  alerts         
(1 row)

Time: 1.22359ms

crdbuser1@:26257/defaultdb> set database=alerts;
SET

Time: 363.994µs

crdbuser1@:26257/alerts> show tables;
  table_name  
+------------+
  alertstab   
(1 row)

Time: 1.399014ms

crdbuser1@:26257/alerts> 

关于错误原因的任何想法 pq: syntax error at or near "listen"。我也在看 pq 来源,错误很可能与 notify.go#L756

有关

该错误表明 CockroachDB 不支持 LISTEN and NOTIFY 语句。

您将需要找到一种不同的方法来执行此操作。 CRDB 中最接近的是 Change Data Capture,但它更多地是关于数据流而不是自定义通知。

您可以在 this issue 中找到关于 CRDB 的 LISTEN / NOTIFY 的一些讨论,但目前还没有确定的计划。