在 ReactFX 中组合多个更改流

Combining multiple change streams in ReactFX

问题

如何在 ReactFX 中正确组合多个 属性 更改流以用于 UndoFX(或任何用例)?

详情

这里是我想要完成的事情的简短解释(完整的示例代码是 posted at GitHub):

有一个具有两个属性的示例模型。为了简单起见,它们是双重属性

public class DataModel {
    private DoubleProperty a, b;
    //...
    //with appropriate getters, setters, equals, hashcode
    //...
}

在示例代码中,有一些按钮可以更改一个或两个属性。如果这是更改,我想撤消对两者的更改。

根据 UndoFX 示例,每个继承自基础 class:

的更改 classes(此处也缩写)
public abstract class ChangeBase<T> implements UndoChange {
   protected final T oldValue, newValue;
   protected final DataModel model;

   protected ChangeBase(DataModel model, T oldValue, T newValue) {
      this.model = model;
      this.oldValue = oldValue;
      this.newValue = newValue;
   }

   public abstract ChangeBase<T> invert();
   public abstract void redo();

   public Optional<ChangeBase<?>> mergeWith(ChangeBase<?> other) {
      return Optional.empty();
   }

   @Override
   public int hashCode() {
      return Objects.hash(this.oldValue, this.newValue);
   }      

   @Override
   public boolean equals(Object obj) {
      if (this == obj) {
         return true;
      }
      if (obj == null) {
         return false;
      }
      if (getClass() != obj.getClass()) {
         return false;
      }
      final ChangeBase<?> other = (ChangeBase<?>) obj;
      if (!Objects.equals(this.oldValue, other.oldValue)) {
         return false;
      }
      if (!Objects.equals(this.newValue, other.newValue)) {
         return false;
      }
      if (!Objects.equals(this.model, other.model)) {
         return false;
      }
      return true;
   }
}

public class ChangeA extends ChangeBase<Double> {
   //...
   //constructors and other method implementations
   //..
   @Override
   public void redo() {
      System.out.println("ChangeA redo "+this);
      this.model.setA(this.newValue);
   }   
}

public class ChangeB extends ChangeBase<Double> {
   //...
   //constructors and other method implementations
   //...
   @Override
   public void redo() {
      System.out.println("ChangeA redo "+this);
      this.model.setB(this.newValue);
   }   
}

所有更改都实现一个接口

public interface UndoChange {
   public void redo();
   public UndoChange invert();
   public Optional<UndoChange> mergeWith(UndoChange other);

}

阅读完文档后,我开始创建事件流以捕获对每个 属性:

的更改
EventStream<UndoChange> changeAStream = 
    EventStreams.changesOf(model.aProperty())
              .map(c -> new ChangeA(model, (Change<Number>)c));
EventStream<UndoChange> changeBStream = 
    EventStreams.changesOf(model.bProperty())
              .map(c -> new ChangeB(model, (Change<Number>)c));

我的第一次尝试是像这样合并流

EventStream<UndoChange> bothStream = EventStreams.merge(changeAStream, changeBStream); 

在这种情况下发生的情况是,如果同时更改 A 和 B 属性,流中将有两个更改,每个更改将单独撤消,而不是一起撤消。每次调用 setter 都会在适当的流中进行更改,然后将其发送到 bothStream,然后包含两个单独的事件而不是一个。

阅读更多内容后,我尝试将流和映射合并到一个单独的更改对象中:

 EventStream<UndoChange> bothStream = EventStreams.combine(changeAStream, changeBStream).map(ChangeBoth::new); 

其中 ChangeBoth 定义为:

public class ChangeBoth implements UndoChange {

   private final ChangeA aChange;
   private final ChangeB bChange;

   public ChangeBoth(ChangeA ac, ChangeB bc) {
      this.aChange = ac;
      this.bChange = bc;
   }

   public ChangeBoth(Tuple2<UndoChange, UndoChange> tuple) {
      this.aChange = ((ChangeBoth)tuple.get1()).aChange;
      this.bChange = ((ChangeBoth)tuple.get2()).bChange;
   }

   @Override
   public UndoChange invert() {
      System.out.println("ChangeBoth invert "+this);
      return new ChangeBoth(new ChangeA(this.aChange.model, this.aChange.newValue, this.aChange.oldValue), 
                            new ChangeB(this.bChange.model, this.bChange.newValue, this.bChange.oldValue));
   }

   @Override
   public void redo() {
      System.out.println("ChangeBoth redo "+this);
      DataModel model = this.aChange.model;
      model.setA(this.aChange.newValue);
      model.setB(this.bChange.newValue);
   }

   //...
   // plus appropriate mergeWith, hashcode, equals
   //...
}

这导致 IllegalStateException: Unexpected change received 被抛出。经过一些挖掘,我确定了为什么会这样:当 ChangeBoth 被撤消时(通过 invert()redo() 调用),它将每个 属性 设置回旧值.但是,当它设置每个 属性 时,更改会通过流发回,从而导致在将两个属性设置回旧值之间将新的 ChangeBoth 放入流中。

总结

所以回到我的问题:执行此操作的正确方法是什么?有没有一种方法可以将两个属性的更改流合并为一个不会导致此问题的更改对象?

编辑 - 尝试 1

根据 Tomas 的回答,我 added/changed 以下代码(注意:repo 中的代码已更新):

changeAStreamchangeBstream 保持不变。

我没有合并流,而是按照 Tomas 的建议创建了一个二元运算符以将两个更改减少为一个:

BinaryOperator<UndoChange> abOp = (c1, c2) -> {
   ChangeA ca = null;
   if(c1 instanceof ChangeA) {
      ca = (ChangeA)c1;
   }
   ChangeB cb = null;
   if(c2 instanceof ChangeB) {
      cb = (ChangeB)c2;
   }
   return new ChangeBoth(ca, cb);
};

并将事件流更改为

SuspendableEventStream<UndoChange> bothStream = EventStreams.merge(changeAStream, changeBStream).reducible(abOp);

现在按钮操作未在 setonAction 中实现,而是通过事件流处理

  EventStreams.eventsOf(bothButton, ActionEvent.ACTION)
          .suspenderOf(bothStream)
          .subscribe((ActionEvent event) ->{
             System.out.println("stream action");
              model.setA(Math.random()*10.0);
              model.setB(Math.random()*10.0);                 
          });

这对于适当地组合事件非常有用,但对于 A+B 更改,撤消仍然会中断。它适用于单独的 A 和 B 更改。这是两个 A+B 更改然后撤消

的示例
A+B Button Action in event stream
Change in A stream
Change in B stream
A+B Button Action in event stream
Change in A stream
Change in B stream
ChangeBoth attempting merge with combinedeventstreamtest.ChangeBoth@775ec8e8... merged
undo 6.897901340713284  2.853416510829745
ChangeBoth invert combinedeventstreamtest.ChangeBoth@aae83334
ChangeA invert combinedeventstreamtest.ChangeA@32ee049a
ChangeB invert combinedeventstreamtest.ChangeB@4919dd13
ChangeBoth redo combinedeventstreamtest.ChangeBoth@b2155b1e
Change in A stream
Exception in thread "JavaFX Application Thread" java.lang.IllegalArgumentException: Unexpected change received.
Expected:
combinedeventstreamtest.ChangeBoth@b2155b1e
Received:
combinedeventstreamtest.ChangeA@2ad21e08
Change in B stream

编辑 - 尝试 2 - 成功!!

Tomas 非常友好地指出了解决方案(我应该已经意识到了)。在做 redo():

时暂停
  UndoManager<UndoChange> um = UndoManagerFactory.unlimitedHistoryUndoManager(
          bothStream, 
          c -> c.invert(), 
          c -> bothStream.suspendWhile(c::redo),
          (c1, c2) -> c1.mergeWith(c2)
  );

因此,任务是将 bothStream 在处理按钮单击期间发出的更改合并为一个。

您需要一个函数将两个 UndoChange 缩减为一个:

BinaryOperator<UndoChange> reduction = ???; // implement as appropriate

使 bothStream 减少更改为一个,同时 "suspended":

SuspendableEventStream<UndoChange> bothStream =
        EventStreams.merge(changeAStream, changeBStream).reducible(reduction);

现在您只需要在处理按钮点击时暂停 bothStream。可以这样做:

EventStreams.eventsOf(bothButton, ActionEvent.ACTION) // Observe actions of bothButton ...
        .suspenderOf(bothStream)                      // but suspend bothStream ...
        .subscribe((ActionEvent event) -> {           // before handling the action.
            model.setA(Math.random()*10.0);
            model.setB(Math.random()*10.0);
        })

当undoing/redoing撤消管理器中的更改时,也暂停bothStream,以便精确反转(组合)更改从[发出=14=] 撤消组合更改时(这是使 UndoManager 高兴所必需的)。这可以通过将 apply 参数包装到 bothStream.suspendWhile() 中的 UndoManager 构造函数来完成,例如:

UndoManagerFactory.unlimitedHistoryUndoManager(
        bothStream, 
        c -> c.invert(), 
        c -> bothStream.suspendWhile(c::redo)  // suspend while applying the change
)