C# Rx DistinctUntilChanged() 用于任何结果,包括列表

C# Rx DistinctUntilChanged() for ANY result including List

我正在包装一些 API 调用来假装一个套接字结果 - 以几秒的间隔循环调用并在更改时发出结果。

使用 DistinctUntilChanged() 运算符时效果很好。但是,当结果是列表时,运算符总是发出结果,因为它与默认比较器不同。

这是我的自定义 observable,用于重复执行任务,无论成功或失败都有一定的延迟。

public class TaskRepeatObservable<T> : IObservable<T>
{
    private readonly Func<Task<T>> _taskFactory;
    private readonly TimeSpan _repeatDelayTimeSpan;
    private readonly ILogger _logger;
    private Func<Exception, bool> _onError;
    public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
    {
        _logger = new Log4NetLogger(GetType());
        _logger.IsEnabled = false;

        _taskFactory = taskFactory;
        _repeatDelayTimeSpan = repeatDelayTimeSpan;
        _onError = onError;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var tokenSource = new CancellationTokenSource();
        var cancellationToken = tokenSource.Token;
        Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var result = await _taskFactory();
                        observer.OnNext(result);
                    }
                    catch (Exception e)
                    {
                        _logger.Error(e, "Observable Error: " + e.Message);
                        if (_onError != null && !_onError.Invoke(e))
                            throw;
                    }
                    finally
                    {
                        try
                        {
                            if (_repeatDelayTimeSpan > TimeSpan.Zero)
                                await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
                        }
                        catch (TaskCanceledException)
                        {
                            // ignored
                        }
                    }
                }
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }

            _logger.Debug("Observable is cancelled.");
        }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        return Disposable.Create(() =>
        {
            tokenSource.Cancel();
        });
    }
}

这是包装 api 调用的扩展。

public static class ObservableBuilder
{
    ///<summary>
    ///<para>Convert Task to Observable and emit only changed result, it's useful to wrap the api as socket-like.</para>
    ///</summary>
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null)
    {
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }

我的问题是 - 如何使 DistinctUntilChanged() 适用于任何结果,包括 ListEnumerable

注意到我已经尝试实现我自己的比较器。但我仍然不知道如何将 T 的类型检查为 select DistinctUntilChanged()

的正确比较器
public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
    public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
    {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(IEnumerable<T> obj)
    {
        // Will not throw an OverflowException
        unchecked
        {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}

这里是简单的测试代码:

        ObservableBuilder.Repeat(async () =>
        {
            var i = new List<int>() { 1, 2, 3, 4 };
            return i;
        }, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));

我希望列表只发出一次结果。

如果您只想拥有一个 Repeat 函数来转换所有情况,但对 IEnumerable 的工作方式不同 - 您必须使用一些反射。您需要检查类型 T 是否实现 IEnumerable<Something>,如果是 - 使用特殊比较器 DistinctUtilChanged - 否则使用默认比较器。

首先,稍微修改一下您的 EnumerableComparer 的签名不会有什么坏处,因为您需要特别 IEqualityComparer<T>,而不是 IEqualityComparer<IEnumerable<T>>:

private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
    public bool Equals(T x, T y) {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(T obj) {
        // Will not throw an OverflowException
        unchecked {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}

现在我们需要检查 T 是否为 IEnumerable 并通过反射创建此比较器的实例:

public static class ObservableBuilder {
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null) {
        var ienum = typeof(T).GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
        if (ienum != null) {
            // implements IEnumerable - create instance of comparer and use
            var comparer = (IEqualityComparer<T>) Activator.CreateInstance(typeof(EnumerableComparer<,>).MakeGenericType(typeof(T), ienum.GenericTypeArguments[0]));
            return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged(comparer);
        }
        // otherwise - don't use
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }

    private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
        public bool Equals(T x, T y) {
            return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
        }

        public int GetHashCode(T obj) {
            // Will not throw an OverflowException
            unchecked {
                return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
            }
        }
    }
}  

或者,您可以创建 IEqualityComparer<T>,它将每次检查 xy 是否实现 IEnumerable<Something> 并进行相应比较,但我希望它是与构建序列时仅执行一次相比,在每次比较中执行此操作的效率较低。

感谢 Evk 的回答。我没有使用反射,而是尝试实现一个更通用的比较器。但是我不知道使用这个比较器是否有任何缺点。所以我只是将它作为面临相同问题的任何人的选择。

public class DeepComparer<T> : IEqualityComparer<T>
{
    public bool Equals(T x, T y)
    {
        // Compare the Reference
        return ReferenceEquals(x, y) ||
               // Using Default Comparer to comparer the value
               EqualityComparer<T>.Default.Equals(x, y) ||
               // If they both are list, Compare using Sequenece Equal
               x is IEnumerable enumerableX &&
               y is IEnumerable enumerableY &&
               enumerableX.Cast<object>().SequenceEqual(enumerableY.Cast<object>());
    }

    public int GetHashCode(T obj)
    {
        unchecked
        {
            return obj is IEnumerable enumerable
                ? enumerable.Cast<object>()
                    .Select(e => e.GetHashCode())
                    .Aggregate(17, (a, b) => 23 * a + b)
                : obj.GetHashCode();
        }
    }
}