rx.net 合并重置流
rx.net merge reset stream
我们有一个 运行 服务,它处理从系统 x 到系统 y 的消息。
它基本上如下所示:
aSystem.Messages.Subscribe(message =>
{
try
{
ProcessMessage(message);
}
catch(Exception ex)
{
_logger.LogFatal(ex.Message);
}
})
问题是我们每秒至少收到一条消息,并且我们的 LogFatal
消息配置为发送电子邮件。结果邮箱一瞬间炸了
代码是 "improved" 通过添加自定义日志记录 class 来保存最后一个时间戳。根据该时间戳,消息将被记录或不被记录。
这看起来很麻烦,我认为这是 Rx.NET
的完美场景。我们需要的是:
- 1) 记录字符串是否改变
- 2) 如果经过一定时间则记录
我试过的是:
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(10)); // log at least every 10 seconds
var subscription = logMessagesChangedStream.Merge(logMessagesSampleStream).Subscribe(result =>
{
_logger.LogFatal(result);
});
aSystem.Messages.Subscribe(message =>
{
try
{
ProcessMessage(message);
}
catch(Exception ex)
{
logSubject.OnNext(ex.Message);
}
})
看起来它在工作,但这将记录消息两次,一次针对 DistinctUntilChanged
,一次针对 Sample
。因此,如果其中一个发出值,我应该以某种方式重置流。他们独立工作完美,一旦合并他们应该互相倾听 ;-)
有一个模棱两可的运算符 Amb
,它会与两个序列竞争,看哪个先赢。
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
获胜的流继续传播到最后 - 我们不希望这样。我们有兴趣为下一个值再次开始比赛。让我们这样做:
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat()
现在最后一个问题是,每次我们重新开始比赛时,DistinctUntilChanged
都会丢失它的状态,它的行为是立即推送它获得的第一个值。让我们通过将其变成热可观察对象来解决这个问题。
logSubject.DistinctUntilChanged().Publish();
综合起来:
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds
var oneof =
Observable
.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat();
logMessagesChangedStream.Connect();
oneof.Timestamp().Subscribe(c => Console.WriteLine(c));
将 10 秒更改为 5,because。
测试
Action<string> onNext = logSubject.OnNext;
onNext("a");
onNext("b");
Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
Delay(3000, () => { onNext("d"); onNext("e"); });
Delay(6000, () => { onNext("e"); });
Delay(10000, () => { onNext("e"); });
输出
a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00
我们有一个 运行 服务,它处理从系统 x 到系统 y 的消息。 它基本上如下所示:
aSystem.Messages.Subscribe(message =>
{
try
{
ProcessMessage(message);
}
catch(Exception ex)
{
_logger.LogFatal(ex.Message);
}
})
问题是我们每秒至少收到一条消息,并且我们的 LogFatal
消息配置为发送电子邮件。结果邮箱一瞬间炸了
代码是 "improved" 通过添加自定义日志记录 class 来保存最后一个时间戳。根据该时间戳,消息将被记录或不被记录。
这看起来很麻烦,我认为这是 Rx.NET
的完美场景。我们需要的是:
- 1) 记录字符串是否改变
- 2) 如果经过一定时间则记录
我试过的是:
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(10)); // log at least every 10 seconds
var subscription = logMessagesChangedStream.Merge(logMessagesSampleStream).Subscribe(result =>
{
_logger.LogFatal(result);
});
aSystem.Messages.Subscribe(message =>
{
try
{
ProcessMessage(message);
}
catch(Exception ex)
{
logSubject.OnNext(ex.Message);
}
})
看起来它在工作,但这将记录消息两次,一次针对 DistinctUntilChanged
,一次针对 Sample
。因此,如果其中一个发出值,我应该以某种方式重置流。他们独立工作完美,一旦合并他们应该互相倾听 ;-)
有一个模棱两可的运算符 Amb
,它会与两个序列竞争,看哪个先赢。
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
获胜的流继续传播到最后 - 我们不希望这样。我们有兴趣为下一个值再次开始比赛。让我们这样做:
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat()
现在最后一个问题是,每次我们重新开始比赛时,DistinctUntilChanged
都会丢失它的状态,它的行为是立即推送它获得的第一个值。让我们通过将其变成热可观察对象来解决这个问题。
logSubject.DistinctUntilChanged().Publish();
综合起来:
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds
var oneof =
Observable
.Amb(logMessagesChangedStream, logMessagesSampleStream)
.Take(1)
.Repeat();
logMessagesChangedStream.Connect();
oneof.Timestamp().Subscribe(c => Console.WriteLine(c));
将 10 秒更改为 5,because。
测试
Action<string> onNext = logSubject.OnNext;
onNext("a");
onNext("b");
Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
Delay(3000, () => { onNext("d"); onNext("e"); });
Delay(6000, () => { onNext("e"); });
Delay(10000, () => { onNext("e"); });
输出
a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00