RX terminolgy:当有频繁的可观察通知时,RX operator 中的异步处理
RX terminolgy: Async processing in RX operator when there are frequent observable notifications
目的是在 RX 运算符中对稀缺资源执行一些异步工作,例如 Select。当可观察通知的速度快于完成异步操作所需的时间时,就会出现问题。
现在我真的解决了这个问题。我的问题是这种特定问题的正确术语是什么?它有名字吗?是背压吗?到目前为止,我所做的研究表明这是某种压力问题,但根据我的理解不一定是背压。我找到的最相关的资源是:
https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0)
http://reactivex.io/documentation/operators/backpressure.html
现在进入实际代码。假设有一种稀缺资源和它的消费者。在这种情况下,当资源正在使用时会抛出异常。请注意,不应更改此代码。
public class ScarceResource
{
private static bool inUse = false;
public async Task<int> AccessResource()
{
if (inUse) throw new Exception("Resource is alredy in use");
var result = await Task.Run(() =>
{
inUse = true;
Random random = new Random();
Thread.Sleep(random.Next(1, 2) * 1000);
inUse = false;
return random.Next(1, 10);
});
return result;
}
}
public class ResourceConsumer
{
public IObservable<int> DoWork()
{
var resource = new ScarceResource();
return resource.AccessResource().ToObservable();
}
}
现在这里是使用资源的简单实现的问题。抛出错误是因为通知的速度比消费者到达 运行.
的速度更快
private static void RunIntoIssue()
{
var numbers = Enumerable.Range(1, 10);
var observableSequence = numbers
.ToObservable()
.SelectMany(n =>
{
Console.WriteLine("In observable: {0}", n);
var resourceConsumer = new ResourceConsumer();
return resourceConsumer.DoWork();
});
observableSequence.Subscribe(n => Console.WriteLine("In observer: {0}", n));
}
用下面的代码问题就解决了。我通过结合使用已完成的 BehaviorSubject 和 Zip 运算符来减慢处理速度。本质上,这段代码所做的是采用顺序方法而不是并行方法。
private static void RunWithZip()
{
var completed = new BehaviorSubject<bool>(true);
var numbers = Enumerable.Range(1, 10);
var observableSequence = numbers
.ToObservable()
.Zip(completed, (n, c) =>
{
Console.WriteLine("In observable: {0}, completed: {1}", n, c);
var resourceConsumer = new ResourceConsumer();
return resourceConsumer.DoWork();
})
.Switch()
.Select(n =>
{
completed.OnNext(true);
return n;
});
observableSequence.Subscribe(n => Console.WriteLine("In observer: {0}", n));
Console.Read();
}
问题
这是背压吗?如果不是,是否有其他相关术语?
您基本上是在实施一种锁定形式或互斥体。您的代码 导致 背压,它并没有真正处理它。
想象一下,如果您的源不是生成器函数,而是一系列数据推送。数据推送以每毫秒的恒定速率到达。处理每一个都需要 10 毫秒,并且您的代码会强制进行串行处理。这会导致背压:Zip 将无限地排队未处理的数据推送,直到您 运行 内存不足。
目的是在 RX 运算符中对稀缺资源执行一些异步工作,例如 Select。当可观察通知的速度快于完成异步操作所需的时间时,就会出现问题。
现在我真的解决了这个问题。我的问题是这种特定问题的正确术语是什么?它有名字吗?是背压吗?到目前为止,我所做的研究表明这是某种压力问题,但根据我的理解不一定是背压。我找到的最相关的资源是: https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0) http://reactivex.io/documentation/operators/backpressure.html
现在进入实际代码。假设有一种稀缺资源和它的消费者。在这种情况下,当资源正在使用时会抛出异常。请注意,不应更改此代码。
public class ScarceResource
{
private static bool inUse = false;
public async Task<int> AccessResource()
{
if (inUse) throw new Exception("Resource is alredy in use");
var result = await Task.Run(() =>
{
inUse = true;
Random random = new Random();
Thread.Sleep(random.Next(1, 2) * 1000);
inUse = false;
return random.Next(1, 10);
});
return result;
}
}
public class ResourceConsumer
{
public IObservable<int> DoWork()
{
var resource = new ScarceResource();
return resource.AccessResource().ToObservable();
}
}
现在这里是使用资源的简单实现的问题。抛出错误是因为通知的速度比消费者到达 运行.
的速度更快private static void RunIntoIssue()
{
var numbers = Enumerable.Range(1, 10);
var observableSequence = numbers
.ToObservable()
.SelectMany(n =>
{
Console.WriteLine("In observable: {0}", n);
var resourceConsumer = new ResourceConsumer();
return resourceConsumer.DoWork();
});
observableSequence.Subscribe(n => Console.WriteLine("In observer: {0}", n));
}
用下面的代码问题就解决了。我通过结合使用已完成的 BehaviorSubject 和 Zip 运算符来减慢处理速度。本质上,这段代码所做的是采用顺序方法而不是并行方法。
private static void RunWithZip()
{
var completed = new BehaviorSubject<bool>(true);
var numbers = Enumerable.Range(1, 10);
var observableSequence = numbers
.ToObservable()
.Zip(completed, (n, c) =>
{
Console.WriteLine("In observable: {0}, completed: {1}", n, c);
var resourceConsumer = new ResourceConsumer();
return resourceConsumer.DoWork();
})
.Switch()
.Select(n =>
{
completed.OnNext(true);
return n;
});
observableSequence.Subscribe(n => Console.WriteLine("In observer: {0}", n));
Console.Read();
}
问题 这是背压吗?如果不是,是否有其他相关术语?
您基本上是在实施一种锁定形式或互斥体。您的代码 导致 背压,它并没有真正处理它。
想象一下,如果您的源不是生成器函数,而是一系列数据推送。数据推送以每毫秒的恒定速率到达。处理每一个都需要 10 毫秒,并且您的代码会强制进行串行处理。这会导致背压:Zip 将无限地排队未处理的数据推送,直到您 运行 内存不足。