如何订阅 IObservable 序列、强制完成并在没有竞争条件的情况下检索所有数据
How to Subscribe to IObservable Sequence, force completion, and retrieve all data without race conditions
有一种模式在处理可观察对象时遇到了问题。
我正在使用蓝牙设备。
- 我向该设备发送一条消息,告诉它做某事并通知我结果。
- 设备开始发送通知(可能持续 10 毫秒或 20 秒)
- 我在等待设备完成发送通知。有时这将是来自设备的特定消息,有时我只是在超时期限内不会再收到任何消息。
- 我将消息转换为单个项目或 IEnumerable,然后继续我的快乐之路。
示例一:
- 我输入带有登录消息和密码的登录命令
- 设备发回成功或失败信息(通常10ms左右)
- 我等消息来[=44=]
- 我使用消息来告诉用户是否可以继续,或者他们是否需要重试密码。
示例二:
- 我向蓝牙设备发送命令,请求范围内的所有 wifi 网络
- 设备打开其 wifi 无线电并发回未知数量的消息,但在某个时刻停止
- 我等待消息停止
- 我向用户展示了完整的 wifi 网络列表
我认为这应该按照以下方式完成。 (我已经删除了尽可能多的蓝牙特定代码以帮助专注于 Rx):
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);
//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
onNext: result =>
{
if (result > 50)
waiting = false;
},
onCompleted: () =>
{
return;
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;
//This line of code is never run
subscription.Dispose();
我希望 Rx 人员可以帮助我理解为什么此 ToList() 调用挂起。我只是当场为这个问题写了这段代码,所以如果它没有意义请告诉我,我会更新它。
下面是使用第三方蓝牙库并从蓝牙设备接收项目的实际代码。
private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
{
IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);
//I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
//In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way.
//In those situations I would call .ToList() instead of .FirstAsync()
bool waiting = true;
await characteristic.EnableNotifications().TakeWhile(x=>waiting);
IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
.WhenNotificationReceived();
IDisposable passwordResultSubscription = passwordResultSequence
.Subscribe(
onNext: result =>
{
waiting = false;
},
onCompleted: () =>
{
return;
});
try
{
await Peripheral.RPHDevice
.WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
.Timeout(TimeSpan.FromSeconds(10));
}
catch (Exception)
{
return 0;
}
//In this case only one notification ever comes back and so FirstAsync would be nice
var passwordResult = await passwordResultSequence.FirstAsync();
await characteristic.DisableNotifications();
passwordResultSubscription.Dispose();
return passwordResult.Data[0];
}
收到通知时:
IObservable<CharacteristicGattResult> notifyOb;
public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
{
this.AssertNotify();
this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
{
var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
{
if (!this.Equals(args.Characteristic))
return;
if (args.Error == null)
ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
else
ob.OnError(new BleException(args.Error.Description));
});
this.Peripheral.UpdatedCharacterteristicValue += handler;
return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
})
.Publish()
.RefCount();
return this.notifyOb;
}
你的代码有一堆问题。
首先,Rx 是一种异步编程模型,您正尝试对其进行同步 运行。打电话给 await sequence
(以及类似的 sequence.Wait()
)会让你几乎所有时间都感到悲伤。
接下来,您要为 sequence
observable 创建两个订阅 - 一次使用 sequence.Subscribe(...)
,一次使用 await sequence.ToList()
。它们是对底层 subject
的单独订阅,需要单独对待。
最后,您将外部状态 (bool waiting = true
) 混合到您的查询 subject.TakeWhile(x => waiting)
中。这很糟糕,因为它本质上是非线程安全的,你应该像你的查询在多个线程上 运行ning 一样编码。
您的代码发生的情况是 await sequence.ToList()
正在订阅您的查询 AFTER 您已经输出了 subject.OnNext(i++)
值,因此查询永远不会结束。在 AFTER .ToList()
触发 .TakeWhile(x => waiting)
结束 observable 之后,没有任何值会被推出主题。 .ToList()
只是坐在那里等待永远不会到来的 OnCompleted
。
您需要将 await sequence.ToList()
移动到,然后再输出值 - 您不能这样做,因为它仍然会卡住,等待永远不会出现的 OnCompleted
。
这就是您需要异步编码的原因。
现在这两个订阅也会导致您出现竞争情况。 sequence.Subscribe
可能会在 sequence.ToList()
获取任何值之前将 waiting
设置为 false
。这就是为什么您应该像查询在多个线程上 运行ning 一样进行编码。因此,为避免这种情况,您应该只订阅一次。
您需要丢掉 .TakeWhile(x => waiting)
并将条件推入,如下所示:subject.TakeWhile(x => x <= 50);
.
然后你这样写你的代码:
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);
sequence
.ToList()
.Subscribe(list =>
{
Console.WriteLine(String.Join(", ", list));
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
此代码 运行 并在控制台上生成 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
。
不要同步编写 Rx 代码 - 丢失 await
。不要 运行 可能造成竞争条件的多个订阅。不要在您的查询中引入外部状态。
此外,使用 WhenNotificationReceived
方法,您没有正确完成序列。
您正在使用危险的 .Publish().RefCount()
运算符对,它创建了一个在完成后无法订阅的序列。
试试这个例子:
var query =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(3)
.Publish()
.RefCount();
var s1 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
var s2 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s1.Dispose();
s2.Dispose();
var s3 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s3.Dispose();
这只会产生:
0
1
2
2
s3
订阅没有产生任何结果。我不认为这就是你想要的。
有一种模式在处理可观察对象时遇到了问题。
我正在使用蓝牙设备。
- 我向该设备发送一条消息,告诉它做某事并通知我结果。
- 设备开始发送通知(可能持续 10 毫秒或 20 秒)
- 我在等待设备完成发送通知。有时这将是来自设备的特定消息,有时我只是在超时期限内不会再收到任何消息。
- 我将消息转换为单个项目或 IEnumerable,然后继续我的快乐之路。
示例一:
- 我输入带有登录消息和密码的登录命令
- 设备发回成功或失败信息(通常10ms左右)
- 我等消息来[=44=]
- 我使用消息来告诉用户是否可以继续,或者他们是否需要重试密码。
示例二:
- 我向蓝牙设备发送命令,请求范围内的所有 wifi 网络
- 设备打开其 wifi 无线电并发回未知数量的消息,但在某个时刻停止
- 我等待消息停止
- 我向用户展示了完整的 wifi 网络列表
我认为这应该按照以下方式完成。 (我已经删除了尽可能多的蓝牙特定代码以帮助专注于 Rx):
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
bool waiting = true;
IObservable<int> sequence = subject.TakeWhile(x => waiting);
//Subscribe to the subject so that I can trigger the stopping criteria
IDisposable subscription = sequence.Subscribe(
onNext: result =>
{
if (result > 50)
waiting = false;
},
onCompleted: () =>
{
return;
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
//Gather them all up once the sequence is complete
//***application hangs here***
List<int> ints = await sequence.ToList() as List<int>;
//This line of code is never run
subscription.Dispose();
我希望 Rx 人员可以帮助我理解为什么此 ToList() 调用挂起。我只是当场为这个问题写了这段代码,所以如果它没有意义请告诉我,我会更新它。
下面是使用第三方蓝牙库并从蓝牙设备接收项目的实际代码。
private static async Task<byte> WritePasswordToPeripheral<P>(P Peripheral, byte[] command) where P : Peripheral, IStatePeripheral
{
IGattService service = await Peripheral.RPHDevice.GetKnownService(BleService.Control);
IGattCharacteristic characteristic = await service.GetKnownCharacteristics(BleCharacteristic.PasswordResult);
//I know that this TakeWhile isn't necessary here because I'm using FirstAsync() later on
//In some similar blocks I receive multiple notifications and so I need to decide when to stop listening in this way.
//In those situations I would call .ToList() instead of .FirstAsync()
bool waiting = true;
await characteristic.EnableNotifications().TakeWhile(x=>waiting);
IObservable<CharacteristicGattResult> passwordResultSequence = characteristic
.WhenNotificationReceived();
IDisposable passwordResultSubscription = passwordResultSequence
.Subscribe(
onNext: result =>
{
waiting = false;
},
onCompleted: () =>
{
return;
});
try
{
await Peripheral.RPHDevice
.WriteCharacteristic(BleService.Control, BleCharacteristic.Password, command)
.Timeout(TimeSpan.FromSeconds(10));
}
catch (Exception)
{
return 0;
}
//In this case only one notification ever comes back and so FirstAsync would be nice
var passwordResult = await passwordResultSequence.FirstAsync();
await characteristic.DisableNotifications();
passwordResultSubscription.Dispose();
return passwordResult.Data[0];
}
收到通知时:
IObservable<CharacteristicGattResult> notifyOb;
public override IObservable<CharacteristicGattResult> WhenNotificationReceived()
{
this.AssertNotify();
this.notifyOb = this.notifyOb ?? Observable.Create<CharacteristicGattResult>(ob =>
{
var handler = new EventHandler<CBCharacteristicEventArgs>((sender, args) =>
{
if (!this.Equals(args.Characteristic))
return;
if (args.Error == null)
ob.OnNext(new CharacteristicGattResult(this, args.Characteristic.Value?.ToArray()));
else
ob.OnError(new BleException(args.Error.Description));
});
this.Peripheral.UpdatedCharacterteristicValue += handler;
return () => this.Peripheral.UpdatedCharacterteristicValue -= handler;
})
.Publish()
.RefCount();
return this.notifyOb;
}
你的代码有一堆问题。
首先,Rx 是一种异步编程模型,您正尝试对其进行同步 运行。打电话给 await sequence
(以及类似的 sequence.Wait()
)会让你几乎所有时间都感到悲伤。
接下来,您要为 sequence
observable 创建两个订阅 - 一次使用 sequence.Subscribe(...)
,一次使用 await sequence.ToList()
。它们是对底层 subject
的单独订阅,需要单独对待。
最后,您将外部状态 (bool waiting = true
) 混合到您的查询 subject.TakeWhile(x => waiting)
中。这很糟糕,因为它本质上是非线程安全的,你应该像你的查询在多个线程上 运行ning 一样编码。
您的代码发生的情况是 await sequence.ToList()
正在订阅您的查询 AFTER 您已经输出了 subject.OnNext(i++)
值,因此查询永远不会结束。在 AFTER .ToList()
触发 .TakeWhile(x => waiting)
结束 observable 之后,没有任何值会被推出主题。 .ToList()
只是坐在那里等待永远不会到来的 OnCompleted
。
您需要将 await sequence.ToList()
移动到,然后再输出值 - 您不能这样做,因为它仍然会卡住,等待永远不会出现的 OnCompleted
。
这就是您需要异步编码的原因。
现在这两个订阅也会导致您出现竞争情况。 sequence.Subscribe
可能会在 sequence.ToList()
获取任何值之前将 waiting
设置为 false
。这就是为什么您应该像查询在多个线程上 运行ning 一样进行编码。因此,为避免这种情况,您应该只订阅一次。
您需要丢掉 .TakeWhile(x => waiting)
并将条件推入,如下所示:subject.TakeWhile(x => x <= 50);
.
然后你这样写你的代码:
//Create a new subject
Subject<int> subject = new Subject<int>();
//Observe the subject until some pre-determined stopping criteria
IObservable<int> sequence = subject.TakeWhile(x => x <= 50);
sequence
.ToList()
.Subscribe(list =>
{
Console.WriteLine(String.Join(", ", list));
});
//fake bluetooth messages
int i = 0;
while (i < 100)
subject.OnNext(i++);
此代码 运行 并在控制台上生成 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50
。
不要同步编写 Rx 代码 - 丢失 await
。不要 运行 可能造成竞争条件的多个订阅。不要在您的查询中引入外部状态。
此外,使用 WhenNotificationReceived
方法,您没有正确完成序列。
您正在使用危险的 .Publish().RefCount()
运算符对,它创建了一个在完成后无法订阅的序列。
试试这个例子:
var query =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(3)
.Publish()
.RefCount();
var s1 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
var s2 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s1.Dispose();
s2.Dispose();
var s3 = query.Subscribe(Console.WriteLine);
Thread.Sleep(2500);
s3.Dispose();
这只会产生:
0 1 2 2
s3
订阅没有产生任何结果。我不认为这就是你想要的。