使用比较器功能根据其内容缓冲元素

Buffer elements based on its contents with comparer function

在 RSJS 中如何缓冲值以便当下一个元素与前一个元素不同时缓冲区将被刷新。如果某个比较器的元素相同,那么它应该缓冲它们直到检测到下一个变化...

假设我有这样的元素...

{ t: 10, price:12 },
{ t: 10, price:13 },
{ t: 10, price:14 },
{ t: 11, price:12 },
{ t: 11, price:13 },
{ t: 10, price:14 },
{ t: 10, price:15 },

如果 t 属性 值与前一个元素 t 值相同,则元素相同,所以在输出时我只需要这样的缓冲区...

[ { t: 10, price:12 }, { t: 10, price:13}, { t: 10, price:14} ],
[ { t: 11, price:12}, { t: 11, price:13} ],
[ { t: 10, price:14 }, { t: 10, price:15 } ]

所以在结果中我发出了两个元素(两个缓冲区,每个缓冲区包含相同的对象)。

我试图使用 bufferWhen 或只是 buffer 但我不知道如何在这种情况下指定 closingNotifier 因为这需要依赖于正在接近的元素.有人可以帮忙吗?

TLDR;

const items = [
  { t: 10, price: 12 },
  { t: 10, price: 13 },
  { t: 10, price: 14 },
  { t: 11, price: 12 },
  { t: 11, price: 13 },
  { t: 10, price: 14 },
  { t: 10, price: 15 }
];

const src$ = from(items).pipe(
  delay(0),
  share()
);

const closingNotifier$ = src$.pipe(
  distinctUntilKeyChanged('t'),
  skip(1),
  share({ resetOnRefCountZero: false })
);

src$.pipe(bufferWhen(() => closingNotifier$)).subscribe(console.log);

StackBlitz demo.

详细解释

棘手的部分是确定 closingNotifier,因为正如您所说,它取决于来自流的值。我的第一个想法是 src$ 必须扮演 2 个不同的角色:1) 发出值的流和 2) 缓冲区运算符的 closingNotifier。这就是使用 share() 运算符的原因:

const src$ = from(items).pipe(
  delay(0),
  share()
);

delay(0) 也被使用,因为源的项目是同步发射的。由于源将被订阅两次(因为源是流,也是 closingNotifier),所以两个订阅者都接收值很重要。如果 delay(0) 被省略,只有第​​一个订阅者会收到物品,而第二个订阅者什么也不会收到,因为它是在源的所有物品发出后注册的。使用 delay(0) 我们只确保两个订阅者(来自 subscribe 回调的第一个订阅者和第二个订阅者是 closingNotifier 内部订阅者 )是在源发出值之前注册。

进入closingNotifier

const closingNotifier$ = src$.pipe(
  distinctUntilKeyChanged('t'),
  skip(1),
  share({ resetOnRefCountZero: false })
);
    使用
  • distinctUntilKeyChanged('t'), 是因为缓冲区应发出累积项的 信号 是当具有不同 t 值的项来自流时.
  • 使用
  • skip(1) 是因为当第一个值来自流时,在第一次订阅 closingNotifier 之后,它会导致缓冲的项目立即发送,这不是什么我们要,因为是第一批货。
  • share({ resetOnRefCountZero: false }) - 这是有趣的部分;如您所见,我们使用 bufferWhen(() => closingNotifier$) 而不是 buffer(closingNotifier$);那是因为 buffer first subscribes to the source, and then to the notifier; this complicates the situation a bit so I decided to go with bufferWhen, which subscribes to the notifier first 再到源头; bufferWhen 问题 是它每次发出后都会重新订阅到 closingNotifier,因此我们需要使用 share,因为我们当已经有一些项目时,不想重复第一批项目(skip 运算符)的逻辑; share()(没有 resetOnRefCountZero 选项)的问题是它每次发出后仍会重新订阅,因为这是使用 inner Subject 时的默认行为by share 没有订阅者;这可以通过使用 resetOnRefCountZero: false 来解决,当第一个订阅者注册时,在内部主题之前没有订阅者之后,它不会重新订阅源;