如何连续收听变化提要 RethinkDB

How to listen to change feed continuously RethinkDB

我遇到以下问题:RethinkDB 使用 RunChangesAsync 方法 运行s 一次,一旦使用,它就会开始侦听给定查询的更改。当查询更改时,您将获得 Cursor<Change<Class>> ,这是初始状态和实际状态之间的增量。

我的问题是如何让这个 运行 连续?

如果我使用:

while(true)
{
 code....   //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code   //new changes here
}

如果我在代码中指出的地方发生了变化,它们将不会被 RunChanges 捕捉到。唯一会被捕获的变化是在 RunChanges 正在收听时。不在..或检索结果之后。

所以我尝试将 RunChanges 包装在 observable 中,但它并没有像我预期的那样连续监听变化......它只检索 2 个空项目(我想是垃圾)和结束。

可观察

public  IObservable<Cursor<Change<UserStatus?>>> GetObservable() =>
    r.Db(Constants.DB_NAME).Table(Constants.CLIENT_TABLE).RunChangesAsync<UserStatus?>(this.con,CancellationToken.None).ToObservable();

观察者

class PlayerSubscriber : IObserver<Cursor<Change<UserStatus?>>>
{
    public void OnCompleted() => Console.WriteLine("Finished");
    public void OnError(Exception error) => Console.WriteLine("error");
    public void OnNext(Cursor<Change<UserStatus?>> value)
    {
        foreach (var item in value.BufferedItems)
            Console.WriteLine(item);
    }
}

计划

class Program
{
    public static RethinkDB r = RethinkDB.R;
    public static bool End = false;
    static async Task Main(string[] args)
    {
        var address = new Address { Host = "127.0.0.1", Port = 28015 };
        var con = await r.Connection().Hostname(address.Host).Port(address.Port).ConnectAsync();
        var database = new Database(r, con);
        var obs  = database.GetObservable();
        var sub  = new PlayerSubscriber();
        var disp = obs.Subscribe(sub);

        Console.ReadKey();
        Console.WriteLine("Hello World!");
    }
}

如您所见,当我调试时,ObserverOnNext 方法仅执行一次(returns 两个空对象),然后关闭。

P.S:Database 只是 rethinkdb 查询的包装器。唯一使用的方法是我发布的GetObservableUserStatusPOCO.

如果您有一个带有签名 Task<int> ReadAsync() 的操作,那么设置轮询的方法是这样的:

IObservable<int> PollRead(TimeSpan interval)
{
    return
        Observable
            .Interval(interval)
            .SelectMany(n => Observable.FromAsync(() => ReadAsync()));
}

我还提醒您不要创建自己的 IObservable<T> 实现 - 这充满了危险。如果您要创建自己的观察者并希望交出,则应使用 Observer.Create(...)。通常你甚至不会那样做。

创建更改源时,您需要创建一个 更改源对象。例如,当您在 运行 宁 .RunChangesAsync(); 之后返回 Cursor<Change<T>> 时,这就是您所需要的。

您从 query.RunChangesAsync() 返回的 cursor 对象是您的更改源对象,您将在想要接收更改的整个生命周期中使用它。

在你的例子中:

while(true)
{
 code....   //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code   //new changes here
}

while 循环中使用 .RunChangesAsync(); 不是正确的方法。您无需再次重新运行 查询并获得另一个Cursor<Change<T>>。我将在本 post.

的最后解释它是如何工作的

另外,不要在 cursor 对象上使用 cursor.BufferedItems。光标上的 cursor.BufferedItems 属性 并不意味着您的代码会直接使用它们; cursor.BufferedItems 属性 仅在您希望 "peek ahead" 在游标对象(客户端)内针对特定于您的更改的准备好使用的项目公开提要查询。

使用更改源中的项目的正确方法是枚举 cursor 对象本身,如下所示:

var cursor = await query.RunChangesAsync(conn);
foreach (var item in cursor){
    Console.WriteLine(item);
}

cursor 运行 没有项目时,它会向 RethinkDB 服务器请求更多项目。请记住,foreach 循环的每次迭代 都可能是一个 阻塞调用。例如,当 1) 客户端没有要消耗的项目 (.BufferedItems.Count == 0) 和 2) 根据您的更改提要查询条件,服务器端没有已更改的项目。在这种情况下,foreach 循环将阻塞,直到 RethinkDB 服务器向您发送一个可以使用的项目。

Documentation about using Reactive Extensions and RethinkDB in C#

有一个驱动程序单元测试展示了 .NET Reactive Extensions 如何工作here

具体来说,Lines 31 - 47 in this unit test 使用 Reactive Extensions 设置更改提要:

var changes = R.Db(DbName).Table(TableName)
    //.changes()[new {include_states = true, include_initial = true}]
    .Changes()
    .RunChanges<JObject>(conn);

changes.IsFeed.Should().BeTrue();

var observable = changes.ToObservable();

//use a new thread if you want to continue,
//otherwise, subscription will block.
observable.SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(
        x => OnNext(x),
        e => OnError(e),
        () => OnCompleted()
    );

此外,here is a good example and explanation 发生了什么以及如何使用 C# 使用更改源:

希望对您有所帮助。

谢谢,
布莱恩