反应式编程中流之间的循环依赖

Cycling dependencies between streams in reactive programming

涉猎响应式编程,经常遇到两个流相互依赖的情况。解决这些情况的惯用方法是什么?

一个最小的例子:有按钮A和B,都显示一个值。单击 A 必须将 A 的值增加 B。单击 B 必须将 B 的值设置为 A。

我想出的第一个解决方案(F# 中的示例,但欢迎使用任何语言回答):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1

此解决方案使用可变状态和主题,可读性不强且看起来不符合习惯。

我尝试的第二个解决方案涉及创建一个将两个相关流链接在一起的方法:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)

这似乎好一点,但由于在反应库中不存在像 dependency 这样的方法,我相信存在更惯用的解决方案。使用第二种方法也很容易引入无限递归。

解决涉及流之间循环依赖的问题的推荐方法是什么,例如在上面的例子中,在反应式编程中?

编辑:

这是一个 F# 解决方案:

type DU = 
    | A 
    | B 

type State = { AValue : int; BValue : int }

let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 

    let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))

    let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
        | A -> { state with AValue = state.AValue + state.BValue }
        | B -> { state with BValue = state.AValue }
    )

    result

F# 实际上是一种很好的语言,这要归功于 built-in 可区分的联合和记录。这是一个用 C# 编写的答案,带有自定义的 Discriminated Union;我的 F# 有点生疏了。

诀窍是使用可区分联合将您的两个可观察值变成一个可观察值。所以基本上将 a 和 b 合并为一个可观察的可区分联合:

a : *---*---*---**
b : -*-*--*---*---
du: ab-ba-b-a-b-aa

一旦完成,您就可以对项目是 'A' 推送还是 'B' 推送做出反应。

只是为了确认,我假设没有办法显式设置 ButtonA/ButtonB 中嵌入的值。如果有,这些变化应该被建模为可观察的,并且也被用于被歧视的联合。

var a = new Subject<Unit>();
var b = new Subject<Unit>();
var observable = a.DiscriminatedUnion(b)
    .Scan(new State(0, 1), (state, du) => du.Unify(
        /* A clicked case */_ => new State(state.A + state.B, state.B), 
        /* B clicked case */_ => new State(state.A, state.A)
    )
);

observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);

这是 C# 中依赖的 类。其中大部分很容易转换为 built-in F# 类型。

public class State /*easily replaced with an F# record */
{
    public State(int a, int b)
    {
        A = a;
        B = b;
    }

    public int A { get; }
    public int B { get; }
}

/* easily replaced with built-in discriminated unions and pattern matching */
public static class DiscriminatedUnionExtensions
{
    public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return Observable.Merge(
            a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
            b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
        );
    }

    public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
        Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return source.Select(union => Unify(union, f1, f2));
    }

    public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return union.Item == 1
            ? f1(union.Item1)
            : f2(union.Item2)
        ;
    }
}

public class DiscriminatedUnionClass<T1, T2>
{
    private readonly T1 _t1;
    private readonly T2 _t2;
    private readonly int _item;
    private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
    {
        _t1 = t1;
        _t2 = t2;
        _item = item;
    }

    public int Item
    {
        get { return _item; }
    }

    public T1 Item1
    {
        get { return _t1; }
    }

    public T2 Item2
    {
        get { return _t2; }
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
    {
        return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
    {
        return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
    }
}

这是一个使用 Gjallarhorn 的非常简单的解决方案:

#r @"..\packages\Gjallarhorn\lib\portable-net45+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\Gjallarhorn.dll"

open Gjallarhorn

(*
    Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.
*)
let  a = Mutable.create 3
let b = Mutable.create 4

let clickA() = a.Value <- a.Value + b.Value
let clickB() = b.Value <- a.Value

let d1 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked A: " + x.ToString()) a
let d2 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked B: " + x.ToString()) b

clickA()
clickB()  

它实际上与您的初始版本非常相似,因此确实使用了可变状态,但使绑定到 UI 变得非常容易,有关更多惯用用法,请参见此 blog post

假设输出最终被发送回源,您可以使用基本运算符执行此操作。您所要做的就是为每个 button/signal 可观察对象调用 withLatestFrom 两次。我的解决方案在 java 中,但应该很容易理解!

private static Pair<Observable<Integer>, Observable<Integer>> test(
    final Observable<Integer> aValues,
    final Observable<Integer> bValues,
    final Observable<Void> aButton,
    final Observable<Void> bButton,
    final Func2<Integer, Integer, Integer> aFunction,
    final Func2<Integer, Integer, Integer> bFunction
) {
    return new Pair<>(
        aButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, aFunction),
        bButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, bFunction)
    );
}

这是我使用的测试代码:

final TestScheduler scheduler = new TestScheduler();

final TestSubject<Integer> aSubject = TestSubject.create(scheduler);
final TestSubject<Integer> bSubject = TestSubject.create(scheduler);
aSubject.onNext(1);
bSubject.onNext(1);

final TestSubject<Void> aButton = TestSubject.create(scheduler);
final TestSubject<Void> bButton = TestSubject.create(scheduler);

final Pair<Observable<Integer>, Observable<Integer>> pair = test(
    aSubject, bSubject, aButton, bButton, (a, b) -> a + b, (a, b) -> a
);

pair.component1().subscribe(aSubject::onNext);
pair.component2().subscribe(bSubject::onNext);
pair.component1().map(a -> "A: " + a).subscribe(System.out::println);
pair.component2().map(b -> "B: " + b).subscribe(System.out::println);

aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();

这会打印:

A: 2
B: 2
A: 4
A: 6
B: 6