如何连续收听变化提要 RethinkDB
How to listen to change feed continuously RethinkDB
我遇到以下问题:RethinkDB
使用 RunChangesAsync
方法 运行s 一次,一旦使用,它就会开始侦听给定查询的更改。当查询更改时,您将获得 Cursor<Change<Class>>
,这是初始状态和实际状态之间的增量。
我的问题是如何让这个 运行 连续?
如果我使用:
while(true)
{
code.... //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code //new changes here
}
如果我在代码中指出的地方发生了变化,它们将不会被 RunChanges
捕捉到。唯一会被捕获的变化是在 RunChanges
正在收听时。不在..或检索结果之后。
所以我尝试将 RunChanges
包装在 observable
中,但它并没有像我预期的那样连续监听变化......它只检索 2 个空项目(我想是垃圾)和结束。
可观察
public IObservable<Cursor<Change<UserStatus?>>> GetObservable() =>
r.Db(Constants.DB_NAME).Table(Constants.CLIENT_TABLE).RunChangesAsync<UserStatus?>(this.con,CancellationToken.None).ToObservable();
观察者
class PlayerSubscriber : IObserver<Cursor<Change<UserStatus?>>>
{
public void OnCompleted() => Console.WriteLine("Finished");
public void OnError(Exception error) => Console.WriteLine("error");
public void OnNext(Cursor<Change<UserStatus?>> value)
{
foreach (var item in value.BufferedItems)
Console.WriteLine(item);
}
}
计划
class Program
{
public static RethinkDB r = RethinkDB.R;
public static bool End = false;
static async Task Main(string[] args)
{
var address = new Address { Host = "127.0.0.1", Port = 28015 };
var con = await r.Connection().Hostname(address.Host).Port(address.Port).ConnectAsync();
var database = new Database(r, con);
var obs = database.GetObservable();
var sub = new PlayerSubscriber();
var disp = obs.Subscribe(sub);
Console.ReadKey();
Console.WriteLine("Hello World!");
}
}
如您所见,当我调试时,Observer
的 OnNext
方法仅执行一次(returns 两个空对象),然后关闭。
P.S:Database
只是 rethinkdb 查询的包装器。唯一使用的方法是我发布的GetObservable
。 UserStatus
是 POCO
.
如果您有一个带有签名 Task<int> ReadAsync()
的操作,那么设置轮询的方法是这样的:
IObservable<int> PollRead(TimeSpan interval)
{
return
Observable
.Interval(interval)
.SelectMany(n => Observable.FromAsync(() => ReadAsync()));
}
我还提醒您不要创建自己的 IObservable<T>
实现 - 这充满了危险。如果您要创建自己的观察者并希望交出,则应使用 Observer.Create(...)
。通常你甚至不会那样做。
创建更改源时,您需要创建一个 更改源对象。例如,当您在 运行 宁 .RunChangesAsync();
之后返回 Cursor<Change<T>>
时,这就是您所需要的。
您从 query.RunChangesAsync()
返回的 cursor
对象是您的更改源对象,您将在想要接收更改的整个生命周期中使用它。
在你的例子中:
while(true)
{
code.... //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code //new changes here
}
在 while
循环中使用 .RunChangesAsync();
不是正确的方法。您无需再次重新运行 查询并获得另一个Cursor<Change<T>>
。我将在本 post.
的最后解释它是如何工作的
另外,不要在 cursor
对象上使用 cursor.BufferedItems
。光标上的 cursor.BufferedItems
属性 并不意味着您的代码会直接使用它们; cursor.BufferedItems
属性 仅在您希望 "peek ahead" 在游标对象(客户端)内针对特定于您的更改的准备好使用的项目公开提要查询。
使用更改源中的项目的正确方法是枚举 cursor
对象本身,如下所示:
var cursor = await query.RunChangesAsync(conn);
foreach (var item in cursor){
Console.WriteLine(item);
}
当 cursor
运行 没有项目时,它会向 RethinkDB 服务器请求更多项目。请记住,foreach
循环的每次迭代 都可能是一个 阻塞调用。例如,当 1) 客户端没有要消耗的项目 (.BufferedItems.Count == 0
) 和 2) 根据您的更改提要查询条件,服务器端没有已更改的项目。在这种情况下,foreach
循环将阻塞,直到 RethinkDB 服务器向您发送一个可以使用的项目。
Documentation about using Reactive Extensions and RethinkDB in C#
有一个驱动程序单元测试展示了 .NET Reactive Extensions 如何工作here。
具体来说,Lines 31 - 47 in this unit test 使用 Reactive Extensions 设置更改提要:
var changes = R.Db(DbName).Table(TableName)
//.changes()[new {include_states = true, include_initial = true}]
.Changes()
.RunChanges<JObject>(conn);
changes.IsFeed.Should().BeTrue();
var observable = changes.ToObservable();
//use a new thread if you want to continue,
//otherwise, subscription will block.
observable.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(
x => OnNext(x),
e => OnError(e),
() => OnCompleted()
);
此外,here is a good example and explanation 发生了什么以及如何使用 C# 使用更改源:
希望对您有所帮助。
谢谢,
布莱恩
我遇到以下问题:RethinkDB
使用 RunChangesAsync
方法 运行s 一次,一旦使用,它就会开始侦听给定查询的更改。当查询更改时,您将获得 Cursor<Change<Class>>
,这是初始状态和实际状态之间的增量。
我的问题是如何让这个 运行 连续?
如果我使用:
while(true)
{
code.... //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code //new changes here
}
如果我在代码中指出的地方发生了变化,它们将不会被 RunChanges
捕捉到。唯一会被捕获的变化是在 RunChanges
正在收听时。不在..或检索结果之后。
所以我尝试将 RunChanges
包装在 observable
中,但它并没有像我预期的那样连续监听变化......它只检索 2 个空项目(我想是垃圾)和结束。
可观察
public IObservable<Cursor<Change<UserStatus?>>> GetObservable() =>
r.Db(Constants.DB_NAME).Table(Constants.CLIENT_TABLE).RunChangesAsync<UserStatus?>(this.con,CancellationToken.None).ToObservable();
观察者
class PlayerSubscriber : IObserver<Cursor<Change<UserStatus?>>>
{
public void OnCompleted() => Console.WriteLine("Finished");
public void OnError(Exception error) => Console.WriteLine("error");
public void OnNext(Cursor<Change<UserStatus?>> value)
{
foreach (var item in value.BufferedItems)
Console.WriteLine(item);
}
}
计划
class Program
{
public static RethinkDB r = RethinkDB.R;
public static bool End = false;
static async Task Main(string[] args)
{
var address = new Address { Host = "127.0.0.1", Port = 28015 };
var con = await r.Connection().Hostname(address.Host).Port(address.Port).ConnectAsync();
var database = new Database(r, con);
var obs = database.GetObservable();
var sub = new PlayerSubscriber();
var disp = obs.Subscribe(sub);
Console.ReadKey();
Console.WriteLine("Hello World!");
}
}
如您所见,当我调试时,Observer
的 OnNext
方法仅执行一次(returns 两个空对象),然后关闭。
P.S:Database
只是 rethinkdb 查询的包装器。唯一使用的方法是我发布的GetObservable
。 UserStatus
是 POCO
.
如果您有一个带有签名 Task<int> ReadAsync()
的操作,那么设置轮询的方法是这样的:
IObservable<int> PollRead(TimeSpan interval)
{
return
Observable
.Interval(interval)
.SelectMany(n => Observable.FromAsync(() => ReadAsync()));
}
我还提醒您不要创建自己的 IObservable<T>
实现 - 这充满了危险。如果您要创建自己的观察者并希望交出,则应使用 Observer.Create(...)
。通常你甚至不会那样做。
创建更改源时,您需要创建一个 更改源对象。例如,当您在 运行 宁 .RunChangesAsync();
之后返回 Cursor<Change<T>>
时,这就是您所需要的。
您从 query.RunChangesAsync()
返回的 cursor
对象是您的更改源对象,您将在想要接收更改的整个生命周期中使用它。
在你的例子中:
while(true)
{
code.... //changes happening while program is here
....../
...RunChangesAsync();
/......processed buffered items
code //new changes here
}
在 while
循环中使用 .RunChangesAsync();
不是正确的方法。您无需再次重新运行 查询并获得另一个Cursor<Change<T>>
。我将在本 post.
另外,不要在 cursor
对象上使用 cursor.BufferedItems
。光标上的 cursor.BufferedItems
属性 并不意味着您的代码会直接使用它们; cursor.BufferedItems
属性 仅在您希望 "peek ahead" 在游标对象(客户端)内针对特定于您的更改的准备好使用的项目公开提要查询。
使用更改源中的项目的正确方法是枚举 cursor
对象本身,如下所示:
var cursor = await query.RunChangesAsync(conn);
foreach (var item in cursor){
Console.WriteLine(item);
}
当 cursor
运行 没有项目时,它会向 RethinkDB 服务器请求更多项目。请记住,foreach
循环的每次迭代 都可能是一个 阻塞调用。例如,当 1) 客户端没有要消耗的项目 (.BufferedItems.Count == 0
) 和 2) 根据您的更改提要查询条件,服务器端没有已更改的项目。在这种情况下,foreach
循环将阻塞,直到 RethinkDB 服务器向您发送一个可以使用的项目。
Documentation about using Reactive Extensions and RethinkDB in C#
有一个驱动程序单元测试展示了 .NET Reactive Extensions 如何工作here。
具体来说,Lines 31 - 47 in this unit test 使用 Reactive Extensions 设置更改提要:
var changes = R.Db(DbName).Table(TableName)
//.changes()[new {include_states = true, include_initial = true}]
.Changes()
.RunChanges<JObject>(conn);
changes.IsFeed.Should().BeTrue();
var observable = changes.ToObservable();
//use a new thread if you want to continue,
//otherwise, subscription will block.
observable.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(
x => OnNext(x),
e => OnError(e),
() => OnCompleted()
);
此外,here is a good example and explanation 发生了什么以及如何使用 C# 使用更改源:
希望对您有所帮助。
谢谢,
布莱恩