JavaFX 和 RxJava- TableView 无限调用 setCellValueFactory()

JavaFX and RxJava- TableView infinitely calling setCellValueFactory()

我 运行 遇到了 TableView 的问题,并且使用了 ReactFX for the setCellValueFactory. The EventStream driving the bindings originates from an RxJava Observable 的反应式绑定,但是,在下面的代码中,您会找到一个方法将其转换为 EventStream

但是,TableView 除了列的初始绑定值外,什么都不显示。当我将 System.out.println 添加到 setCellValueFactory() 主体时,我发现 setCellValueFactory() 在循环中被无限调用,并且发出的值从未进入绑定。

对此我真的很纳闷。我如何停止这种行为并让 Observable 成功地将单个值发送到 EventStream,然后发送到 Binding?

这是我的 SSCCE。

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(Observable.just(1), Observable.just(2)));
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for x col");
                return toReactFX(cb.getValue().getX().map(x -> (Number) x)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("Calling cell value factory for y col");
                return toReactFX(cb.getValue().getY().map(y -> (Number) y)).toBinding(-1); //causes infinite call loop
                //return new SimpleObjectProperty<Number>(1); //works fine
            });

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> EventStream<T> toReactFX(Observable<T> obs) {
        EventSource<T> es = new EventSource<>();
        obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
        return es;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

更新

我想我在下面提出的解决方案中发现了一个问题。如果在除平台线程之外的其他线程上发出任何 Observable,则不会向 属性 填充任何值。

我试图通过在将调用 rxToProperty 的线程放入平台线程之前检查它是否是平台线程来解决这个问题,但这没有用并再次导致无限循环。我不知道 属性 的线程安全是否会出轨。

但是我怎样才能让 Observable 在多个线程上发出来安全地填充 Property?这是我更新的 SSCCE 显示此行为。 "X" 列永远不会填充,因为它是多线程的,但 "Y" 列会填充,因为它保留在平台线程上。

public class ReactiveTableViewTest extends Application {

    @Override
    public void start(Stage stage) throws Exception {

        Group root = new Group();
        Scene scene = new Scene(root);

        root.getChildren().add(new ReactiveTable(buildSampleData()));

        stage.setScene(scene);
        stage.show();
    }

    private ObservableList<ReactivePoint> buildSampleData() { 
        ObservableList<ReactivePoint> points = FXCollections.observableArrayList();
        points.add(new ReactivePoint(
                Observable.just(1, 5, 6, 8,2,3,5,2).observeOn(Schedulers.computation()), 
                Observable.just(2,6,8,2,14)
                )
            );
        return points;
    }

    private static final class ReactivePoint {
        private final Observable<Integer> x;
        private final Observable<Integer> y;

        ReactivePoint(Observable<Integer> x, Observable<Integer> y) { 
            this.x = x;
            this.y = y;
        }
        public Observable<Integer> getX() {
            return x;
        }
        public Observable<Integer> getY() { 
            return y;
        }
    }

    private static final class ReactiveTable extends TableView<ReactivePoint> {

        @SuppressWarnings("unchecked")
        private ReactiveTable(ObservableList<ReactivePoint> reactivePoints) { 
            this.setItems(reactivePoints);


            System.out.println("Constructor is happening on FX THREAD: " + Platform.isFxApplicationThread());

            TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
            xCol.setCellValueFactory(cb -> { 
                System.out.println("CellValueFactory for X called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getX().map(x -> (Number) x));
                }
            );

            TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
            yCol.setCellValueFactory(cb -> {
                System.out.println("CellValueFactory for Y called on FX THREAD: " + Platform.isFxApplicationThread());
                return rxToProperty(cb.getValue().getY().map(y -> (Number) y));
            }
        );

            this.getColumns().addAll(xCol, yCol);
        }
    }
    private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ObjectProperty<T> property = new SimpleObjectProperty<>();

        obs.subscribe(v -> { 
            if (Platform.isFxApplicationThread()) {
                System.out.println("Emitting " + v + " on FX Thread");
                property.set(v);
            }
            else { 
                System.out.println("Emitting " + v + " on Non-FX Thread");
                Platform.runLater(() -> property.set(v));
            }
        });

        return property;
    }
    public static void main(String[] args) { 
        launch(args);
    }
}

虽然我还没有完全弄清楚OP代码中问题的确切根源,但我找到了解决方案(如果有人能解释为什么,我会把它标记为答案)。我最初认为 Platform.runLater() 可能会导致最初设置的无限循环(如下所示)。

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> Platform.runLater(() -> es.push(foo)), e -> e.printStackTrace());
    return es;
}

事实证明这个理论是正确的。删除 Platform.runLater() 导致无限循环消失。也许发出的值一直将自己扔到 GUI 线程的后面,因此永远不会进入绑定?但是仍然没有发出任何东西,table 值保持在 -1,即初始 Binding 值。

private static <T> EventStream<T> toReactFX(Observable<T> obs) {
    EventSource<T> es = new EventSource<>();
    obs.subscribe(foo -> es.push(foo), e -> e.printStackTrace());
    return es;
}

我确实找到了一些有用的东西。我创建了一个名为 rxToProperty() 的新转换方法,并用它替换了对 rxToReactFX() 的调用。在那之后,一切似乎都很好。

private static <T> ObjectProperty<T> rxToProperty(Observable<T> obs) { 
    ObjectProperty<T> property = new SimpleObjectProperty<>();
    obs.subscribe(v -> property.set(v));
    return property;
}

这是新的 TableColumn 设置。

TableColumn<ReactivePoint,Number> xCol = new TableColumn<>("X");
xCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getX().map(x -> (Number) x)));

TableColumn<ReactivePoint,Number> yCol = new TableColumn<>("Y");
yCol.setCellValueFactory(cb -> rxToProperty(cb.getValue().getY().map(y -> (Number) y)));

如果有人有更好的解决方案,或者可以解释为什么 Platform.runLater()EventStream 不起作用,我会将其标记为已接受的答案。

更新

在非 FX 线程上发出的 Observables 有一些问题,并且值永远不会填充到 Property。我发现这可以通过使用 cache() 来保存最后一个值来解决,并且它会在订阅的调用线程上 re=emit,这将是 FX 线程。我还对返回的 Property.

进行了一些同步和只读包装
private static <T> ReadOnlyObjectProperty<T> rxToProperty(Observable<T> obs) { 
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();

        obs.cache(1).subscribe(v -> { 
            synchronized(property) { 
                if (Platform.isFxApplicationThread()) {
                    System.out.println("Emitting val " + v + " on FX Thread");
                }
                else { 
                    System.out.println("Emitting val " + v + " on Non-FX Thread");
                }
                property.set(v);
            }
        });

        return property.getReadOnlyProperty();
    }

最终更新 Tomas Mikula 在 ReactFX GitHub 项目上对这种行为以及解决方案给出了一些非常有用的见解。

https://github.com/TomasMikula/ReactFX/issues/22