在 RxJava 中连接两个大型数据集

Joining two large datasets in RxJava

我正在使用 RxJava 处理两个需要通过 ID 连接的大型数据集(各有数百万条记录)。这两个数据集包含的记录不一定相同,但都已按 ID 排序。

我发现 join 方法可以用于此目的。下面的实验先执行一次 "full join"(实际上是笛卡尔积:由于两个窗口持续时间选择器都返回 Observable.never(),每个左侧元素都会与每个右侧元素配对),再过滤出 ID 相同的记录对。

  public class BatchTest
  {
     public static void main (String[] args)
     {
        Observable<Integer> myLeft    = Observable.just (1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Observable<Integer> myRight   = Observable.just (1, 3, 5, 7, 9);

        myLeft.join (
           myRight,
           new Func1<Integer, Observable<Integer>>()
           {
              public Observable<Integer> call (Integer aT)
              {
                 return Observable.never ();
              }
           },
           new Func1<Integer, Observable<Integer>>()
           {
              public Observable<Integer> call (Integer aT)
              {
                 return Observable.never ();
              }
           },
           new Func2<Integer, Integer, Integer[]>()
           {
              public Integer[] call (Integer aT1, Integer aT2)
              {
                 return new Integer[] {aT1, aT2};
              }
           })
        .filter (new Func1<Integer[], Boolean> ()
        {
           public Boolean call (Integer[] aT)
           {
              return aT[0].equals (aT[1]);
           }
        })
        .subscribe (new Action1<Integer[]> ()
        {
           public void call (Integer[] aT)
           {
              System.out.printf ("%d, %d\n", aT[0], aT[1]);
           }
        });
     }
  }

这对于小规模的示例效果很好,但对于大数据集效率很低,因为需要比较的记录对数量是两个数据集大小的乘积。

所以我的问题是:既然两个集合都已按键排序,有没有办法利用这些 selector/windowing 函数来限制连接的范围,从而不必把 300 万条记录与另外 300 万条记录逐一连接?

还是我做错了?

所以,我最终的做法是实现一个自定义 Operator:它接收第二个 Observable,并在新线程上订阅它。自定义订阅者把读入的数据放进一个 BlockingQueue,然后再从队列中取出数据,与原始 Observable 中的数据合并。

如果有人遇到同样的情况,代码如下:

import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func2;

/**
 * This class is an operator which can be used to join two {@link Observable} streams,
 * by matching them up using a {@link Comparator}. The two streams need to be sorted
 * according to the rules of the {@link Comparator} for this to work.
 * <p>
 * If the main stream is empty this might never get invoked even if the right stream
 * has data.
 */
public class JoinByComparisonOperator<I, R> implements Observable.Operator<R, I>
{

   private final RightSubscriber<I> subscriberRight;

   private final Comparator<I> comparator;

   private final Func2<I, I, Observable<R>> resultSelector;

  /**
   * The constructor for this class.
   * <p>
   * @param aRight
   *     The observable that is joined to the "right"
   * @param aScheduler
   *     The scheduler used to run the "right" Observable as it always needs to
   *     run on a new thread.
   * @param aComparator
   *     The comparator used to compare two input values. This should follow the
   *     same rules by which the two input streams are sorted
   * @param aResultSelector
   *     Function that gets two matching results and can handle them accordingly.
   *     Note the inputs can be null in case there was no match.
   */
   public JoinByComparisonOperator(
      final Observable<I>              aRight,
      final Scheduler                  aScheduler,
      final Comparator<I>              aComparator,
      final Func2<I, I, Observable<R>> aResultSelector
   )
   {
      subscriberRight   = new RightSubscriber<> ();
      comparator        = aComparator;
      resultSelector    = aResultSelector;

      aRight
         .subscribeOn (aScheduler)
         .subscribe (subscriberRight);
   }

   /**
    * Creates a new subscriber that gets called and passes on any calls in turn.
    * 
    * @param aSubscriber
    * @return
    * <p>
    * @see rx.functions.Func1#call(java.lang.Object)
    */
   @Override
   public Subscriber<? super I> call (final Subscriber<? super R> aSubscriber)
   {
      return new LeftSubscriber (aSubscriber);
   }


   /**
    * The subscriber for the "left" stream, which is the main stream we are operating
    * on.
    */
   private class LeftSubscriber extends Subscriber<I>
   {

      final Subscriber<? super R> nextSubscriber;

      private I nextRight;

      public LeftSubscriber (final Subscriber<? super R> aNextSubscriber)
      {
         nextSubscriber = aNextSubscriber;
      }

      private void selectResultInternal (I aLeft, I aRight)
      {
         resultSelector.call (aLeft, aRight).subscribe (new Action1<R>()
         {
            public void call (R aInput)
            {
               nextSubscriber.onNext (aInput);
            }
         });
      }

      @Override
      public void onCompleted ()
      {
         if (!nextSubscriber.isUnsubscribed ())
         {
            while (!subscriberRight.isComplete () || nextRight != null)
            {
               try
               {
                  I myNext = null;

                  if (nextRight != null)
                  {
                     myNext = nextRight;
                     nextRight = null;
                  }
                  else
                  {
                     myNext = subscriberRight.takeNext ();
                  }

                  if (myNext != null)
                  {
                     selectResultInternal (null, myNext);
                  }
               }
               catch (InterruptedException myException)
               {
                  onError (myException);
               }
            }

            nextSubscriber.onCompleted ();
         }
      }

      @Override
      public void onError (Throwable aE)
      {
         if (!nextSubscriber.isUnsubscribed ())
         {
            nextSubscriber.onCompleted ();

            subscriberRight.unsubscribe ();
         }
      }

      @Override
      public void onNext (I aInput)
      {
         if (!nextSubscriber.isUnsubscribed ())
         {
            I myRight   = null;
            I myLeft    = aInput;

            if (subscriberRight.getError () != null)
            {
               nextSubscriber.onError (subscriberRight.getError ());
               unsubscribe ();
            }

            if (!subscriberRight.isComplete ())
            {
               int myComparison = 0;

               do {

                  if (nextRight == null)
                  {
                     try
                     {
                        nextRight = subscriberRight.takeNext ();
                     }
                     catch (InterruptedException myException)
                     {
                        onError (myException);
                        return;
                     }
                  }

                  if (nextRight != null)
                  {
                     myComparison   = Objects.compare (nextRight, aInput, comparator);

                     if (myComparison < 0)
                     {
                        selectResultInternal (null, nextRight);
                        nextRight   = null;
                     }
                     else if (myComparison == 0)
                     {
                        myRight     = nextRight;
                        nextRight   = null;
                     }
                  }

               } while (myComparison < 0);
            }

            selectResultInternal (myLeft, myRight);
         }
      }
   }

   /**
    * This class is intended to consume the "right" input stream and buffer the result
    * so it can be retrieved when processing the main stream.
    */
   private class RightSubscriber<T> extends Subscriber<T>
   {

      private boolean complete = false;

      private Throwable error = null;

      private BlockingQueue<T> buffer = new ArrayBlockingQueue <> (1000);

      @Override
      public void onCompleted ()
      {
         complete = true;
      }

      @Override
      public void onError (Throwable aE)
      {
         error = aE;
      }

      @Override
      public void onNext (T aT)
      {
         try {
            buffer.put (aT);
         }
         catch (InterruptedException myException) {
            error = myException;
         }
      }

      public T takeNext() throws InterruptedException
      {
         return buffer.poll (10, TimeUnit.SECONDS);
      }

      public boolean isComplete()
      {
         return complete && buffer.size () == 0;
      }

      public Throwable getError()
      {
         return error;
      }
   };
}

下面是一个使用示例:它接收两条各含 1000 万条记录的流,并将它们匹配起来。

import java.util.Comparator;

import org.csi.domain.core.batch.JoinByComparisonOperator;

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class JoinTest
{
   public static void main (String[] args)
   {
      final Observable<Integer> myLeft    = Observable.range (1, 10000000);
      final Observable<Integer> myRight   = Observable.range (-100, 10000000);

      myLeft
         .lift (new JoinByComparisonOperator <Integer, Integer[]> (
            // The stream to be joined
            myRight,
            // The scheduler to use for the new stream
            Schedulers.newThread (),
            // The comparator to use to determine relative equality
            new Comparator<Integer>()
            {
               public int compare (Integer aArg0, Integer aArg1)
               {
                  return aArg0.compareTo (aArg1);
               }
            },
            // The function that combines matches found.
            new Func2<Integer, Integer, Observable<Integer[]>>()
            {
               public Observable<Integer[]> call (Integer aT1, Integer aT2)
               {
                  return Observable.just (new Integer[] {aT1, aT2});
               }
            }
         ))
         // The subscriber outputs the result to the console
         .subscribe (new Action1<Integer[]> ()
         {
            public void call (Integer[] aT)
            {
               System.out.printf ("%d, %d\n", aT[0], aT[1]);
            }
         });

   }
}