RxUI - 为每个事务或流处理对象创建一个监听器更好吗?
RxUI - is it better to create a listener per transaction or on the stream processing object?
在过去的 3-4 个月里,我发现,默认情况下或巧合,我所有的 RxUI 可观察对象 (.Subscribe) 都是在各种对象的构造函数(代码)中创建的。但是,该应用程序还大量使用后台任务来监视各种流并采取行动(更新 UI、创建交易等),有时,这些任务需要中止某些交易。所以,我只是想知道在哪里最好 'place' 为中止指令(标志)实例化 'listener'?在创建事务对象时或在监视流的 'supervising' 对象内?如果在交易中,对绩效和处置有何影响?我知道这是一个相当广泛的问题,但我希望有人能够推荐一种合适的模式。任何指点,评论表示赞赏。
请看下面的模型代码:
using System;
using System.Threading.Tasks;
using System.Threading;
using ReactiveUI;
using UserSettingsClassLibrary;
namespace fxR.UTILITIES.MockCode
{
public class MockRxUI_AbortTransaction : ReactiveObject { } // ignore this class
public class ManageTransactions : ReactiveObject
{
// properties
CancellationTokenSource ctTransactionPosition;
// constructor
public ManageTransactions()
{
// initialise
ctTransactionPosition = new CancellationTokenSource();
}
// methods
public async Task OpenTransaction(MyStreamProcessingObject sPO, string user, string openTransactionAction)
{
// ... create transaction object, db log, etc
// open a 'listener' in case this Transaction is aborted
if (user.Contains("/"))
{
sPO.WhenAnyValue(x => x.ActiveObject.AbortTrade)
.Log(this, "TransactionOpen -> Abort trigger ")
.Subscribe(abort =>
{
if (sPO.ActiveObject.TradeOpen && abort)
{
// abort
}
});
}
}
}
// the OpenTransaction method above would be called from elsewhere in the app, eg
public class SomeOtherObject : ReactiveObject
{
ManageTransactions _manageTransactions;
MyStreamProcessingObject _sPO;
public SomeOtherObject(MyStreamProcessingObject sPO)
{
_sPO = sPO;
_manageTransactions = new ManageTransactions();
}
public async Task DoSomethingAsync()
{
await _manageTransactions.OpenTransaction(_sPO, Environment.UserName + "/" + GetType().Name, "TransactionAction").ConfigureAwait(false);
}
}
// ================================================================================================
// OR, open a 'listener' on the Stream Processing Object when it is constructed (first initialised)
// ================================================================================================
public class MyStreamProcessingObject : ReactiveObject
{
// object properties
private ActiveObject _activeObject;
public ActiveObject ActiveObject
{
get { return _activeObject; }
set { this.RaiseAndSetIfChanged(ref _activeObject, value); }
}
// constructor
public MyStreamProcessingObject()
{
// listen for Transaction abort status
this.WhenAnyValue(x => x.ActiveObject.AbortTrade)
.Log(this, "Log -> ActiveObject.AbortTransaction")
.Subscribe(async abort =>
{
if (abort)
{
await Task.Run(async () => await AbortOpenAutoTransaction().ConfigureAwait(false));
}
});
}
// methods
private async Task AbortOpenAutoTransaction()
{
var ok = false;
// await some abort code, if all ok, set ok = true;
if (ok) ActiveObject.AbortTrade = false;
}
}
}
我可以在 async/await
之前就订阅和一次性用品给你一些建议。您可以使用以下模式。
async Task<int> Foo(IObservable<bool> observable, Task<int> task){
var subscription = observable.Subscribe
( x => Console.WriteLine("WhooHoo");
using(subscription)
{
var value = await task;
return value + 10;
}
}
相当于
async Task<int> Foo(IObservable<bool> observable, Task<int> task){
var subscription = observable.Subscribe
( x => Console.WriteLine("WhooHoo");
try
{
var value = await task;
return value + 10;
}finally{
subscription.Dispose();
}
}
订阅将在任务完成后处理。
但是,如果您发现您是在构造函数中创建订阅,那么您需要另一种模式。
public class Foo : ReactiveObject, IDisposable {
private CompositeDisposable _Cleanup = new CompositeDisposable();
public Foo(IObservable<int> observable){
var subscription = observable
.Subscribe(x=>Console.WriteLine("WhooHoo"));
_Cleanup.Add(subscription);
}
public void Dispose(){
_Cleanup.Dispose();
}
}
或更好地使用 RXUI 扩展方法DisposeWith
public class Foo : ReactiveObject, IDisposable {
private CompositeDisposable _Cleanup = new CompositeDisposable();
public Foo(IObservable<int> observable){
observable
.Subscribe(x=>Console.WriteLine("WhooHoo"))
.DisposeWith(_Cleanup);
}
public void Dispose(){
_Cleanup.Dispose();
}
}
通常我创建一个基地class
public class DisposableReactiveObject : ReactiveObject, IDisposable {
protected CompositeDisposable Cleanup { get; } = new CompositeDisposable;
public void Dispose(){
_Cleanup.Dispose();
}
}
然后可以用作
public class Foo : DisposableReactiveObject {
public Foo(IObservable<int> observable){
observable
.Subscribe(x=>Console.WriteLine("WhooHoo"))
.DisposeWith(Cleanup);
}
}
在过去的 3-4 个月里,我发现,默认情况下或巧合,我所有的 RxUI 可观察对象 (.Subscribe) 都是在各种对象的构造函数(代码)中创建的。但是,该应用程序还大量使用后台任务来监视各种流并采取行动(更新 UI、创建交易等),有时,这些任务需要中止某些交易。所以,我只是想知道在哪里最好 'place' 为中止指令(标志)实例化 'listener'?在创建事务对象时或在监视流的 'supervising' 对象内?如果在交易中,对绩效和处置有何影响?我知道这是一个相当广泛的问题,但我希望有人能够推荐一种合适的模式。任何指点,评论表示赞赏。
请看下面的模型代码:
using System;
using System.Threading.Tasks;
using System.Threading;
using ReactiveUI;
using UserSettingsClassLibrary;
namespace fxR.UTILITIES.MockCode
{
public class MockRxUI_AbortTransaction : ReactiveObject { } // ignore this class
public class ManageTransactions : ReactiveObject
{
// properties
CancellationTokenSource ctTransactionPosition;
// constructor
public ManageTransactions()
{
// initialise
ctTransactionPosition = new CancellationTokenSource();
}
// methods
public async Task OpenTransaction(MyStreamProcessingObject sPO, string user, string openTransactionAction)
{
// ... create transaction object, db log, etc
// open a 'listener' in case this Transaction is aborted
if (user.Contains("/"))
{
sPO.WhenAnyValue(x => x.ActiveObject.AbortTrade)
.Log(this, "TransactionOpen -> Abort trigger ")
.Subscribe(abort =>
{
if (sPO.ActiveObject.TradeOpen && abort)
{
// abort
}
});
}
}
}
// the OpenTransaction method above would be called from elsewhere in the app, eg
public class SomeOtherObject : ReactiveObject
{
ManageTransactions _manageTransactions;
MyStreamProcessingObject _sPO;
public SomeOtherObject(MyStreamProcessingObject sPO)
{
_sPO = sPO;
_manageTransactions = new ManageTransactions();
}
public async Task DoSomethingAsync()
{
await _manageTransactions.OpenTransaction(_sPO, Environment.UserName + "/" + GetType().Name, "TransactionAction").ConfigureAwait(false);
}
}
// ================================================================================================
// OR, open a 'listener' on the Stream Processing Object when it is constructed (first initialised)
// ================================================================================================
public class MyStreamProcessingObject : ReactiveObject
{
// object properties
private ActiveObject _activeObject;
public ActiveObject ActiveObject
{
get { return _activeObject; }
set { this.RaiseAndSetIfChanged(ref _activeObject, value); }
}
// constructor
public MyStreamProcessingObject()
{
// listen for Transaction abort status
this.WhenAnyValue(x => x.ActiveObject.AbortTrade)
.Log(this, "Log -> ActiveObject.AbortTransaction")
.Subscribe(async abort =>
{
if (abort)
{
await Task.Run(async () => await AbortOpenAutoTransaction().ConfigureAwait(false));
}
});
}
// methods
private async Task AbortOpenAutoTransaction()
{
var ok = false;
// await some abort code, if all ok, set ok = true;
if (ok) ActiveObject.AbortTrade = false;
}
}
}
我可以在 async/await
之前就订阅和一次性用品给你一些建议。您可以使用以下模式。
async Task<int> Foo(IObservable<bool> observable, Task<int> task){
var subscription = observable.Subscribe
( x => Console.WriteLine("WhooHoo");
using(subscription)
{
var value = await task;
return value + 10;
}
}
相当于
async Task<int> Foo(IObservable<bool> observable, Task<int> task){
var subscription = observable.Subscribe
( x => Console.WriteLine("WhooHoo");
try
{
var value = await task;
return value + 10;
}finally{
subscription.Dispose();
}
}
订阅将在任务完成后处理。
但是,如果您发现您是在构造函数中创建订阅,那么您需要另一种模式。
public class Foo : ReactiveObject, IDisposable {
private CompositeDisposable _Cleanup = new CompositeDisposable();
public Foo(IObservable<int> observable){
var subscription = observable
.Subscribe(x=>Console.WriteLine("WhooHoo"));
_Cleanup.Add(subscription);
}
public void Dispose(){
_Cleanup.Dispose();
}
}
或更好地使用 RXUI 扩展方法DisposeWith
public class Foo : ReactiveObject, IDisposable {
private CompositeDisposable _Cleanup = new CompositeDisposable();
public Foo(IObservable<int> observable){
observable
.Subscribe(x=>Console.WriteLine("WhooHoo"))
.DisposeWith(_Cleanup);
}
public void Dispose(){
_Cleanup.Dispose();
}
}
通常我创建一个基地class
public class DisposableReactiveObject : ReactiveObject, IDisposable {
protected CompositeDisposable Cleanup { get; } = new CompositeDisposable;
public void Dispose(){
_Cleanup.Dispose();
}
}
然后可以用作
public class Foo : DisposableReactiveObject {
public Foo(IObservable<int> observable){
observable
.Subscribe(x=>Console.WriteLine("WhooHoo"))
.DisposeWith(Cleanup);
}
}