如何在 rx.net 中实现我自己的运算符
How to implement my own operator in rx.net
我需要 RX 中的滞后滤波器功能。仅当先前发出的值与当前输入值相差一定量时,它才应从源流发出值。作为通用扩展方法,它可以具有以下签名:
public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)
我无法弄清楚如何使用现有的运算符来实现它。我一直在寻找 lift from RxJava, any other method to create my own operator. I have seen this checklist 之类的东西,但我没有在网上找到任何示例。
以下方法(两者实际上相同)对我来说似乎是解决方法,但是否有更多 Rx 方式 来做到这一点,比如不包装 subject
或者实际实现一个运算符?
async Task Main()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var rnd = new Random();
var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
.Publish()
.AutoConnect()
;
s.Subscribe(Console.WriteLine, cts.Token);
s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
}
public static class ReactiveOperators
{
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
return new InternalHysteresisFilter<T>(source, filter).AsObservable;
}
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
var subject = new Subject<T>();
T lastEmitted = default;
bool emitted = false;
source.Subscribe(
value =>
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
, ex => subject.OnError(ex)
, () => subject.OnCompleted()
);
return subject;
}
private class InternalHysteresisFilter<T>: IObserver<T>
{
Func<T, T, bool> filter;
T lastEmitted;
bool emitted;
private readonly Subject<T> subject = new Subject<T>();
public IObservable<T> AsObservable => subject;
public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
{
this.filter = filter;
source.Subscribe(this);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void OnNext(T value)
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
public void OnError(Exception error)
{
subject.OnError(error);
}
public void OnCompleted()
{
subject.OnCompleted();
}
}
}
旁注:将有数千个此类过滤器应用于尽可能多的流。我需要吞吐量而不是延迟,因此我正在寻找 CPU 和内存中开销最小的解决方案,即使其他人看起来更漂亮。
我在 Introduction to Rx 书中看到的大多数示例都使用方法 Observable.Create
创建新的运算符。
The Create factory method is the preferred way to implement custom observable sequences. The usage of subjects should largely remain in the realms of samples and testing. (citation)
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return Observable.Create<T>(observer =>
{
T lastEmitted = default;
bool emitted = false;
return source.Subscribe(value =>
{
if (!emitted || predicate(lastEmitted, value))
{
observer.OnNext(value);
lastEmitted = value;
emitted = true;
}
}, observer.OnError, observer.OnCompleted);
});
}
这个答案与@Theodor 的相同,但它避免使用 Observable.Create
,我通常会避免使用。
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return source
.Scan((emitted: default(T), isFirstItem: true, emit: false), (state, newItem) => state.isFirstItem || predicate(state.emitted, newItem)
? (newItem, false, true)
: (state.emitted, false, false)
)
.Where(t => t.emit)
.Select(t => t.emitted);
}
.Scan
是您在可观察对象中跨项目跟踪状态时想要使用的。
我需要 RX 中的滞后滤波器功能。仅当先前发出的值与当前输入值相差一定量时,它才应从源流发出值。作为通用扩展方法,它可以具有以下签名:
public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)
我无法弄清楚如何使用现有的运算符来实现它。我一直在寻找 lift from RxJava, any other method to create my own operator. I have seen this checklist 之类的东西,但我没有在网上找到任何示例。
以下方法(两者实际上相同)对我来说似乎是解决方法,但是否有更多 Rx 方式 来做到这一点,比如不包装 subject
或者实际实现一个运算符?
async Task Main()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var rnd = new Random();
var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
.Publish()
.AutoConnect()
;
s.Subscribe(Console.WriteLine, cts.Token);
s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
}
public static class ReactiveOperators
{
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
return new InternalHysteresisFilter<T>(source, filter).AsObservable;
}
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
var subject = new Subject<T>();
T lastEmitted = default;
bool emitted = false;
source.Subscribe(
value =>
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
, ex => subject.OnError(ex)
, () => subject.OnCompleted()
);
return subject;
}
private class InternalHysteresisFilter<T>: IObserver<T>
{
Func<T, T, bool> filter;
T lastEmitted;
bool emitted;
private readonly Subject<T> subject = new Subject<T>();
public IObservable<T> AsObservable => subject;
public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
{
this.filter = filter;
source.Subscribe(this);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void OnNext(T value)
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
public void OnError(Exception error)
{
subject.OnError(error);
}
public void OnCompleted()
{
subject.OnCompleted();
}
}
}
旁注:将有数千个此类过滤器应用于尽可能多的流。我需要吞吐量而不是延迟,因此我正在寻找 CPU 和内存中开销最小的解决方案,即使其他人看起来更漂亮。
我在 Introduction to Rx 书中看到的大多数示例都使用方法 Observable.Create
创建新的运算符。
The Create factory method is the preferred way to implement custom observable sequences. The usage of subjects should largely remain in the realms of samples and testing. (citation)
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return Observable.Create<T>(observer =>
{
T lastEmitted = default;
bool emitted = false;
return source.Subscribe(value =>
{
if (!emitted || predicate(lastEmitted, value))
{
observer.OnNext(value);
lastEmitted = value;
emitted = true;
}
}, observer.OnError, observer.OnCompleted);
});
}
这个答案与@Theodor 的相同,但它避免使用 Observable.Create
,我通常会避免使用。
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return source
.Scan((emitted: default(T), isFirstItem: true, emit: false), (state, newItem) => state.isFirstItem || predicate(state.emitted, newItem)
? (newItem, false, true)
: (state.emitted, false, false)
)
.Where(t => t.emit)
.Select(t => t.emitted);
}
.Scan
是您在可观察对象中跨项目跟踪状态时想要使用的。