构建基于功能的插件架构的模式

Pattern for building a functional based plugin architecture

我正在研究如何为项目开发插件框架,而 Rx 似乎很适合我想要实现的目标。最终,该项目将是一组插件(模块化功能),可以通过 xml 配置它们来做不同的事情。要求如下

  1. 即使在插件中也强制执行模块化架构。这鼓励松散耦合并可能最大限度地降低复杂性。这有望使单个插件功能更易于建模和测试
  2. 强制执行数据的不变性以降低复杂性并确保模块内的状态管理保持在最低限度
  3. 通过尽可能在模块内提供线程池线程来阻止手动创建线程

在我看来,插件本质上是一个数据转换实体(我在这里尝试考虑功能)。这意味着插件要么

如果你进一步理解这个概念,一个插件可以由所有三种类型组成 above.For 例如,在一个插件中,你可以有一个 IntGenerator 模块,它为 ConsoleWorkUnit 模块等生成一些数据。那又怎样我试图在主要功能中建模是插件必须完成其工作的布线。

为此,我有以下基础 类 使用 Microsoft 的 Immutable nuget。我想要实现的是抽象出 Rx 调用,以便它们可以在模块中使用,因此最终目标是将对缓冲区等的调用包装在抽象 类 中,可用于编写复杂的查询和模块.通过这种方式,代码比必须实际读取模块中的所有代码才能发现它订阅了缓冲区或 window 类型 x 等

的自我记录要多一些。
public abstract class OutputBase<TOutput> : SendOutputBase<TOutput>
{
    public abstract void Work();
}

public interface IBufferedBase<TOutput>
{
    void Work(IList<ImmutableList<Data<TOutput>>> list);
}

public abstract class BufferedWorkBase<TInput> : IBufferedBase<TInput>
{
    public abstract void Work(IList<ImmutableList<Data<TInput>>> input);
}
public abstract class SendOutputBase<TOutput>
{
    private readonly ReplaySubject<ImmutableList<Data<TOutput>>> _outputNotifier;
    private readonly IObservable<ImmutableList<Data<TOutput>>> _observable;

    protected SendOutputBase()
    {
        _outputNotifier = new ReplaySubject<ImmutableList<Data<TOutput>>>(10);
        _observable  =  _outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance);
        _observable = _outputNotifier.ObserveOn(ThreadPoolScheduler.Instance);
    }

    protected void SetOutputTo(ImmutableList<Data<TOutput>> output)
    {
        _outputNotifier.OnNext(output);
    }

    public void ConnectOutputTo(IWorkBase<TOutput> unit)
    {
        _observable.Subscribe(unit.Work);
    }

    public void BufferOutputTo(int count, IBufferedBase<TOutput> unit)
    {
        _observable.Buffer(count).Subscribe(unit.Work);
    }
}

public abstract class WorkBase<TInput> : IWorkBase<TInput>
{
    public abstract void Work(ImmutableList<Data<TInput>> input);
}

public interface IWorkBase<TInput>
{
    void Work(ImmutableList<Data<TInput>> input);
}

public class Data<T>
{
    private readonly T _value;

    private Data(T value)
    {
        _value = value;
    }

    public static Data<TData> Create<TData>(TData value)
    {
        return new Data<TData>(value);
    }

    public T Value { get { return _value; } }

}

这些基数类用于创建三个类;一种用于生成一些 int 数据,一种用于在数据出现时打印出数据,最后一种用于在数据传入时缓冲数据并将值分成三部分求和。

public class IntGenerator : OutputBase<int>
{
    public override void Work()
    {
        var list = ImmutableList<Data<int>>.Empty;
        var builder = list.ToBuilder();
        for (var i = 0; i < 1000; i++)
        {
            builder.Add(Data<int>.Create(i));
        }

        SetOutputTo(builder.ToImmutable());
    }
}

public class ConsoleWorkUnit : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        foreach (var data in input)
        {
            Console.WriteLine("ConsoleWorkUnit printing {0}", data.Value);
        }
    }
}

public class SumPrinter : WorkBase<int>
{

    public override void Work(ImmutableList<Data<int>> input)
    {
        input.ToObservable().Buffer(2).Subscribe(PrintSum);
    }

    private void PrintSum(IList<Data<int>> obj)
    {
      Console.WriteLine("Sum of {0}, {1} is {2} ", obj.First().Value,obj.Last().Value ,obj.Sum(x=>x.Value) );
    }
}

这些 运行 像这样

        var intgen = new IntGenerator();
        var cons = new ConsoleWorkUnit();
        var sumPrinter = new SumPrinter();

        intgen.ConnectOutputTo(cons);
        intgen.BufferOutputTo(3,sumPrinter);

        Task.Factory.StartNew(intgen.Work);

        Console.ReadLine();

这个架构合理吗?

我认为你最好在你的模块中包含 Rx 而不是将它隐藏在你的基础 类 中。该路线导致您自己重新实现 Rx API 的一个子集。然后,如果一个插件在内部使用 Rx,你最终会白费力气,因为该插件实际上将你的非 Rx API 转换为 Rx API,而你本可以直接暴露API直接。

这样做的"Rx way":

发电机(案例 1)应该暴露(很可能 IObservable<T>:

interface IGenerator<T>
{
    // each subscription to this will create a new stream of data
    IObservable<T> Data { get; }
}

public class LongGenerator : IGenerator<long>
{
    IntGenerator()
    {
        // example that produces a new integer every second
        Data = Observable.Interval(TimeSpan.FromSeconds(1));

        // something more complex...
        Data = Observable.Create(async (observer, token) =>
        {
            // infinitely poll some web service for the number
            while (!token.CancellationRequested())
            {
                var result = await _client.WebServiceCall(...);
                observer.OnNext(result.Value);
            }
        });
    }
    public IObservable<T> Data { get; private set; }
}

消费者(案例 3)应该公开 IObserver<T>:

interface IConsumer<T>
{
    // Use IObserver<ImmutableList<T>> if you want to *force* them to receive buffered input
    // but why not let them do their own buffering if they need it?
    IObserver<T> Observer { get; }
}

public class ConsoleWork : IConsumer<int>, IObserver<int>
{
    public IObserver<T> Observer { get { return this; } }


    public void OnNext(int value) ...
    public void OnError(Exception e) ...
    public void OnComplete() ...
}

变形金刚(案例 2)应该有一个接受 IObservable<T> 和 returns 和 IObservable<U> 的方法:

interface ITransform<T, U>
{
    IObservable<U> Transform(IObservable<T>);
}

public class StringTransform : ITransform<int, string>
{
    IObservable<string> Transform(IObservable<int> source)
    {
        return source.Select(i => "Hello " + i.ToString());
    }
}