C# Rx DistinctUntilChanged() 用于任何结果,包括列表
C# Rx DistinctUntilChanged() for ANY result including List
我正在包装一些 API 调用来假装一个套接字结果 - 以几秒的间隔循环调用并在更改时发出结果。
使用 DistinctUntilChanged()
运算符时效果很好。但是,当结果是列表时,运算符总是发出结果,因为它与默认比较器不同。
这是我的自定义 observable,用于重复执行任务,无论成功或失败都有一定的延迟。
public class TaskRepeatObservable<T> : IObservable<T>
{
private readonly Func<Task<T>> _taskFactory;
private readonly TimeSpan _repeatDelayTimeSpan;
private readonly ILogger _logger;
private Func<Exception, bool> _onError;
public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
{
_logger = new Log4NetLogger(GetType());
_logger.IsEnabled = false;
_taskFactory = taskFactory;
_repeatDelayTimeSpan = repeatDelayTimeSpan;
_onError = onError;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var tokenSource = new CancellationTokenSource();
var cancellationToken = tokenSource.Token;
Task.Factory.StartNew(async () =>
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await _taskFactory();
observer.OnNext(result);
}
catch (Exception e)
{
_logger.Error(e, "Observable Error: " + e.Message);
if (_onError != null && !_onError.Invoke(e))
throw;
}
finally
{
try
{
if (_repeatDelayTimeSpan > TimeSpan.Zero)
await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
}
catch (TaskCanceledException)
{
// ignored
}
}
}
}
catch (Exception e)
{
observer.OnError(e);
}
_logger.Debug("Observable is cancelled.");
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
return Disposable.Create(() =>
{
tokenSource.Cancel();
});
}
}
这是包装 api 调用的扩展。
public static class ObservableBuilder
{
///<summary>
///<para>Convert Task to Observable and emit only changed result, it's useful to wrap the api as socket-like.</para>
///</summary>
public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
Func<Exception, bool> onError = null)
{
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
}
我的问题是 - 如何使 DistinctUntilChanged()
适用于任何结果,包括 List
或 Enumerable
。
注意到我已经尝试实现我自己的比较器。但我仍然不知道如何将 T
的类型检查为 select DistinctUntilChanged()
的正确比较器
public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
{
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(IEnumerable<T> obj)
{
// Will not throw an OverflowException
unchecked
{
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
这里是简单的测试代码:
ObservableBuilder.Repeat(async () =>
{
var i = new List<int>() { 1, 2, 3, 4 };
return i;
}, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));
我希望列表只发出一次结果。
如果您只想拥有一个 Repeat
函数来转换所有情况,但对 IEnumerable
的工作方式不同 - 您必须使用一些反射。您需要检查类型 T
是否实现 IEnumerable<Something>
,如果是 - 使用特殊比较器 DistinctUtilChanged
- 否则使用默认比较器。
首先,稍微修改一下您的 EnumerableComparer
的签名不会有什么坏处,因为您需要特别 IEqualityComparer<T>
,而不是 IEqualityComparer<IEnumerable<T>>
:
private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
public bool Equals(T x, T y) {
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(T obj) {
// Will not throw an OverflowException
unchecked {
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
现在我们需要检查 T
是否为 IEnumerable
并通过反射创建此比较器的实例:
public static class ObservableBuilder {
public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
Func<Exception, bool> onError = null) {
var ienum = typeof(T).GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
if (ienum != null) {
// implements IEnumerable - create instance of comparer and use
var comparer = (IEqualityComparer<T>) Activator.CreateInstance(typeof(EnumerableComparer<,>).MakeGenericType(typeof(T), ienum.GenericTypeArguments[0]));
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged(comparer);
}
// otherwise - don't use
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
}
private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
public bool Equals(T x, T y) {
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(T obj) {
// Will not throw an OverflowException
unchecked {
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
}
或者,您可以创建 IEqualityComparer<T>
,它将每次检查 x
和 y
是否实现 IEnumerable<Something>
并进行相应比较,但我希望它是与构建序列时仅执行一次相比,在每次比较中执行此操作的效率较低。
感谢 Evk 的回答。我没有使用反射,而是尝试实现一个更通用的比较器。但是我不知道使用这个比较器是否有任何缺点。所以我只是将它作为面临相同问题的任何人的选择。
public class DeepComparer<T> : IEqualityComparer<T>
{
public bool Equals(T x, T y)
{
// Compare the Reference
return ReferenceEquals(x, y) ||
// Using Default Comparer to comparer the value
EqualityComparer<T>.Default.Equals(x, y) ||
// If they both are list, Compare using Sequenece Equal
x is IEnumerable enumerableX &&
y is IEnumerable enumerableY &&
enumerableX.Cast<object>().SequenceEqual(enumerableY.Cast<object>());
}
public int GetHashCode(T obj)
{
unchecked
{
return obj is IEnumerable enumerable
? enumerable.Cast<object>()
.Select(e => e.GetHashCode())
.Aggregate(17, (a, b) => 23 * a + b)
: obj.GetHashCode();
}
}
}
我正在包装一些 API 调用来假装一个套接字结果 - 以几秒的间隔循环调用并在更改时发出结果。
使用 DistinctUntilChanged()
运算符时效果很好。但是,当结果是列表时,运算符总是发出结果,因为它与默认比较器不同。
这是我的自定义 observable,用于重复执行任务,无论成功或失败都有一定的延迟。
public class TaskRepeatObservable<T> : IObservable<T>
{
private readonly Func<Task<T>> _taskFactory;
private readonly TimeSpan _repeatDelayTimeSpan;
private readonly ILogger _logger;
private Func<Exception, bool> _onError;
public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
{
_logger = new Log4NetLogger(GetType());
_logger.IsEnabled = false;
_taskFactory = taskFactory;
_repeatDelayTimeSpan = repeatDelayTimeSpan;
_onError = onError;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var tokenSource = new CancellationTokenSource();
var cancellationToken = tokenSource.Token;
Task.Factory.StartNew(async () =>
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await _taskFactory();
observer.OnNext(result);
}
catch (Exception e)
{
_logger.Error(e, "Observable Error: " + e.Message);
if (_onError != null && !_onError.Invoke(e))
throw;
}
finally
{
try
{
if (_repeatDelayTimeSpan > TimeSpan.Zero)
await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
}
catch (TaskCanceledException)
{
// ignored
}
}
}
}
catch (Exception e)
{
observer.OnError(e);
}
_logger.Debug("Observable is cancelled.");
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
return Disposable.Create(() =>
{
tokenSource.Cancel();
});
}
}
这是包装 api 调用的扩展。
public static class ObservableBuilder
{
///<summary>
///<para>Convert Task to Observable and emit only changed result, it's useful to wrap the api as socket-like.</para>
///</summary>
public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
Func<Exception, bool> onError = null)
{
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
}
我的问题是 - 如何使 DistinctUntilChanged()
适用于任何结果,包括 List
或 Enumerable
。
注意到我已经尝试实现我自己的比较器。但我仍然不知道如何将 T
的类型检查为 select DistinctUntilChanged()
public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
{
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(IEnumerable<T> obj)
{
// Will not throw an OverflowException
unchecked
{
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
这里是简单的测试代码:
ObservableBuilder.Repeat(async () =>
{
var i = new List<int>() { 1, 2, 3, 4 };
return i;
}, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));
我希望列表只发出一次结果。
如果您只想拥有一个 Repeat
函数来转换所有情况,但对 IEnumerable
的工作方式不同 - 您必须使用一些反射。您需要检查类型 T
是否实现 IEnumerable<Something>
,如果是 - 使用特殊比较器 DistinctUtilChanged
- 否则使用默认比较器。
首先,稍微修改一下您的 EnumerableComparer
的签名不会有什么坏处,因为您需要特别 IEqualityComparer<T>
,而不是 IEqualityComparer<IEnumerable<T>>
:
private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
public bool Equals(T x, T y) {
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(T obj) {
// Will not throw an OverflowException
unchecked {
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
现在我们需要检查 T
是否为 IEnumerable
并通过反射创建此比较器的实例:
public static class ObservableBuilder {
public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
Func<Exception, bool> onError = null) {
var ienum = typeof(T).GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
if (ienum != null) {
// implements IEnumerable - create instance of comparer and use
var comparer = (IEqualityComparer<T>) Activator.CreateInstance(typeof(EnumerableComparer<,>).MakeGenericType(typeof(T), ienum.GenericTypeArguments[0]));
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged(comparer);
}
// otherwise - don't use
return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
}
private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
public bool Equals(T x, T y) {
return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
}
public int GetHashCode(T obj) {
// Will not throw an OverflowException
unchecked {
return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
}
}
}
}
或者,您可以创建 IEqualityComparer<T>
,它将每次检查 x
和 y
是否实现 IEnumerable<Something>
并进行相应比较,但我希望它是与构建序列时仅执行一次相比,在每次比较中执行此操作的效率较低。
感谢 Evk 的回答。我没有使用反射,而是尝试实现一个更通用的比较器。但是我不知道使用这个比较器是否有任何缺点。所以我只是将它作为面临相同问题的任何人的选择。
public class DeepComparer<T> : IEqualityComparer<T>
{
public bool Equals(T x, T y)
{
// Compare the Reference
return ReferenceEquals(x, y) ||
// Using Default Comparer to comparer the value
EqualityComparer<T>.Default.Equals(x, y) ||
// If they both are list, Compare using Sequenece Equal
x is IEnumerable enumerableX &&
y is IEnumerable enumerableY &&
enumerableX.Cast<object>().SequenceEqual(enumerableY.Cast<object>());
}
public int GetHashCode(T obj)
{
unchecked
{
return obj is IEnumerable enumerable
? enumerable.Cast<object>()
.Select(e => e.GetHashCode())
.Aggregate(17, (a, b) => 23 * a + b)
: obj.GetHashCode();
}
}
}