当最终两个或更多给定事件发生在一个 Observable 上时,do x

When eventually two or more given events occur on the one Observable do x

我有一个如下所示的对象:

public class Message {
   public string Topic { get; set; }
   public string ContextId { get; set; }
   ...
}

有问题的 observable 被用作消息总线,我想允许 "observers" 说何时出现主题为 a 的消息,然后主题为 b同样 ContextIdx.

你有什么想法吗?此外,如果这样做有任何负面影响(即内存是否会被阻止是给定主题的消息对之一不会发生)并且是否可以防御这种情况?

根据您的实际需要,有多种方法可以做到这一点。从你的问题中不清楚具体情况是什么。

我认为您想同时跟踪多个 ContextId,以应对出现 .Topic == "a" 后跟下一个值 ContextId 的情况] 通过 .Topic == "b"。这意味着两条消息之间可以出现任意数量的其他 ContextId 值的消息。如果不是这种情况,请告诉我。

var query =
    messages
        .GroupBy(x => x.ContextId)
        .Select(xs =>
            xs
                .Publish(ys =>
                    ys
                        .Where(y => y.Topic == "A")
                        .Select(y =>
                            ys
                                .Where(w => w.Topic == "B")
                                .TakeUntil(ys.Where(w => w.Topic != "B")))
                        .Switch()))
        .Merge();

我的测试数据是:

messages.OnNext(new Message() { ContextId = "1", Topic = "A" });
messages.OnNext(new Message() { ContextId = "2", Topic = "B" });
messages.OnNext(new Message() { ContextId = "3", Topic = "C" });
messages.OnNext(new Message() { ContextId = "2", Topic = "A" });
messages.OnNext(new Message() { ContextId = "1", Topic = "B" });
messages.OnNext(new Message() { ContextId = "2", Topic = "C" });
messages.OnNext(new Message() { ContextId = "2", Topic = "B" });

从中我只得到一个值Message() { ContextId = "1", Topic = "B" }

就内存而言,如果出现足够多的 ContextId 值,这最终将耗尽计算机上的所有内存。当然必须。但了解多少的唯一方法是在您的代码中对其进行测量。

请记住,一旦您在订阅上调用 .Dispose(),您的记忆就会恢复。