如何使用 rxpy/rxjs 延迟事件发射?
How to delay event emission with rxpy/rxjs?
我有两个事件流。一个来自电感回路,另一个是网络摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相差 N 毫秒以内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(硬件都可能失败)全部合并到一个流中。像这样:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在我当然可以通过使用好的 ole Subject 反模式来解决问题:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当棘手,而且虽然我没有观察到它,但我很确定当我使用 threading.Timer
检查待处理队列时存在竞争条件。考虑到过多的 rx 运算符,我很确定它们的某些组合可以让您在不使用 Subject
的情况下执行此操作,但我无法弄清楚。如何做到这一点?
编辑
尽管出于组织和操作原因,我更愿意坚持使用 Python,但我将采用 JavaScript rxjs 答案并将其移植,甚至可能在节点中重写整个脚本。
您应该能够使用 auditTime
和 buffer
解决问题。像这样:
function matchWithinTime(a$, b$, N) {
const merged$ = Rx.Observable.merge(a$, b$);
// Use auditTime to compose a closing notifier for the buffer.
const audited$ = merged$.auditTime(N);
// Buffer emissions within an audit and filter out empty buffers.
return merged$
.buffer(audited$)
.filter(x => x.length > 0);
}
const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
如果 b
值紧跟 a
值并且您不希望它们匹配,您可以使用更具体的审核,例如:
const audited$ = merged$.audit(x => x === "a" ?
// If an `a` was received, audit upcoming values for `N` milliseconds.
Rx.Observable.timer(N) :
// If a `b` was received, don't audit the upcoming values.
Rx.Observable.of(0, Rx.Scheduler.asap)
);
我开发了一种与 Cartant 不同的策略,而且显然不够优雅,这可能会给您带来不同的结果。如果我没有理解问题或者我的回答没有用,我深表歉意。
我的策略基于在 a$ 上使用 switchMap
,然后在 b$ 上使用 bufferTime
。
此代码在每个 timeInterval
发出,它发出一个对象,其中包含接收到的最后一个 a 和一个 b[=46= 的数组]s 表示在时间间隔内收到的 bs。
a$.pipe(
switchMap(a => {
return b$.pipe(
bufferTime(timeInterval),
mergeMap(arrayOfB => of({a, arrayOfB})),
)
})
)
如果arrayOfB
为空,表示最后一个a不匹配。
如果arrayOfB
只有一个元素,则意味着最后一个a已经被b匹配了数组。
如果arrayOfB
有多个元素,则意味着最后一个a已经被第一个b[=46=匹配了] 的数组,而所有其他 bs 是不匹配的。
现在的问题是避免发射相同的 a
一次,这是代码变得有点混乱的地方。
总而言之,代码可能如下所示
const a$ = new Subject();
const b$ = new Subject();
setTimeout(() => a$.next("a1"), 0);
setTimeout(() => b$.next("b1"), 0);
setTimeout(() => a$.next("a2"), 100);
setTimeout(() => b$.next("b2"), 125);
setTimeout(() => a$.next("a3"), 200);
setTimeout(() => b$.next("b3"), 275);
setTimeout(() => a$.next("a4"), 400);
setTimeout(() => b$.next("b4"), 425);
setTimeout(() => b$.next("b4.1"), 435);
setTimeout(() => a$.next("a5"), 500);
setTimeout(() => b$.next("b5"), 575);
setTimeout(() => b$.next("b6"), 700);
setTimeout(() => b$.next("b6.1"), 701);
setTimeout(() => b$.next("b6.2"), 702);
setTimeout(() => a$.next("a6"), 800);
setTimeout(() => a$.complete(), 1000);
setTimeout(() => b$.complete(), 1000);
let currentA;
a$.pipe(
switchMap(a => {
currentA = a;
return b$.pipe(
bufferTime(50),
mergeMap(arrayOfB => {
let aVal = currentA ? currentA : null;
if (arrayOfB.length === 0) {
const ret = of({a: aVal, b: null})
currentA = null;
return ret;
}
if (arrayOfB.length === 1) {
const ret = of({a: aVal, b: arrayOfB[0]})
currentA = null;
return ret;
}
const ret = from(arrayOfB)
.pipe(
map((b, _indexB) => {
aVal = _indexB > 0 ? null : aVal;
return {a: aVal, b}
})
)
currentA = null;
return ret;
}),
filter(data => data.a !== null || data.b !== null)
)
})
)
.subscribe(console.log);
我有两个事件流。一个来自电感回路,另一个是网络摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相差 N 毫秒以内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(硬件都可能失败)全部合并到一个流中。像这样:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在我当然可以通过使用好的 ole Subject 反模式来解决问题:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当棘手,而且虽然我没有观察到它,但我很确定当我使用 threading.Timer
检查待处理队列时存在竞争条件。考虑到过多的 rx 运算符,我很确定它们的某些组合可以让您在不使用 Subject
的情况下执行此操作,但我无法弄清楚。如何做到这一点?
编辑
尽管出于组织和操作原因,我更愿意坚持使用 Python,但我将采用 JavaScript rxjs 答案并将其移植,甚至可能在节点中重写整个脚本。
您应该能够使用 auditTime
和 buffer
解决问题。像这样:
function matchWithinTime(a$, b$, N) {
const merged$ = Rx.Observable.merge(a$, b$);
// Use auditTime to compose a closing notifier for the buffer.
const audited$ = merged$.auditTime(N);
// Buffer emissions within an audit and filter out empty buffers.
return merged$
.buffer(audited$)
.filter(x => x.length > 0);
}
const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
如果 b
值紧跟 a
值并且您不希望它们匹配,您可以使用更具体的审核,例如:
const audited$ = merged$.audit(x => x === "a" ?
// If an `a` was received, audit upcoming values for `N` milliseconds.
Rx.Observable.timer(N) :
// If a `b` was received, don't audit the upcoming values.
Rx.Observable.of(0, Rx.Scheduler.asap)
);
我开发了一种与 Cartant 不同的策略,而且显然不够优雅,这可能会给您带来不同的结果。如果我没有理解问题或者我的回答没有用,我深表歉意。
我的策略基于在 a$ 上使用 switchMap
,然后在 b$ 上使用 bufferTime
。
此代码在每个 timeInterval
发出,它发出一个对象,其中包含接收到的最后一个 a 和一个 b[=46= 的数组]s 表示在时间间隔内收到的 bs。
a$.pipe(
switchMap(a => {
return b$.pipe(
bufferTime(timeInterval),
mergeMap(arrayOfB => of({a, arrayOfB})),
)
})
)
如果arrayOfB
为空,表示最后一个a不匹配。
如果arrayOfB
只有一个元素,则意味着最后一个a已经被b匹配了数组。
如果arrayOfB
有多个元素,则意味着最后一个a已经被第一个b[=46=匹配了] 的数组,而所有其他 bs 是不匹配的。
现在的问题是避免发射相同的 a 一次,这是代码变得有点混乱的地方。
总而言之,代码可能如下所示
const a$ = new Subject();
const b$ = new Subject();
setTimeout(() => a$.next("a1"), 0);
setTimeout(() => b$.next("b1"), 0);
setTimeout(() => a$.next("a2"), 100);
setTimeout(() => b$.next("b2"), 125);
setTimeout(() => a$.next("a3"), 200);
setTimeout(() => b$.next("b3"), 275);
setTimeout(() => a$.next("a4"), 400);
setTimeout(() => b$.next("b4"), 425);
setTimeout(() => b$.next("b4.1"), 435);
setTimeout(() => a$.next("a5"), 500);
setTimeout(() => b$.next("b5"), 575);
setTimeout(() => b$.next("b6"), 700);
setTimeout(() => b$.next("b6.1"), 701);
setTimeout(() => b$.next("b6.2"), 702);
setTimeout(() => a$.next("a6"), 800);
setTimeout(() => a$.complete(), 1000);
setTimeout(() => b$.complete(), 1000);
let currentA;
a$.pipe(
switchMap(a => {
currentA = a;
return b$.pipe(
bufferTime(50),
mergeMap(arrayOfB => {
let aVal = currentA ? currentA : null;
if (arrayOfB.length === 0) {
const ret = of({a: aVal, b: null})
currentA = null;
return ret;
}
if (arrayOfB.length === 1) {
const ret = of({a: aVal, b: arrayOfB[0]})
currentA = null;
return ret;
}
const ret = from(arrayOfB)
.pipe(
map((b, _indexB) => {
aVal = _indexB > 0 ? null : aVal;
return {a: aVal, b}
})
)
currentA = null;
return ret;
}),
filter(data => data.a !== null || data.b !== null)
)
})
)
.subscribe(console.log);