从事件中收集值

Collecting values from events

我有一个活动。 该事件会不时触发,它会调用一个 Action<int>.

的事件处理程序

现在我想 "collect" 这些整数事件传递给我并将它们保存在列表中。我还想指定完成列表和开始新列表的时刻。

我想到的天真的解决方案是 属性 List<int> ValList。 事件处理程序在每次调用时都会添加一个值。 消费方想拿单就拿,不时说ValList = new List<int>(); 为了避免线程同步问题,我也需要一个锁。

我发现这个解决方案非常丑陋,并且想知道替代方案。 随着时间的推移,我越来越成为一名函数式程序员,而且我经常使用它。 但是当涉及到这样的问题时,我仍然在考虑程序化。 我真的很想避免使用可变列表(我仍在使用 System.Collections.Immutable)。

有没有没有可变和副作用的很好的功能解决方案?

如果我完全理解您的意思,您的活动已锁定为操作,您无法控制该活动签名。您想要收集传递的每个 int 直到外部请求来检索包含任何累积整数的列表的某个点,此时列表将重置并保存有关集合的时间信息。对吗?

让我感到困惑的是,为什么您会提到使用 OOP 语言的函数式编程?您可能会争辩说 LINQ 在某种程度上是函数式的,但对于注重函数式思维的人来说肯定有更好的选择吗?因为使用累加器管理器 class.

这似乎是一个非常简单的解决方案
namespace bs
{
struct CollectionEvent
{
    public DateTime Retrieved { get; set; }      
    public String IP { get; set; }  
}

static class Accumulator
{
    private static List<int> Items { get; set; } = new List<int>();
    private static bool Mutex { get; set; } = false;
    private static List<CollectionEvent> Collections { get; set; } = new List<CollectionEvent>();

    public static void Add(int i)
    {
        Sync(() => Items.Add(i));
    }

    public static List<int> Retrieve(String IP)
    {
        Collections.Add(new CollectionEvent
        {
            Retrieved = DateTime.UtcNow,
            IP = IP
        });

        List<int> dataOut = null;
        Sync(() =>
        {
            dataOut = new List<int>(Items);
            Items = new List<int>();
        });

        return dataOut;
    }

    public static void Sync(Action fn)
    {
        const int Threshold = 10;
        int attempts = 0;

        for (; Mutex && (attempts < Threshold); attempts++)
            Thread.Sleep(100 * attempts);

        if (attempts == Threshold)
            throw new Exception(); // or something better I'm sure

        Mutex = true;
        fn();
        Mutex = false;
    }
}

class Program
{
    static void Main(string[] args)
    {
        var r = new Random();
        var m = r.Next(5, 10);

        for (int i = 0; i < m; i++)
        {
            var datum = r.Next(100, 10000);
            Console.WriteLine($"COLLECT {datum}");
            Accumulator.Add(datum);
        }

        Console.WriteLine("RETRIEVE");
        Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));

        m = r.Next(5, 10);
        for (int i = 0; i < m; i++)
        {
            var datum = r.Next(100, 10000); 
            Console.WriteLine($"COLLECT {datum}");
            Accumulator.Add(datum);
        }

        Console.WriteLine("RETRIEVE");
        Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));

        Console.Read();
    }
}
}

您应该考虑为此使用 Reactive Extensions。它处理值流(事件),并且可以消除对锁的需求。

首先,我将为 AddCompleteRequestView 操作定义一些操作 classes。这类似于 F# 中的可区分联合,例如:

public class EventAction
{
    public static EventAction Add(int value) => new AddAction(value);
    public static readonly RequestViewAction RequestView = new RequestViewAction();
    public static readonly EventAction Complete = new CompleteAction();
}

public class AddAction : EventAction
{
    public readonly int Value;
    public AddAction(int value) => Value = value;
}

public class CompleteAction : EventAction
{
}

public class RequestViewAction : EventAction
{
}

接下来我要创建一个名为 AggregateView 的类型,它将包含三个 Rx Subject 值:

  • aggregator 将收集 EventAction 事件并管理聚合的 Lst<int>Lst<int>the language-ext functional language extensions library 的不可变列表类型,但您可以也使用 ImmutableList)。
  • events 这将只是一个整数事件流
  • views 这将是 Lst<int> 次观看

这是 class:

using System;
using LanguageExt;
using static LanguageExt.Prelude;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class AggregateView : IDisposable
{
    readonly Subject<EventAction> aggregator = new Subject<EventAction>();
    readonly Subject<int> events = new Subject<int>();
    readonly Subject<Lst<int>> view = new Subject<Lst<int>>();

    readonly IDisposable subscription;

    public AggregateView()
    {
        // Creates an aggregate view of the integers that responds to various control
        // actions coming through.  
        subscription = aggregator.Aggregate(
            Lst<int>.Empty,
            (list, action) =>
            {
                switch(action)
                {
                    // Adds an item to the aggregate list and passes it on to the 
                    // events Subject
                    case AddAction add:
                        events.OnNext(add.Value);
                        return list.Add(add.Value);

                    // Clears the list and passes a list onto the views Subject
                    case CompleteAction complete:
                        view.OnNext(Lst<int>.Empty);
                        return Lst<int>.Empty;

                    // Gets the current aggregate list and passes it onto the 
                    // views Subject
                    case RequestViewAction req:
                        view.OnNext(list);
                        return list;

                    default:
                        return list;
                }
            })
            .Subscribe(x => { });
    }

    /// <summary>
    /// Observable stream of integer events
    /// </summary>
    public IObservable<int> Events => 
        events;

    /// <summary>
    /// Observable stream of list views
    /// </summary>
    public IObservable<Lst<int>> Views =>
        view;

    /// <summary>
    /// Listener for plugging into an event
    /// </summary>
    public void Listener(int value) =>
        aggregator.OnNext(EventAction.Add(value));

    /// <summary>
    /// Clears the aggregate view and post it to Views
    /// </summary>
    public void Complete() =>
        aggregator.OnNext(EventAction.Complete);

    /// <summary>
    /// Requests a the current aggregate view to be pushed through to 
    /// the Views subscribers
    /// </summary>
    public void RequestView() =>
        aggregator.OnNext(EventAction.RequestView);

    /// <summary>
    /// Dispose
    /// </summary>
    public void Dispose()
    {
        subscription?.Dispose();
        view?.OnCompleted();
        events?.OnCompleted();
        view?.Dispose();
        events?.Dispose();
    }
}

它有两个 IObservable 属性:

  • Views - 允许您订阅聚合列表
  • Events - 允许您订阅整数事件

还有一些有用的方法:

  • Listener - 这就是您要插入 event
  • 的内容
  • Complete - 这将清空聚合列表并将空列表发送到 View observable
  • RequestView - 这会将当前聚合列表发送给 Views 可观察对象的所有订阅者。

终于要测试了:

class Program
{
    static event Action<int> eventTest;

    static void Main(string[] args)
    {
        var aggregate = new AggregateView();
        eventTest += aggregate.Listener;

        aggregate.Views.Subscribe(ReceiveList);
        aggregate.Events.Subscribe(ReceiveValue);

        eventTest(1);
        eventTest(2);
        eventTest(3);
        eventTest(4);
        eventTest(5);

        aggregate.RequestView();
        aggregate.Complete();

        eventTest(6);
        eventTest(7);
        eventTest(8);
        eventTest(9);
        eventTest(10);

        aggregate.RequestView();
    }

    static void ReceiveList(Lst<int> list) =>
        Console.WriteLine($"Got list of {list.Count} items: {ListShow(list)}");

    static void ReceiveValue(int x) =>
        Console.WriteLine(x);

    static string ListShow(Lst<int> list) => 
        String.Join(", ", list);
}

这是我在处理事件时能想到的最实用的方式。 Action<int> 应该永远是 危险信号 对于任何想在功能上工作的人来说,因为默认情况下它有副作用而且不是纯粹的。所以你需要尽可能地封装副作用,让其他一切都变得纯粹。

顺便说一下,您可以将这整个事情概括为适用于任何类型。这使得它更有用:

public enum EventActionTag
{
    Add,
    Complete,
    RequestView
}

public class EventAction<T>
{
    public readonly EventActionTag Tag;

    public static EventAction<T> Add(T value) => new AddAction<T>(value);
    public static readonly EventAction<T> RequestView = new RequestViewAction<T>();
    public static readonly EventAction<T> Complete = new CompleteAction<T>();

    public EventAction(EventActionTag tag) =>
        Tag = tag;
}

public class AddAction<T> : EventAction<T>
{
    public readonly T Value;
    public AddAction(T value) : base(EventActionTag.Add) =>
        Value = value;
}
public class CompleteAction<T> : EventAction<T>
{
    public CompleteAction() : base(EventActionTag.Complete)
    { }
}
public class RequestViewAction<T> : EventAction<T>
{
    public RequestViewAction() : base(EventActionTag.RequestView)
    { }
}

public class AggregateView<T> : IDisposable
{
    readonly Subject<EventAction<T>> aggregator = new Subject<EventAction<T>>();
    readonly Subject<T> events = new Subject<T>();
    readonly Subject<Lst<T>> view = new Subject<Lst<T>>();

    readonly IDisposable subscription;

    public AggregateView()
    {
        // Creates an aggregate view of the integers that responds to various control
        // actions coming through.  
        subscription = aggregator.Aggregate(
            Lst<T>.Empty,
            (list, action) =>
            {
                switch(action.Tag)
                {
                    // Adds an item to the aggregate list and passes it on to the 
                    // events Subject
                    case EventActionTag.Add:
                        var add = (AddAction<T>)action;
                        events.OnNext(add.Value);
                        return list.Add(add.Value);

                    // Clears the list and passes a list onto the views Subject
                    case EventActionTag.Complete:
                        view.OnNext(Lst<T>.Empty);
                        return Lst<T>.Empty;

                    // Gets the current aggregate list and passes it onto the 
                    // views Subject
                    case EventActionTag.RequestView:
                        view.OnNext(list);
                        return list;

                    default:
                        return list;
                }
            })
            .Subscribe(x => { });
    }

    /// <summary>
    /// Observable stream of integer events
    /// </summary>
    public IObservable<T> Events => 
        events;

    /// <summary>
    /// Observable stream of list views
    /// </summary>
    public IObservable<Lst<T>> Views =>
        view;

    /// <summary>
    /// Listener for plugging into an event
    /// </summary>
    public void Listener(T value) =>
        aggregator.OnNext(EventAction<T>.Add(value));

    /// <summary>
    /// Clears the aggregate view and post it to Views
    /// </summary>
    public void Complete() =>
        aggregator.OnNext(EventAction<T>.Complete);

    /// <summary>
    /// Requests a the current aggregate view to be pushed through to 
    /// the Views subscribers
    /// </summary>
    public void RequestView() =>
        aggregator.OnNext(EventAction<T>.RequestView);

    /// <summary>
    /// Dispose
    /// </summary>
    public void Dispose()
    {
        subscription?.Dispose();
        view?.OnCompleted();
        events?.OnCompleted();
        view?.Dispose();
        events?.Dispose();
    }
}