使用 RxJava 实现类似十字转门的运算符

Implementing a turnstile-like operator with RxJava

我需要帮助在 RxJava (RxScala) 中实现类似十字转门的运算符。我花了很多时间思考它,但我似乎被卡住了。

函数的类型应该如下:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

这个想法是操作员的行为应该与真正的十字转门非常相似。有人来了(queue),还有一个turnstile准备接受新的单身人(true元素在闸机,你可以想象成一个令牌插入闸机),或者关闭(闸机中false,取消之前的令牌)。旋转门中每 true 个元素,只有一个人可以通过。

此外,在没有人通过的情况下,连续插入多个令牌(十字转门中的几个true项)与仅插入一个令牌相同,十字转门不计算令牌。

换句话说,闸机最初是关闭的。当其中出现 true 元素时,它会为一个人打开。如果有人出现,它会通过(到输出端),然后旋转门再次关闭。如果 false 元素出现在旋转门中,旋转门也会关闭。

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

一个弹珠图,显示打开的旋转门等待 A 人,然后 B 等待旋转门打开,然后几个令牌表现为一个人 C 通过,但人 D 必须等待又是一个新令牌

----A----B--
--T---T-F-T-
============
----A-----B-

一张弹珠图,显示旋转门中的 false 元素如何再次关闭旋转门。

感谢任何帮助。我认为在不编写自定义运算符的情况下实现这一点的唯一方法是以某种方式使用 zip 运算符,因为它可能是唯一使一个序列中的元素等待另一个序列中的元素的运算符(或者是否有任何其他运算符我不知道?)。但我需要压缩一些旋转栅门元素,具体取决于它们是否与人配对...

我认为这是一个有趣的问题,我很好奇有什么好的解决方案。

好的,我找到了一个解决方案,灵感来自 Dave Sexton 的评论。最后我没有使用 zip 因为我想不出一个解决方案。

我基本上将旋转门实现为具有三个状态变量的状态机:是否锁定,等待通过旋转门的元素队列,以及最后一个通过旋转门的元素(这些是收集的最终产生实际输出)。

状态机的输入是转换请求流,它由两个输入流合并而成:lock/unlock 请求流和要通过十字转门的元素流。我只是用 scan 处理转换,然后 collect 从结果状态传递的元素。

/** sample elements from queue through turnstile, one at a time
*
* @param queue source of elements to pass through the turnstile.
* @param turnstile For every `true` in the turnstile pass one element through from the queue
* @tparam T type of the elements
* @return the source of queue elements passing through the turnstile
*/
def queueThroughTurnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T] = {
  import scala.collection.immutable.Queue

  case class State(isOpen: Boolean, elementsInQueue: Queue[T], maybeLastEmittedElement: Option[T])
  sealed abstract class Transition
  case object Lock extends Transition
  case object Unlock extends Transition
  case class Element(element: T) extends Transition

  val initialState = State(isOpen = false, Queue.empty, None)

  queue.map(element ⇒ Element(element))
    .merge(turnstile map (unlock ⇒ if (unlock) Unlock else Lock))
    .scan(initialState) { case (State(isOpen, elementsInQueue, _), transition) ⇒ transition match {
    case Lock ⇒ State(isOpen = false, elementsInQueue, None)
    case Unlock ⇒ {
      if (elementsInQueue.isEmpty)
        State(isOpen = true, elementsInQueue, None)
      else {
        val (firstElement, newQueue) = elementsInQueue.dequeue
        State(isOpen = false, newQueue, Some(firstElement))
      }
    }
    case Element(newElement) ⇒ {
      if (isOpen) {
        if (elementsInQueue.isEmpty)
          State(isOpen = false, Queue.empty, Some(newElement))
        else {
          val (firstElement, newQueue) = elementsInQueue.dequeue
          State(isOpen = false, newQueue enqueue newElement, Some(firstElement))
        }  
      } else {
        State(isOpen = false, elementsInQueue enqueue newElement, None)
      }
    }
  }
  }.collect { case State(_, _, Some(lastEmittedElement)) ⇒ lastEmittedElement}
}

所以我认为我有一个更干净、完全 Rx 的解决方案。这实际上是一个非常有趣的问题。如果它能满足您的需求,我认为它最终会非常优雅,尽管它花了很长时间才完成。

遗憾的是我不懂 Scala,所以您将不得不处理我的 Java8 lambda。 :D

整个实现:

public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
    return queue.publish(sharedQueue ->
            tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}

所以,这里发生的事情是我们使用 publish 来创建我们可以多次订阅的人员队列的共享可观察对象。在其中,我们在令牌流上使用 switchMap,这意味着任何时候从 switchMap 发出一个新的 Observable,它都会丢弃最后一个并订阅新的。只要标记为真,它就会对人员队列进行新订阅(并且连续多个真都可以,因为它会取消旧订阅)。当它为 false 时,它​​只是转储出一个空的 Observable,以免浪费时间。

还有一些(通过的)测试用例:

@RunWith(JUnit4.class)
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

如果您发现任何不适用于此的边缘情况,请告诉我,特别是如果它实时出现问题,因为测试显然是在受控环境中进行的。