从事件中收集值
Collecting values from events
我有一个活动。
该事件会不时触发,它会调用一个 Action<int>
.
的事件处理程序
现在我想 "collect" 这些整数事件传递给我并将它们保存在列表中。我还想指定完成列表和开始新列表的时刻。
我想到的天真的解决方案是 属性 List<int> ValList
。
事件处理程序在每次调用时都会添加一个值。
消费方想拿单就拿,不时说ValList = new List<int>();
为了避免线程同步问题,我也需要一个锁。
我发现这个解决方案非常丑陋,并且想知道替代方案。
随着时间的推移,我越来越成为一名函数式程序员,而且我经常使用它。
但是当涉及到这样的问题时,我仍然在考虑程序化。
我真的很想避免使用可变列表(我仍在使用 System.Collections.Immutable)。
有没有没有可变和副作用的很好的功能解决方案?
如果我完全理解您的意思,您的活动已锁定为操作,您无法控制该活动签名。您想要收集传递的每个 int 直到外部请求来检索包含任何累积整数的列表的某个点,此时列表将重置并保存有关集合的时间信息。对吗?
让我感到困惑的是,为什么您会提到使用 OOP 语言的函数式编程?您可能会争辩说 LINQ 在某种程度上是函数式的,但对于注重函数式思维的人来说肯定有更好的选择吗?因为使用累加器管理器 class.
这似乎是一个非常简单的解决方案
namespace bs
{
struct CollectionEvent
{
public DateTime Retrieved { get; set; }
public String IP { get; set; }
}
static class Accumulator
{
private static List<int> Items { get; set; } = new List<int>();
private static bool Mutex { get; set; } = false;
private static List<CollectionEvent> Collections { get; set; } = new List<CollectionEvent>();
public static void Add(int i)
{
Sync(() => Items.Add(i));
}
public static List<int> Retrieve(String IP)
{
Collections.Add(new CollectionEvent
{
Retrieved = DateTime.UtcNow,
IP = IP
});
List<int> dataOut = null;
Sync(() =>
{
dataOut = new List<int>(Items);
Items = new List<int>();
});
return dataOut;
}
public static void Sync(Action fn)
{
const int Threshold = 10;
int attempts = 0;
for (; Mutex && (attempts < Threshold); attempts++)
Thread.Sleep(100 * attempts);
if (attempts == Threshold)
throw new Exception(); // or something better I'm sure
Mutex = true;
fn();
Mutex = false;
}
}
class Program
{
static void Main(string[] args)
{
var r = new Random();
var m = r.Next(5, 10);
for (int i = 0; i < m; i++)
{
var datum = r.Next(100, 10000);
Console.WriteLine($"COLLECT {datum}");
Accumulator.Add(datum);
}
Console.WriteLine("RETRIEVE");
Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));
m = r.Next(5, 10);
for (int i = 0; i < m; i++)
{
var datum = r.Next(100, 10000);
Console.WriteLine($"COLLECT {datum}");
Accumulator.Add(datum);
}
Console.WriteLine("RETRIEVE");
Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));
Console.Read();
}
}
}
您应该考虑为此使用 Reactive Extensions。它处理值流(事件),并且可以消除对锁的需求。
首先,我将为 Add
、Complete
和 RequestView
操作定义一些操作 classes。这类似于 F# 中的可区分联合,例如:
public class EventAction
{
public static EventAction Add(int value) => new AddAction(value);
public static readonly RequestViewAction RequestView = new RequestViewAction();
public static readonly EventAction Complete = new CompleteAction();
}
public class AddAction : EventAction
{
public readonly int Value;
public AddAction(int value) => Value = value;
}
public class CompleteAction : EventAction
{
}
public class RequestViewAction : EventAction
{
}
接下来我要创建一个名为 AggregateView
的类型,它将包含三个 Rx Subject
值:
aggregator
将收集 EventAction
事件并管理聚合的 Lst<int>
(Lst<int>
是 the language-ext functional language extensions library 的不可变列表类型,但您可以也使用 ImmutableList
)。
events
这将只是一个整数事件流
views
这将是 Lst<int>
次观看
这是 class:
using System;
using LanguageExt;
using static LanguageExt.Prelude;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class AggregateView : IDisposable
{
readonly Subject<EventAction> aggregator = new Subject<EventAction>();
readonly Subject<int> events = new Subject<int>();
readonly Subject<Lst<int>> view = new Subject<Lst<int>>();
readonly IDisposable subscription;
public AggregateView()
{
// Creates an aggregate view of the integers that responds to various control
// actions coming through.
subscription = aggregator.Aggregate(
Lst<int>.Empty,
(list, action) =>
{
switch(action)
{
// Adds an item to the aggregate list and passes it on to the
// events Subject
case AddAction add:
events.OnNext(add.Value);
return list.Add(add.Value);
// Clears the list and passes a list onto the views Subject
case CompleteAction complete:
view.OnNext(Lst<int>.Empty);
return Lst<int>.Empty;
// Gets the current aggregate list and passes it onto the
// views Subject
case RequestViewAction req:
view.OnNext(list);
return list;
default:
return list;
}
})
.Subscribe(x => { });
}
/// <summary>
/// Observable stream of integer events
/// </summary>
public IObservable<int> Events =>
events;
/// <summary>
/// Observable stream of list views
/// </summary>
public IObservable<Lst<int>> Views =>
view;
/// <summary>
/// Listener for plugging into an event
/// </summary>
public void Listener(int value) =>
aggregator.OnNext(EventAction.Add(value));
/// <summary>
/// Clears the aggregate view and post it to Views
/// </summary>
public void Complete() =>
aggregator.OnNext(EventAction.Complete);
/// <summary>
/// Requests a the current aggregate view to be pushed through to
/// the Views subscribers
/// </summary>
public void RequestView() =>
aggregator.OnNext(EventAction.RequestView);
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
subscription?.Dispose();
view?.OnCompleted();
events?.OnCompleted();
view?.Dispose();
events?.Dispose();
}
}
它有两个 IObservable
属性:
Views
- 允许您订阅聚合列表
Events
- 允许您订阅整数事件
还有一些有用的方法:
Listener
- 这就是您要插入 event
的内容
Complete
- 这将清空聚合列表并将空列表发送到 View
observable
RequestView
- 这会将当前聚合列表发送给 Views
可观察对象的所有订阅者。
终于要测试了:
class Program
{
static event Action<int> eventTest;
static void Main(string[] args)
{
var aggregate = new AggregateView();
eventTest += aggregate.Listener;
aggregate.Views.Subscribe(ReceiveList);
aggregate.Events.Subscribe(ReceiveValue);
eventTest(1);
eventTest(2);
eventTest(3);
eventTest(4);
eventTest(5);
aggregate.RequestView();
aggregate.Complete();
eventTest(6);
eventTest(7);
eventTest(8);
eventTest(9);
eventTest(10);
aggregate.RequestView();
}
static void ReceiveList(Lst<int> list) =>
Console.WriteLine($"Got list of {list.Count} items: {ListShow(list)}");
static void ReceiveValue(int x) =>
Console.WriteLine(x);
static string ListShow(Lst<int> list) =>
String.Join(", ", list);
}
这是我在处理事件时能想到的最实用的方式。 Action<int>
应该永远是 危险信号 对于任何想在功能上工作的人来说,因为默认情况下它有副作用而且不是纯粹的。所以你需要尽可能地封装副作用,让其他一切都变得纯粹。
顺便说一下,您可以将这整个事情概括为适用于任何类型。这使得它更有用:
public enum EventActionTag
{
Add,
Complete,
RequestView
}
public class EventAction<T>
{
public readonly EventActionTag Tag;
public static EventAction<T> Add(T value) => new AddAction<T>(value);
public static readonly EventAction<T> RequestView = new RequestViewAction<T>();
public static readonly EventAction<T> Complete = new CompleteAction<T>();
public EventAction(EventActionTag tag) =>
Tag = tag;
}
public class AddAction<T> : EventAction<T>
{
public readonly T Value;
public AddAction(T value) : base(EventActionTag.Add) =>
Value = value;
}
public class CompleteAction<T> : EventAction<T>
{
public CompleteAction() : base(EventActionTag.Complete)
{ }
}
public class RequestViewAction<T> : EventAction<T>
{
public RequestViewAction() : base(EventActionTag.RequestView)
{ }
}
public class AggregateView<T> : IDisposable
{
readonly Subject<EventAction<T>> aggregator = new Subject<EventAction<T>>();
readonly Subject<T> events = new Subject<T>();
readonly Subject<Lst<T>> view = new Subject<Lst<T>>();
readonly IDisposable subscription;
public AggregateView()
{
// Creates an aggregate view of the integers that responds to various control
// actions coming through.
subscription = aggregator.Aggregate(
Lst<T>.Empty,
(list, action) =>
{
switch(action.Tag)
{
// Adds an item to the aggregate list and passes it on to the
// events Subject
case EventActionTag.Add:
var add = (AddAction<T>)action;
events.OnNext(add.Value);
return list.Add(add.Value);
// Clears the list and passes a list onto the views Subject
case EventActionTag.Complete:
view.OnNext(Lst<T>.Empty);
return Lst<T>.Empty;
// Gets the current aggregate list and passes it onto the
// views Subject
case EventActionTag.RequestView:
view.OnNext(list);
return list;
default:
return list;
}
})
.Subscribe(x => { });
}
/// <summary>
/// Observable stream of integer events
/// </summary>
public IObservable<T> Events =>
events;
/// <summary>
/// Observable stream of list views
/// </summary>
public IObservable<Lst<T>> Views =>
view;
/// <summary>
/// Listener for plugging into an event
/// </summary>
public void Listener(T value) =>
aggregator.OnNext(EventAction<T>.Add(value));
/// <summary>
/// Clears the aggregate view and post it to Views
/// </summary>
public void Complete() =>
aggregator.OnNext(EventAction<T>.Complete);
/// <summary>
/// Requests a the current aggregate view to be pushed through to
/// the Views subscribers
/// </summary>
public void RequestView() =>
aggregator.OnNext(EventAction<T>.RequestView);
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
subscription?.Dispose();
view?.OnCompleted();
events?.OnCompleted();
view?.Dispose();
events?.Dispose();
}
}
我有一个活动。
该事件会不时触发,它会调用一个 Action<int>
.
现在我想 "collect" 这些整数事件传递给我并将它们保存在列表中。我还想指定完成列表和开始新列表的时刻。
我想到的天真的解决方案是 属性 List<int> ValList
。
事件处理程序在每次调用时都会添加一个值。
消费方想拿单就拿,不时说ValList = new List<int>();
为了避免线程同步问题,我也需要一个锁。
我发现这个解决方案非常丑陋,并且想知道替代方案。 随着时间的推移,我越来越成为一名函数式程序员,而且我经常使用它。 但是当涉及到这样的问题时,我仍然在考虑程序化。 我真的很想避免使用可变列表(我仍在使用 System.Collections.Immutable)。
有没有没有可变和副作用的很好的功能解决方案?
如果我完全理解您的意思,您的活动已锁定为操作,您无法控制该活动签名。您想要收集传递的每个 int 直到外部请求来检索包含任何累积整数的列表的某个点,此时列表将重置并保存有关集合的时间信息。对吗?
让我感到困惑的是,为什么您会提到使用 OOP 语言的函数式编程?您可能会争辩说 LINQ 在某种程度上是函数式的,但对于注重函数式思维的人来说肯定有更好的选择吗?因为使用累加器管理器 class.
这似乎是一个非常简单的解决方案namespace bs
{
struct CollectionEvent
{
public DateTime Retrieved { get; set; }
public String IP { get; set; }
}
static class Accumulator
{
private static List<int> Items { get; set; } = new List<int>();
private static bool Mutex { get; set; } = false;
private static List<CollectionEvent> Collections { get; set; } = new List<CollectionEvent>();
public static void Add(int i)
{
Sync(() => Items.Add(i));
}
public static List<int> Retrieve(String IP)
{
Collections.Add(new CollectionEvent
{
Retrieved = DateTime.UtcNow,
IP = IP
});
List<int> dataOut = null;
Sync(() =>
{
dataOut = new List<int>(Items);
Items = new List<int>();
});
return dataOut;
}
public static void Sync(Action fn)
{
const int Threshold = 10;
int attempts = 0;
for (; Mutex && (attempts < Threshold); attempts++)
Thread.Sleep(100 * attempts);
if (attempts == Threshold)
throw new Exception(); // or something better I'm sure
Mutex = true;
fn();
Mutex = false;
}
}
class Program
{
static void Main(string[] args)
{
var r = new Random();
var m = r.Next(5, 10);
for (int i = 0; i < m; i++)
{
var datum = r.Next(100, 10000);
Console.WriteLine($"COLLECT {datum}");
Accumulator.Add(datum);
}
Console.WriteLine("RETRIEVE");
Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));
m = r.Next(5, 10);
for (int i = 0; i < m; i++)
{
var datum = r.Next(100, 10000);
Console.WriteLine($"COLLECT {datum}");
Accumulator.Add(datum);
}
Console.WriteLine("RETRIEVE");
Accumulator.Retrieve("0.0.0.0").ForEach(i => Console.WriteLine($"\t{i}"));
Console.Read();
}
}
}
您应该考虑为此使用 Reactive Extensions。它处理值流(事件),并且可以消除对锁的需求。
首先,我将为 Add
、Complete
和 RequestView
操作定义一些操作 classes。这类似于 F# 中的可区分联合,例如:
public class EventAction
{
public static EventAction Add(int value) => new AddAction(value);
public static readonly RequestViewAction RequestView = new RequestViewAction();
public static readonly EventAction Complete = new CompleteAction();
}
public class AddAction : EventAction
{
public readonly int Value;
public AddAction(int value) => Value = value;
}
public class CompleteAction : EventAction
{
}
public class RequestViewAction : EventAction
{
}
接下来我要创建一个名为 AggregateView
的类型,它将包含三个 Rx Subject
值:
aggregator
将收集EventAction
事件并管理聚合的Lst<int>
(Lst<int>
是 the language-ext functional language extensions library 的不可变列表类型,但您可以也使用ImmutableList
)。events
这将只是一个整数事件流views
这将是Lst<int>
次观看
这是 class:
using System;
using LanguageExt;
using static LanguageExt.Prelude;
using System.Reactive.Linq;
using System.Reactive.Subjects;
public class AggregateView : IDisposable
{
readonly Subject<EventAction> aggregator = new Subject<EventAction>();
readonly Subject<int> events = new Subject<int>();
readonly Subject<Lst<int>> view = new Subject<Lst<int>>();
readonly IDisposable subscription;
public AggregateView()
{
// Creates an aggregate view of the integers that responds to various control
// actions coming through.
subscription = aggregator.Aggregate(
Lst<int>.Empty,
(list, action) =>
{
switch(action)
{
// Adds an item to the aggregate list and passes it on to the
// events Subject
case AddAction add:
events.OnNext(add.Value);
return list.Add(add.Value);
// Clears the list and passes a list onto the views Subject
case CompleteAction complete:
view.OnNext(Lst<int>.Empty);
return Lst<int>.Empty;
// Gets the current aggregate list and passes it onto the
// views Subject
case RequestViewAction req:
view.OnNext(list);
return list;
default:
return list;
}
})
.Subscribe(x => { });
}
/// <summary>
/// Observable stream of integer events
/// </summary>
public IObservable<int> Events =>
events;
/// <summary>
/// Observable stream of list views
/// </summary>
public IObservable<Lst<int>> Views =>
view;
/// <summary>
/// Listener for plugging into an event
/// </summary>
public void Listener(int value) =>
aggregator.OnNext(EventAction.Add(value));
/// <summary>
/// Clears the aggregate view and post it to Views
/// </summary>
public void Complete() =>
aggregator.OnNext(EventAction.Complete);
/// <summary>
/// Requests a the current aggregate view to be pushed through to
/// the Views subscribers
/// </summary>
public void RequestView() =>
aggregator.OnNext(EventAction.RequestView);
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
subscription?.Dispose();
view?.OnCompleted();
events?.OnCompleted();
view?.Dispose();
events?.Dispose();
}
}
它有两个 IObservable
属性:
Views
- 允许您订阅聚合列表Events
- 允许您订阅整数事件
还有一些有用的方法:
Listener
- 这就是您要插入event
的内容
Complete
- 这将清空聚合列表并将空列表发送到View
observableRequestView
- 这会将当前聚合列表发送给Views
可观察对象的所有订阅者。
终于要测试了:
class Program
{
static event Action<int> eventTest;
static void Main(string[] args)
{
var aggregate = new AggregateView();
eventTest += aggregate.Listener;
aggregate.Views.Subscribe(ReceiveList);
aggregate.Events.Subscribe(ReceiveValue);
eventTest(1);
eventTest(2);
eventTest(3);
eventTest(4);
eventTest(5);
aggregate.RequestView();
aggregate.Complete();
eventTest(6);
eventTest(7);
eventTest(8);
eventTest(9);
eventTest(10);
aggregate.RequestView();
}
static void ReceiveList(Lst<int> list) =>
Console.WriteLine($"Got list of {list.Count} items: {ListShow(list)}");
static void ReceiveValue(int x) =>
Console.WriteLine(x);
static string ListShow(Lst<int> list) =>
String.Join(", ", list);
}
这是我在处理事件时能想到的最实用的方式。 Action<int>
应该永远是 危险信号 对于任何想在功能上工作的人来说,因为默认情况下它有副作用而且不是纯粹的。所以你需要尽可能地封装副作用,让其他一切都变得纯粹。
顺便说一下,您可以将这整个事情概括为适用于任何类型。这使得它更有用:
public enum EventActionTag
{
Add,
Complete,
RequestView
}
public class EventAction<T>
{
public readonly EventActionTag Tag;
public static EventAction<T> Add(T value) => new AddAction<T>(value);
public static readonly EventAction<T> RequestView = new RequestViewAction<T>();
public static readonly EventAction<T> Complete = new CompleteAction<T>();
public EventAction(EventActionTag tag) =>
Tag = tag;
}
public class AddAction<T> : EventAction<T>
{
public readonly T Value;
public AddAction(T value) : base(EventActionTag.Add) =>
Value = value;
}
public class CompleteAction<T> : EventAction<T>
{
public CompleteAction() : base(EventActionTag.Complete)
{ }
}
public class RequestViewAction<T> : EventAction<T>
{
public RequestViewAction() : base(EventActionTag.RequestView)
{ }
}
public class AggregateView<T> : IDisposable
{
readonly Subject<EventAction<T>> aggregator = new Subject<EventAction<T>>();
readonly Subject<T> events = new Subject<T>();
readonly Subject<Lst<T>> view = new Subject<Lst<T>>();
readonly IDisposable subscription;
public AggregateView()
{
// Creates an aggregate view of the integers that responds to various control
// actions coming through.
subscription = aggregator.Aggregate(
Lst<T>.Empty,
(list, action) =>
{
switch(action.Tag)
{
// Adds an item to the aggregate list and passes it on to the
// events Subject
case EventActionTag.Add:
var add = (AddAction<T>)action;
events.OnNext(add.Value);
return list.Add(add.Value);
// Clears the list and passes a list onto the views Subject
case EventActionTag.Complete:
view.OnNext(Lst<T>.Empty);
return Lst<T>.Empty;
// Gets the current aggregate list and passes it onto the
// views Subject
case EventActionTag.RequestView:
view.OnNext(list);
return list;
default:
return list;
}
})
.Subscribe(x => { });
}
/// <summary>
/// Observable stream of integer events
/// </summary>
public IObservable<T> Events =>
events;
/// <summary>
/// Observable stream of list views
/// </summary>
public IObservable<Lst<T>> Views =>
view;
/// <summary>
/// Listener for plugging into an event
/// </summary>
public void Listener(T value) =>
aggregator.OnNext(EventAction<T>.Add(value));
/// <summary>
/// Clears the aggregate view and post it to Views
/// </summary>
public void Complete() =>
aggregator.OnNext(EventAction<T>.Complete);
/// <summary>
/// Requests a the current aggregate view to be pushed through to
/// the Views subscribers
/// </summary>
public void RequestView() =>
aggregator.OnNext(EventAction<T>.RequestView);
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
subscription?.Dispose();
view?.OnCompleted();
events?.OnCompleted();
view?.Dispose();
events?.Dispose();
}
}