Rx:通过匹配 ID 加入

Rx: Join by matching IDs

让我们假设有两个 observables o1, o2。第一个从内部进程接收事件(在很长的计算完成后),第二个通过 REST 端点接收外部事件(表示另一个外部组件也已完成)。事件数据只是一个 ID。

现在我想设计一个工作流程,以便只有当 ID 存在于两个可观察对象中时,才会发出新事件(即当内部和外部计算完成时)。

让在某个时间点o1包含ID{1,2,3},那么我想区分这些情况:

  1. 正常情况: ID 2 到达 o2。两个 ID 现在都存在于两个可观察对象中,输出 "Success: 2"
  2. Expire case: 内部计算完成一段时间后,外部事件还没有到达。例如。 ID 2 出现在 o1 但不出现在 o2 甚至一小时后,输出: "Expired: 2"
  3. 未知案例:ID,例如4,通过 o1 中不存在的 REST 端点到达 o2,可能是因为 ID 已经过期或者仅仅因为外部组件故障,输出:"Unknown: 3"

我找到了 groupJoin 运算符,它可能会做我想做的事情,这里甚至还有一个属性匹配的例子:GroupJoin - Joins two streams matching by one of their attributes

但是,这个示例似乎在每次新事件到达时都对所有元素执行耗尽(线性时间)扫描。我认为有可能推出我自己的版本来代替在恒定时间内检查地图但是:我想知道是否有规范的方式或者甚至是开箱即用的功能(因为我想这是一个很常见的用例)。

(由于我是 Rx 的新手,实现这种连接操作的过期情况的最佳方法是什么)

我会通过在外部对象中设置中间状态来做到这一点:

public class ItemJoinCache<T> {
   private Map<Integer, T> items;
   public Observable<T> ingestInternal(T item) {
      // an internal item arrived, do the necessary work
   }
   public Observable<T> ingestExternal(T item) {
      // an external item arrived, do the necessary work
   }
}

externalRestCallThatReturnsObservable()
.flatMap(myItemJoinCache::ingestExternal)
...

internalProcessThatTakesALongTime()
.flatMap(myItemJoinCache::ingestInternal)
...

这样您就可以进行任何您可能需要的处理。

您也标记了问题 rx.net,所以我假设可以用 C# 给出答案。我不确定这对 Java 的翻译效果如何,如果这就是您要查找的内容。

Rx 的 JoinGroupJoin 并非真正用于此目的:它们旨在根据时间加入 -windows。您希望通过 ID 加入。

一个 Rx 友好的解决方案将会起作用。并且由于您需要一些状态,所以我们可以使用烘焙到 Scan 函数中的不可变状态。在 C# 中,Nuget 包 System.Collections.Immutable 中有 ImmutableDictionary<TKey, TItem>。我不确定 Java.

中是否存在等效项

鉴于这些 类:

public class CustomEvent
{
    public int Id { get; set; }
}

public class Result
{
    public ResultType Type { get; set; }
    public int Id { get; set; }
}

public enum ResultType
{
    Success,
    Unknown,
    Expired
}

您可以获得这样的解决方案:

IObservable<CustomEvent> o1;
IObservable<int> o2;
TimeSpan expirationTimeDelay = TimeSpan.FromHours(1);

IObservable<Result> results = Observable.Merge(
    o1.SelectMany(ce => Observable.Merge(
        Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h => 
            Tuple.Create(h.Add(ce.Id, ce), default(Result), false)
        )),
        Observable.Return(new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
            h.ContainsKey(ce.Id)
                ? Tuple.Create(h.Remove(ce.Id), new Result { Type = ResultType.Expired, Id = ce.Id}, true)
                : Tuple.Create(h, default(Result), false)
        ))
            .Delay(expirationTimeDelay)
    )),
    o2.Select(id => new Func<ImmutableDictionary<int, CustomEvent>, Tuple<ImmutableDictionary<int, CustomEvent>, Result, bool>>(h =>
        h.ContainsKey(id)
            ? Tuple.Create(h.Remove(id), new Result { Type = ResultType.Success, Id = id }, true)
            : Tuple.Create(h, new Result { Type = ResultType.Unknown, Id = id }, true)
    ))
)
.Scan(Tuple.Create(ImmutableDictionary<int, CustomEvent>.Empty, default(Result), false), (t, f) => f(t.Item1))
.Where(t => t.Item3)
.Select(t => t.Item2);

不可变字典是我们的核心状态,并保存来自 o1 的 'live' 个事件。累加器函数 returns 一个具有三个属性的元组:代表我们核心状态的不可变字典、结果对象和布尔值。布尔对象是一个过滤器,显示结果对象是否应该传播。

Scan 的一个有趣技巧是反转正常用法:将项目流转换为脱离状态的函数。在我们的例子中,函数的类型是 Func, Tuple, Results, Boolean>>(一个接受字典的函数,returns 一个包含三个值的元组)。

这就是我们在这里所做的:每个 o1 项目弹出两个函数:一个将项目添加到不可变字典(并且不推送结果)。另一个功能在一个小时后出现,以查看该事件是否尚未加入。如果加入,则什么也不会发生。如果未加入,则弹出 Expired 结果。每个 o2 项目弹出一个功能:检查项目是否在地图中。如果存在,则会弹出正常结果。如果不存在,则未知。

如果您在 Java,并且没有容易获得的 ImmutableDictionary 等价物,那么您可能可以用常规的 HashMap 代替,但您必须提防它来自多个订阅者的令人讨厌的状态问题 Publish 调用。

您始终可以使用 scan 将 o1 缩减为一个集合。当 o2 发出一个值时,您使用 withLatestFrom 从 o1 获取最新的集合并检查包含。一个timeout可以解决过期部分。 RxJs 5 中的示例:

o2
.withLatestFrom(
  o1.scan((set, val) => set.add(val), new Set),
  (o2Val, o1Set) => o1Set.has(o2Val) ? "Success" : "Unknown"
)
.timeoutWith(3600000, Observable.of("Expire"))
.subscribe(console.log)