简单命令式任务的反应式方法
Reactive approach to simple imperative task
申请要求:
- 订阅两个事件流 A 和 B
- 对于每个 A 事件,一段时间后应该有相应的 B 事件
- 应用程序监视 A 事件并在没有对应的 B 到达(及时)时发出警报
- B 事件可以以不同于 A 事件的顺序到达(但仅在之后)、延迟或根本不到达
这在传统方法中很简单:
- 记录集合中的每个 A 事件
- 当相应的 B 事件到达时删除 A 事件
- 监控在超时内未收到 B 的 A 事件的集合,并生成警报
我想尝试一个反应式解决方案,但不知道哪个运营商最能表达这一点。我可视化事件流(Observables):
- A 的流
- B 流
- 也许还有一个计时器滴答流(除非计时运算符满足这个要求)
最终输出是一组警报:
someObservable.
...incantations, other observables...
.subscribe ((EventA a) -> raiseAlertForMissingB (a));
是否有优雅的响应式方法,或者这只是 不适合 rx?
(对原始问题的后续补充)
ascii 艺术弹珠图可能如下所示
A stream A1----A2----------A3------------------A4--------------A5---------
B stream ------------B1------------B3---------------B4------------B5-----
(A2 TIMEOUT)
merged ------------A1B1----------A3B3------A2??---A4B4----------A5B5---
订户从合并流中接收元组。如果元组是匹配的 A 和 B 事件,它会被记录下来,但如果元组是一个没有匹配 B 事件的 A 事件(在弹珠图中显示为 A2??),订阅者会发出警报。中提琴!
但是如何为每个A事件触发定时等待匹配的B事件?
(另加)
为了说明第4个要求点点部分"B events can arrive in a different order from the A events (but only after)..."
A stream A6----A7----------A8----------------------
B stream ------------B7------------B6-------B8-----
merged ------------A7B7----------A6B6-----A8B8---
2016 年 2 月 23 日更新
我正在测试移植到 Java 的建议解决方案。
解:谜
public class Test06Enigmativity {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Enigmativity app = new Test06Enigmativity ();
app.runEnigmativity ();
}
private void runEnigmativity () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorEnigmativity (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorEnigmativity (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber ->
bStream.publish (pb ->
Observable.merge (
aStream.map (ax ->
pb
.filter (pbx -> pbx.equals (ax))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS, Observable.defer (
() -> {
output (" timeout on B%s", ax);
return Observable.just (-1L);
}
)).map (pbx -> String.format ("%s,%s", ax, pbx))
)
)
).subscribe (subscriber::onNext)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
这适用于:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 12 511ms A0
tid: 15 626ms B0
tid: 15 626ms 0,0
tid: 12 907ms A1
tid: 15 1126ms B1
tid: 15 1126ms 1,1
tid: 12 1307ms A2
tid: 15 1626ms B2
tid: 15 1626ms 2,2
tid: 12 1707ms A3
tid: 12 2107ms A4
tid: 15 2126ms B4
tid: 15 2126ms 4,4
tid: 12 2507ms A5
tid: 15 2625ms B3
tid: 15 2625ms 3,3
tid: 12 2907ms A6
tid: 15 3126ms B6
tid: 15 3126ms 6,6
tid: 12 3307ms A7
tid: 15 3626ms B5
tid: 15 3626ms 5,5
tid: 12 3707ms A8
tid: 12 4107ms A9
tid: 15 4126ms B7
tid: 15 4126ms 7,7
tid: 12 4507ms A10
tid: 15 4626ms B8
tid: 15 4626ms 8,8
tid: 12 4908ms A11
tid: 15 5127ms B10
tid: 15 5128ms 10,10
tid: 12 5307ms A12
tid: 15 5626ms B11
tid: 15 5626ms 11,11
tid: 12 5707ms A13
tid: 12 6107ms A14
tid: 15 6126ms B12
tid: 15 6126ms 12,12
tid: 12 6507ms A15
tid: 15 6626ms B13
tid: 15 6626ms 13,13
tid: 13 7109ms timeout on B9
tid: 13 7114ms 9,-1
tid: 15 7126ms B14
tid: 15 7126ms 14,14
tid: 15 7625ms B9
tid: 15 8126ms B15
tid: 15 8126ms 15,15
但是随着 B 到达 A 的附近,事件可能会被错过:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 12 509ms A0
tid: 15 538ms B0
tid: 12 905ms A1
tid: 15 948ms B1
tid: 15 948ms 1,1
tid: 12 1305ms A2
tid: 15 1358ms B2
tid: 15 1358ms 2,2
tid: 12 1706ms A3
tid: 15 1768ms B4
tid: 12 2105ms A4
tid: 15 2178ms B3
tid: 15 2178ms 3,3
tid: 12 2505ms A5
tid: 16 2538ms timeout on B0
tid: 16 2544ms 0,-1
tid: 15 2588ms B6
tid: 12 2905ms A6
tid: 15 2998ms B5
tid: 15 2998ms 5,5
tid: 12 3305ms A7
tid: 15 3408ms B7
tid: 15 3408ms 7,7
tid: 12 3705ms A8
tid: 15 3817ms B8
tid: 15 3817ms 8,8
tid: 12 4105ms A9
tid: 14 4106ms timeout on B4
tid: 14 4106ms 4,-1
tid: 15 4228ms B10
tid: 12 4505ms A10
tid: 15 4638ms B11
tid: 12 4905ms A11
tid: 16 4906ms timeout on B6
tid: 16 4906ms 6,-1
tid: 15 5048ms B12
tid: 12 5305ms A12
tid: 15 5457ms B13
tid: 12 5705ms A13
tid: 15 5868ms B14
tid: 12 6106ms A14
tid: 13 6107ms timeout on B9
tid: 13 6107ms 9,-1
tid: 15 6279ms B9
tid: 14 6510ms timeout on B10
tid: 12 6510ms A15
tid: 14 6510ms 10,-1
tid: 15 6688ms B15
tid: 15 6688ms 15,15
B0 到达但产生超时而不是匹配。这是因为 B0 事件在 A 流的观察者更改为订阅 B 流之前到达。我认为对于热门的 B-stream,嵌套订阅的流行方法是有缺陷的。需要的是某种有限的最近重播流 - 一个主题的应用程序?
解法:Supertopi
package test;
import rx.Observable;
import java.util.concurrent.TimeUnit;
public class Test06Supertopi {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 3_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Supertopi app = new Test06Supertopi ();
app.runSupertopi ();
}
private void runSupertopi () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorSupertopi (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorSupertopi (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber -> {
Observable<Long> a = aStream.publish ().refCount ();
Observable<Long> b = bStream.publish ().refCount ();
a.subscribe ((Long aId) -> {
Observable.merge (
Observable.timer (thresholdMsec, TimeUnit.MILLISECONDS)
.doOnNext (x -> {
output (" timeout on B%s", aId);
})
.map (x -> String.format ("%s,%s", aId, -1L)),
b.filter ((Long j) -> j.equals (aId))
.map ((Long pbx) -> String.format ("%s,%s", aId, pbx))
).take (1)
.subscribe (subscriber::onNext);
});
});
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
工作正常:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 14 522ms A0
tid: 14 922ms A1
tid: 16 1054ms B0
tid: 16 1055ms 0,0
tid: 14 1322ms A2
tid: 16 1555ms B1
tid: 16 1555ms 1,1
tid: 14 1721ms A3
tid: 16 2055ms B2
tid: 16 2055ms 2,2
tid: 14 2122ms A4
tid: 14 2522ms A5
tid: 16 2555ms B4
tid: 16 2555ms 4,4
tid: 14 2922ms A6
tid: 16 3055ms B3
tid: 16 3055ms 3,3
tid: 14 3322ms A7
tid: 16 3555ms B6
tid: 16 3555ms 6,6
tid: 14 3722ms A8
tid: 16 4055ms B5
tid: 16 4055ms 5,5
tid: 14 4122ms A9
tid: 14 4522ms A10
tid: 16 4555ms B7
tid: 16 4555ms 7,7
tid: 14 4922ms A11
tid: 16 5055ms B8
tid: 16 5055ms 8,8
tid: 14 5322ms A12
tid: 16 5555ms B10
tid: 16 5556ms 10,10
tid: 14 5723ms A13
tid: 16 6056ms B11
tid: 16 6057ms 11,11
tid: 14 6122ms A14
tid: 14 6522ms A15
tid: 16 6555ms B12
tid: 16 6555ms 12,12
tid: 16 7055ms B13
tid: 16 7055ms 13,13
tid: 13 7125ms timeout on B9
tid: 13 7125ms 9,-1
tid: 16 7555ms B14
tid: 16 7555ms 14,14
tid: 16 8055ms B9
tid: 16 8555ms B15
tid: 16 8555ms 15,15
但是随着时间的推移,初始的 B 事件被接收了两次。我还在调查这个。
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 14 539ms A0
tid: 14 939ms A1
tid: 16 983ms B0
tid: 16 983ms 0,0
tid: 14 1339ms A2
tid: 16 1393ms B1
tid: 16 1393ms 1,1
tid: 14 1739ms A3
tid: 16 1803ms B2
tid: 16 1803ms 2,2
tid: 14 2139ms A4
tid: 16 2213ms B4
tid: 16 2213ms 4,4
tid: 14 2539ms A5
tid: 16 2623ms B3
tid: 16 2623ms 3,3
tid: 14 2939ms A6
tid: 16 3032ms B6
tid: 16 3032ms 6,6
tid: 14 3339ms A7
tid: 16 3443ms B5
tid: 16 3443ms 5,5
tid: 14 3739ms A8
tid: 16 3852ms B7
tid: 16 3852ms 7,7
tid: 14 4139ms A9
tid: 16 4263ms B8
tid: 16 4263ms 8,8
tid: 14 4539ms A10
tid: 16 4672ms B10
tid: 16 4672ms 10,10
tid: 14 4939ms A11
tid: 16 5083ms B11
tid: 16 5083ms 11,11
tid: 14 5339ms A12
tid: 16 5493ms B12
tid: 16 5493ms 12,12
tid: 14 5739ms A13
tid: 16 5903ms B13
tid: 16 5903ms 13,13
tid: 14 6139ms A14
tid: 13 6140ms timeout on B9
tid: 13 6140ms 9,-1
tid: 16 6313ms B14
tid: 16 6313ms 14,14
tid: 14 6539ms A15
tid: 14 6950ms B0
tid: 14 7360ms B1
tid: 14 7770ms B2
tid: 14 8180ms B4
tid: 13 8540ms timeout on B15
tid: 13 8540ms 15,-1
24/2/2016
解法:李坎贝尔
package test;
import rx.Observable;
import rx.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Test06LeeCampbell {
private static final int TIMEOUT = 500;
class ScheduledEvent {
final String type;
final long aId;
final long atMsec;
volatile boolean expired;
public ScheduledEvent (long atMsec, String type, long aId) {
this.atMsec = atMsec;
this.type = type;
this.aId = aId;
}
}
ScheduledEvent[] scheduledEvents = {
new ScheduledEvent (10, "A", 0),
new ScheduledEvent (90, "B", 0),
new ScheduledEvent (110, "A", 1),
new ScheduledEvent (140, "B", 1),
new ScheduledEvent (190, "A", 2),
new ScheduledEvent (270, "B", 2),
new ScheduledEvent (310, "A", 3),
new ScheduledEvent (410, "A", 4),
new ScheduledEvent (440, "B", 4),
new ScheduledEvent (480, "B", 3),
new ScheduledEvent (510, "A", 5),
new ScheduledEvent (610, "A", 6),
//new ScheduledEvent (670, "B", 6),
new ScheduledEvent (710, "A", 7),
new ScheduledEvent (810, "A", 8),
new ScheduledEvent (860, "B", 7),
new ScheduledEvent (880, "B", 8),
new ScheduledEvent (910, "A", 9),
new ScheduledEvent (1100, "A", 10),
new ScheduledEvent (1110, "A", 11),
new ScheduledEvent (1120, "A", 12),
new ScheduledEvent (1130, "A", 13),
new ScheduledEvent (1140, "A", 14),
//new ScheduledEvent (1200, "B", 10),
//new ScheduledEvent (1210, "B", 11),
//new ScheduledEvent (1220, "B", 12),
new ScheduledEvent (1230, "B", 13),
new ScheduledEvent (1240, "B", 14),
new ScheduledEvent (1390, "B", 9),
new ScheduledEvent (1450, "A", 15),
new ScheduledEvent (3290, "B", 5),
new ScheduledEvent (3350, "B", 15)
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06LeeCampbell app = new Test06LeeCampbell ();
app.runLeeCampbell ();
}
private void runLeeCampbell () {
Observable<Long> aStream =
getCrudeSequencer ("A")
.doOnNext (seq -> {
output (" A%s", seq);
});
Observable<Long> bStreamCold =
getCrudeSequencer ("B")
.doOnNext (seq -> {
output (" B%s", seq);
});
PublishSubject<Long> bStream = PublishSubject.create ();
bStreamCold.subscribe (bStream);
monitorLeeCampbell (aStream, bStream, TIMEOUT)
.subscribe (this::output);
pause (10_000);
}
private Observable<String> monitorLeeCampbell (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return aStream.flatMap (a ->
bStream.filter (b -> b.equals (a))
.map (b -> String.format ("%s,%s", a, b))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS)
.onErrorResumeNext (
throwable -> {
output (" timeout on B%s", a);
if (!(throwable instanceof TimeoutException)) {
throw new RuntimeException (throwable);
}
return Observable.just (String.format ("%s,%s", a, -1L));
}
)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), getElapsedMsec (),
String.format (format, args));
}
private long getElapsedMsec () {
return System.currentTimeMillis () - startTime;
}
private Observable<Long> getCrudeSequencer (String name) {
return Observable.create (subscriber ->
new Thread (() -> {
for (ScheduledEvent se : scheduledEvents) {
if (se.type.equals (name)) {
while (getElapsedMsec () < se.atMsec) {
pause (1);
}
subscriber.onNext (Long.valueOf (se.aId));
se.expired = true;
} else {
// Timing is not reliable for sequencing two threads
while (!se.expired) {
pause (1);
}
}
}
subscriber.onCompleted ();
}).start ()
);
}
private static void pause (final int millis) {
try {
Thread.sleep (millis);
} catch (InterruptedException e) {
}
}
}
有效
tid: 12 90ms A0
tid: 11 157ms B0
tid: 11 157ms 0,0
tid: 12 159ms A1
tid: 11 160ms B1
tid: 11 160ms 1,1
tid: 12 190ms A2
tid: 11 270ms B2
tid: 11 270ms 2,2
tid: 12 310ms A3
tid: 12 410ms A4
tid: 11 440ms B4
tid: 11 440ms 4,4
tid: 11 480ms B3
tid: 11 480ms 3,3
tid: 12 510ms A5
tid: 12 610ms A6
tid: 12 710ms A7
tid: 12 810ms A8
tid: 11 860ms B7
tid: 11 860ms 7,7
tid: 11 880ms B8
tid: 11 880ms 8,8
tid: 12 910ms A9
tid: 15 1011ms timeout on B5
tid: 15 1012ms 5,-1
tid: 12 1100ms A10
tid: 12 1110ms A11
tid: 16 1111ms timeout on B6
tid: 16 1111ms 6,-1
tid: 12 1120ms A12
tid: 12 1130ms A13
tid: 12 1140ms A14
tid: 11 1230ms B13
tid: 11 1230ms 13,13
tid: 11 1240ms B14
tid: 11 1240ms 14,14
tid: 11 1390ms B9
tid: 11 1390ms 9,9
tid: 12 1450ms A15
tid: 14 1601ms timeout on B10
tid: 14 1601ms 10,-1
tid: 15 1611ms timeout on B11
tid: 15 1611ms 11,-1
tid: 16 1621ms timeout on B12
tid: 16 1621ms 12,-1
tid: 19 1951ms timeout on B15
tid: 19 1951ms 15,-1
tid: 11 3290ms B5
tid: 11 3350ms B15
这应该有效。类型不同,我不想猜测您可能的抽象数据类型。您可以很容易地应用它们(函数参数、键比较和 Select
语句)
想法是,对于来自 a
的每个发出的值,我们采用 b.Where(keys match)
或超时(由 Observable.Timer
表示)发出的第一个值,并使我们的 Select
基于此信息。
我假设在超时情况下您还希望 OnNext
通知具有一些错误配置机制。
private IObservable<string> MonitorAB(IObservable<long> a, IObservable<long> b,
TimeSpan threshold)
{
return Observable.Create<string>((obs) =>
{
a = a.Publish().RefCount();
b = b.Publish().RefCount();
return a.Subscribe(i =>
{
Observable.Merge(Observable.Timer(threshold).Select(_ => $"Timeout for A{i}"),
b.Where(j => j == i).Select(_ => $"Got matching B for A{i}"))
.Take(1)
.Subscribe(obs.OnNext);
});
});
}
我是这样测试的
private void Test()
{
var a = Observable.Interval(TimeSpan.FromSeconds(2)).Take(5);
var b = Observable.Interval(TimeSpan.FromSeconds(5)).Take(5);
MonitorAB( a, b, TimeSpan.FromSeconds(13)).Subscribe(Console.WriteLine);
}
编辑:
要测试乱序情况,您可以翻转 B 流,例如
var b = Observable.Interval(TimeSpan.FromSeconds(2)).Select(i => 4 - i).Take(5);
假设:
- B流火爆,
- 您有一个可以表示匹配或不匹配的 return 类型(而不是使用将终止订阅的 OnError/Timeout)
那就这样就好了
AStream.SelectMany(a =>
BStream.Where(b => b == a)
.Select(b => new MatchMade(a, b))
.Take(1)
.Timeout(matchTimeout)
.Catch<TimeoutException>(ex=>Observable.Return(new NoMatchMade(a)))
)
我认为这可以满足您的需求,尽管它是 C# 而不是 Java - 我相信您可以轻松转换。
private IObservable<string> MonitorAB(
IObservable<long> a, IObservable<long> b, TimeSpan threshold)
{
return Observable.Create<string>(o =>
b.Publish(pb =>
a.Select(ax =>
pb.Where(pbx => pbx == ax)
.Take(1)
.Timeout(threshold, Observable.Return(-1L))
.Select(pbx => String.Format("{0},{1}", ax, pbx)))
.Merge())
.Subscribe(o));
}
所以这只是使用内联 .Publish
来确保 b
在查询中是热的。外部 .Select
过滤已发布的 pb
observable 以匹配来自 a
的值(匹配 a
& b
),只需要一个原因我们只想要一个,然后在 threshold
时间执行 .Timeout
,如果达到超时,则执行 returns -1L
(long
)。内部 .Select
只是将两个 long
值变成一个 string
。此时查询是 IObservable<IObservable<string>>
,因此 .Merge
将其展平。
申请要求:
- 订阅两个事件流 A 和 B
- 对于每个 A 事件,一段时间后应该有相应的 B 事件
- 应用程序监视 A 事件并在没有对应的 B 到达(及时)时发出警报
- B 事件可以以不同于 A 事件的顺序到达(但仅在之后)、延迟或根本不到达
这在传统方法中很简单:
- 记录集合中的每个 A 事件
- 当相应的 B 事件到达时删除 A 事件
- 监控在超时内未收到 B 的 A 事件的集合,并生成警报
我想尝试一个反应式解决方案,但不知道哪个运营商最能表达这一点。我可视化事件流(Observables):
- A 的流
- B 流
- 也许还有一个计时器滴答流(除非计时运算符满足这个要求)
最终输出是一组警报:
someObservable.
...incantations, other observables...
.subscribe ((EventA a) -> raiseAlertForMissingB (a));
是否有优雅的响应式方法,或者这只是 不适合 rx?
(对原始问题的后续补充)
ascii 艺术弹珠图可能如下所示
A stream A1----A2----------A3------------------A4--------------A5---------
B stream ------------B1------------B3---------------B4------------B5-----
(A2 TIMEOUT)
merged ------------A1B1----------A3B3------A2??---A4B4----------A5B5---
订户从合并流中接收元组。如果元组是匹配的 A 和 B 事件,它会被记录下来,但如果元组是一个没有匹配 B 事件的 A 事件(在弹珠图中显示为 A2??),订阅者会发出警报。中提琴!
但是如何为每个A事件触发定时等待匹配的B事件?
(另加)
为了说明第4个要求点点部分"B events can arrive in a different order from the A events (but only after)..."
A stream A6----A7----------A8----------------------
B stream ------------B7------------B6-------B8-----
merged ------------A7B7----------A6B6-----A8B8---
2016 年 2 月 23 日更新
我正在测试移植到 Java 的建议解决方案。
解:谜
public class Test06Enigmativity {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Enigmativity app = new Test06Enigmativity ();
app.runEnigmativity ();
}
private void runEnigmativity () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorEnigmativity (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorEnigmativity (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber ->
bStream.publish (pb ->
Observable.merge (
aStream.map (ax ->
pb
.filter (pbx -> pbx.equals (ax))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS, Observable.defer (
() -> {
output (" timeout on B%s", ax);
return Observable.just (-1L);
}
)).map (pbx -> String.format ("%s,%s", ax, pbx))
)
)
).subscribe (subscriber::onNext)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
这适用于:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 12 511ms A0
tid: 15 626ms B0
tid: 15 626ms 0,0
tid: 12 907ms A1
tid: 15 1126ms B1
tid: 15 1126ms 1,1
tid: 12 1307ms A2
tid: 15 1626ms B2
tid: 15 1626ms 2,2
tid: 12 1707ms A3
tid: 12 2107ms A4
tid: 15 2126ms B4
tid: 15 2126ms 4,4
tid: 12 2507ms A5
tid: 15 2625ms B3
tid: 15 2625ms 3,3
tid: 12 2907ms A6
tid: 15 3126ms B6
tid: 15 3126ms 6,6
tid: 12 3307ms A7
tid: 15 3626ms B5
tid: 15 3626ms 5,5
tid: 12 3707ms A8
tid: 12 4107ms A9
tid: 15 4126ms B7
tid: 15 4126ms 7,7
tid: 12 4507ms A10
tid: 15 4626ms B8
tid: 15 4626ms 8,8
tid: 12 4908ms A11
tid: 15 5127ms B10
tid: 15 5128ms 10,10
tid: 12 5307ms A12
tid: 15 5626ms B11
tid: 15 5626ms 11,11
tid: 12 5707ms A13
tid: 12 6107ms A14
tid: 15 6126ms B12
tid: 15 6126ms 12,12
tid: 12 6507ms A15
tid: 15 6626ms B13
tid: 15 6626ms 13,13
tid: 13 7109ms timeout on B9
tid: 13 7114ms 9,-1
tid: 15 7126ms B14
tid: 15 7126ms 14,14
tid: 15 7625ms B9
tid: 15 8126ms B15
tid: 15 8126ms 15,15
但是随着 B 到达 A 的附近,事件可能会被错过:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 12 509ms A0
tid: 15 538ms B0
tid: 12 905ms A1
tid: 15 948ms B1
tid: 15 948ms 1,1
tid: 12 1305ms A2
tid: 15 1358ms B2
tid: 15 1358ms 2,2
tid: 12 1706ms A3
tid: 15 1768ms B4
tid: 12 2105ms A4
tid: 15 2178ms B3
tid: 15 2178ms 3,3
tid: 12 2505ms A5
tid: 16 2538ms timeout on B0
tid: 16 2544ms 0,-1
tid: 15 2588ms B6
tid: 12 2905ms A6
tid: 15 2998ms B5
tid: 15 2998ms 5,5
tid: 12 3305ms A7
tid: 15 3408ms B7
tid: 15 3408ms 7,7
tid: 12 3705ms A8
tid: 15 3817ms B8
tid: 15 3817ms 8,8
tid: 12 4105ms A9
tid: 14 4106ms timeout on B4
tid: 14 4106ms 4,-1
tid: 15 4228ms B10
tid: 12 4505ms A10
tid: 15 4638ms B11
tid: 12 4905ms A11
tid: 16 4906ms timeout on B6
tid: 16 4906ms 6,-1
tid: 15 5048ms B12
tid: 12 5305ms A12
tid: 15 5457ms B13
tid: 12 5705ms A13
tid: 15 5868ms B14
tid: 12 6106ms A14
tid: 13 6107ms timeout on B9
tid: 13 6107ms 9,-1
tid: 15 6279ms B9
tid: 14 6510ms timeout on B10
tid: 12 6510ms A15
tid: 14 6510ms 10,-1
tid: 15 6688ms B15
tid: 15 6688ms 15,15
B0 到达但产生超时而不是匹配。这是因为 B0 事件在 A 流的观察者更改为订阅 B 流之前到达。我认为对于热门的 B-stream,嵌套订阅的流行方法是有缺陷的。需要的是某种有限的最近重播流 - 一个主题的应用程序?
解法:Supertopi
package test;
import rx.Observable;
import java.util.concurrent.TimeUnit;
public class Test06Supertopi {
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 3_000;
private static final int[] bOrder = {
0, 1, 2,
4,
3, // out of order
6,
5, // out of order
7, 8,
10, 11, 12, 13, 14,
9, // out of order
15
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06Supertopi app = new Test06Supertopi ();
app.runSupertopi ();
}
private void runSupertopi () {
Observable<Long> aStream =
Observable.interval (A_PERIOD, TimeUnit.MILLISECONDS)
.doOnNext (seq -> {
output (" A%s", seq);
}).take (bOrder.length);
Observable<Long> bStream =
Observable.interval (B_PERIOD, TimeUnit.MILLISECONDS)
.map (seq -> {
long aId = (long) bOrder[seq.intValue ()];
output (" B%s", aId);
return aId;
})
.take (bOrder.length);
monitorSupertopi (aStream, bStream, TIMEOUT)
.subscribe (this::output);
try {
Thread.sleep (60_000);
} catch (InterruptedException e) {
}
}
private Observable<String> monitorSupertopi (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return Observable.create (subscriber -> {
Observable<Long> a = aStream.publish ().refCount ();
Observable<Long> b = bStream.publish ().refCount ();
a.subscribe ((Long aId) -> {
Observable.merge (
Observable.timer (thresholdMsec, TimeUnit.MILLISECONDS)
.doOnNext (x -> {
output (" timeout on B%s", aId);
})
.map (x -> String.format ("%s,%s", aId, -1L)),
b.filter ((Long j) -> j.equals (aId))
.map ((Long pbx) -> String.format ("%s,%s", aId, pbx))
).take (1)
.subscribe (subscriber::onNext);
});
});
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), System.currentTimeMillis () - startTime,
String.format (format, args));
}
}
工作正常:
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 500;
private static final int TIMEOUT = 4_000;
tid: 14 522ms A0
tid: 14 922ms A1
tid: 16 1054ms B0
tid: 16 1055ms 0,0
tid: 14 1322ms A2
tid: 16 1555ms B1
tid: 16 1555ms 1,1
tid: 14 1721ms A3
tid: 16 2055ms B2
tid: 16 2055ms 2,2
tid: 14 2122ms A4
tid: 14 2522ms A5
tid: 16 2555ms B4
tid: 16 2555ms 4,4
tid: 14 2922ms A6
tid: 16 3055ms B3
tid: 16 3055ms 3,3
tid: 14 3322ms A7
tid: 16 3555ms B6
tid: 16 3555ms 6,6
tid: 14 3722ms A8
tid: 16 4055ms B5
tid: 16 4055ms 5,5
tid: 14 4122ms A9
tid: 14 4522ms A10
tid: 16 4555ms B7
tid: 16 4555ms 7,7
tid: 14 4922ms A11
tid: 16 5055ms B8
tid: 16 5055ms 8,8
tid: 14 5322ms A12
tid: 16 5555ms B10
tid: 16 5556ms 10,10
tid: 14 5723ms A13
tid: 16 6056ms B11
tid: 16 6057ms 11,11
tid: 14 6122ms A14
tid: 14 6522ms A15
tid: 16 6555ms B12
tid: 16 6555ms 12,12
tid: 16 7055ms B13
tid: 16 7055ms 13,13
tid: 13 7125ms timeout on B9
tid: 13 7125ms 9,-1
tid: 16 7555ms B14
tid: 16 7555ms 14,14
tid: 16 8055ms B9
tid: 16 8555ms B15
tid: 16 8555ms 15,15
但是随着时间的推移,初始的 B 事件被接收了两次。我还在调查这个。
private static final long A_PERIOD = 400;
private static final long B_PERIOD = 410;
private static final int TIMEOUT = 2_000;
tid: 14 539ms A0
tid: 14 939ms A1
tid: 16 983ms B0
tid: 16 983ms 0,0
tid: 14 1339ms A2
tid: 16 1393ms B1
tid: 16 1393ms 1,1
tid: 14 1739ms A3
tid: 16 1803ms B2
tid: 16 1803ms 2,2
tid: 14 2139ms A4
tid: 16 2213ms B4
tid: 16 2213ms 4,4
tid: 14 2539ms A5
tid: 16 2623ms B3
tid: 16 2623ms 3,3
tid: 14 2939ms A6
tid: 16 3032ms B6
tid: 16 3032ms 6,6
tid: 14 3339ms A7
tid: 16 3443ms B5
tid: 16 3443ms 5,5
tid: 14 3739ms A8
tid: 16 3852ms B7
tid: 16 3852ms 7,7
tid: 14 4139ms A9
tid: 16 4263ms B8
tid: 16 4263ms 8,8
tid: 14 4539ms A10
tid: 16 4672ms B10
tid: 16 4672ms 10,10
tid: 14 4939ms A11
tid: 16 5083ms B11
tid: 16 5083ms 11,11
tid: 14 5339ms A12
tid: 16 5493ms B12
tid: 16 5493ms 12,12
tid: 14 5739ms A13
tid: 16 5903ms B13
tid: 16 5903ms 13,13
tid: 14 6139ms A14
tid: 13 6140ms timeout on B9
tid: 13 6140ms 9,-1
tid: 16 6313ms B14
tid: 16 6313ms 14,14
tid: 14 6539ms A15
tid: 14 6950ms B0
tid: 14 7360ms B1
tid: 14 7770ms B2
tid: 14 8180ms B4
tid: 13 8540ms timeout on B15
tid: 13 8540ms 15,-1
24/2/2016
解法:李坎贝尔
package test;
import rx.Observable;
import rx.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Test06LeeCampbell {
private static final int TIMEOUT = 500;
class ScheduledEvent {
final String type;
final long aId;
final long atMsec;
volatile boolean expired;
public ScheduledEvent (long atMsec, String type, long aId) {
this.atMsec = atMsec;
this.type = type;
this.aId = aId;
}
}
ScheduledEvent[] scheduledEvents = {
new ScheduledEvent (10, "A", 0),
new ScheduledEvent (90, "B", 0),
new ScheduledEvent (110, "A", 1),
new ScheduledEvent (140, "B", 1),
new ScheduledEvent (190, "A", 2),
new ScheduledEvent (270, "B", 2),
new ScheduledEvent (310, "A", 3),
new ScheduledEvent (410, "A", 4),
new ScheduledEvent (440, "B", 4),
new ScheduledEvent (480, "B", 3),
new ScheduledEvent (510, "A", 5),
new ScheduledEvent (610, "A", 6),
//new ScheduledEvent (670, "B", 6),
new ScheduledEvent (710, "A", 7),
new ScheduledEvent (810, "A", 8),
new ScheduledEvent (860, "B", 7),
new ScheduledEvent (880, "B", 8),
new ScheduledEvent (910, "A", 9),
new ScheduledEvent (1100, "A", 10),
new ScheduledEvent (1110, "A", 11),
new ScheduledEvent (1120, "A", 12),
new ScheduledEvent (1130, "A", 13),
new ScheduledEvent (1140, "A", 14),
//new ScheduledEvent (1200, "B", 10),
//new ScheduledEvent (1210, "B", 11),
//new ScheduledEvent (1220, "B", 12),
new ScheduledEvent (1230, "B", 13),
new ScheduledEvent (1240, "B", 14),
new ScheduledEvent (1390, "B", 9),
new ScheduledEvent (1450, "A", 15),
new ScheduledEvent (3290, "B", 5),
new ScheduledEvent (3350, "B", 15)
};
private final long startTime = System.currentTimeMillis ();
public static void main (final String[] args) {
Test06LeeCampbell app = new Test06LeeCampbell ();
app.runLeeCampbell ();
}
private void runLeeCampbell () {
Observable<Long> aStream =
getCrudeSequencer ("A")
.doOnNext (seq -> {
output (" A%s", seq);
});
Observable<Long> bStreamCold =
getCrudeSequencer ("B")
.doOnNext (seq -> {
output (" B%s", seq);
});
PublishSubject<Long> bStream = PublishSubject.create ();
bStreamCold.subscribe (bStream);
monitorLeeCampbell (aStream, bStream, TIMEOUT)
.subscribe (this::output);
pause (10_000);
}
private Observable<String> monitorLeeCampbell (Observable<Long> aStream, Observable<Long> bStream, int thresholdMsec) {
return aStream.flatMap (a ->
bStream.filter (b -> b.equals (a))
.map (b -> String.format ("%s,%s", a, b))
.take (1)
.timeout (thresholdMsec, TimeUnit.MILLISECONDS)
.onErrorResumeNext (
throwable -> {
output (" timeout on B%s", a);
if (!(throwable instanceof TimeoutException)) {
throw new RuntimeException (throwable);
}
return Observable.just (String.format ("%s,%s", a, -1L));
}
)
);
}
private void output (String format, Object... args) {
System.out.printf ("tid:%3d %4dms %s%n", Thread.currentThread ().getId (), getElapsedMsec (),
String.format (format, args));
}
private long getElapsedMsec () {
return System.currentTimeMillis () - startTime;
}
private Observable<Long> getCrudeSequencer (String name) {
return Observable.create (subscriber ->
new Thread (() -> {
for (ScheduledEvent se : scheduledEvents) {
if (se.type.equals (name)) {
while (getElapsedMsec () < se.atMsec) {
pause (1);
}
subscriber.onNext (Long.valueOf (se.aId));
se.expired = true;
} else {
// Timing is not reliable for sequencing two threads
while (!se.expired) {
pause (1);
}
}
}
subscriber.onCompleted ();
}).start ()
);
}
private static void pause (final int millis) {
try {
Thread.sleep (millis);
} catch (InterruptedException e) {
}
}
}
有效
tid: 12 90ms A0
tid: 11 157ms B0
tid: 11 157ms 0,0
tid: 12 159ms A1
tid: 11 160ms B1
tid: 11 160ms 1,1
tid: 12 190ms A2
tid: 11 270ms B2
tid: 11 270ms 2,2
tid: 12 310ms A3
tid: 12 410ms A4
tid: 11 440ms B4
tid: 11 440ms 4,4
tid: 11 480ms B3
tid: 11 480ms 3,3
tid: 12 510ms A5
tid: 12 610ms A6
tid: 12 710ms A7
tid: 12 810ms A8
tid: 11 860ms B7
tid: 11 860ms 7,7
tid: 11 880ms B8
tid: 11 880ms 8,8
tid: 12 910ms A9
tid: 15 1011ms timeout on B5
tid: 15 1012ms 5,-1
tid: 12 1100ms A10
tid: 12 1110ms A11
tid: 16 1111ms timeout on B6
tid: 16 1111ms 6,-1
tid: 12 1120ms A12
tid: 12 1130ms A13
tid: 12 1140ms A14
tid: 11 1230ms B13
tid: 11 1230ms 13,13
tid: 11 1240ms B14
tid: 11 1240ms 14,14
tid: 11 1390ms B9
tid: 11 1390ms 9,9
tid: 12 1450ms A15
tid: 14 1601ms timeout on B10
tid: 14 1601ms 10,-1
tid: 15 1611ms timeout on B11
tid: 15 1611ms 11,-1
tid: 16 1621ms timeout on B12
tid: 16 1621ms 12,-1
tid: 19 1951ms timeout on B15
tid: 19 1951ms 15,-1
tid: 11 3290ms B5
tid: 11 3350ms B15
这应该有效。类型不同,我不想猜测您可能的抽象数据类型。您可以很容易地应用它们(函数参数、键比较和 Select
语句)
想法是,对于来自 a
的每个发出的值,我们采用 b.Where(keys match)
或超时(由 Observable.Timer
表示)发出的第一个值,并使我们的 Select
基于此信息。
我假设在超时情况下您还希望 OnNext
通知具有一些错误配置机制。
private IObservable<string> MonitorAB(IObservable<long> a, IObservable<long> b,
TimeSpan threshold)
{
return Observable.Create<string>((obs) =>
{
a = a.Publish().RefCount();
b = b.Publish().RefCount();
return a.Subscribe(i =>
{
Observable.Merge(Observable.Timer(threshold).Select(_ => $"Timeout for A{i}"),
b.Where(j => j == i).Select(_ => $"Got matching B for A{i}"))
.Take(1)
.Subscribe(obs.OnNext);
});
});
}
我是这样测试的
private void Test()
{
var a = Observable.Interval(TimeSpan.FromSeconds(2)).Take(5);
var b = Observable.Interval(TimeSpan.FromSeconds(5)).Take(5);
MonitorAB( a, b, TimeSpan.FromSeconds(13)).Subscribe(Console.WriteLine);
}
编辑: 要测试乱序情况,您可以翻转 B 流,例如
var b = Observable.Interval(TimeSpan.FromSeconds(2)).Select(i => 4 - i).Take(5);
假设:
- B流火爆,
- 您有一个可以表示匹配或不匹配的 return 类型(而不是使用将终止订阅的 OnError/Timeout)
那就这样就好了
AStream.SelectMany(a =>
BStream.Where(b => b == a)
.Select(b => new MatchMade(a, b))
.Take(1)
.Timeout(matchTimeout)
.Catch<TimeoutException>(ex=>Observable.Return(new NoMatchMade(a)))
)
我认为这可以满足您的需求,尽管它是 C# 而不是 Java - 我相信您可以轻松转换。
private IObservable<string> MonitorAB(
IObservable<long> a, IObservable<long> b, TimeSpan threshold)
{
return Observable.Create<string>(o =>
b.Publish(pb =>
a.Select(ax =>
pb.Where(pbx => pbx == ax)
.Take(1)
.Timeout(threshold, Observable.Return(-1L))
.Select(pbx => String.Format("{0},{1}", ax, pbx)))
.Merge())
.Subscribe(o));
}
所以这只是使用内联 .Publish
来确保 b
在查询中是热的。外部 .Select
过滤已发布的 pb
observable 以匹配来自 a
的值(匹配 a
& b
),只需要一个原因我们只想要一个,然后在 threshold
时间执行 .Timeout
,如果达到超时,则执行 returns -1L
(long
)。内部 .Select
只是将两个 long
值变成一个 string
。此时查询是 IObservable<IObservable<string>>
,因此 .Merge
将其展平。