在 GUI 中与 JavaFX 网络服务交互

interacting with JavaFX network service in a GUI

我正在从编写 java Swing 应用程序过渡到编写基于 JavaFX 的现代 java GUI 应用程序。

我想知道创建基于网络的可重用线程服务的最佳方法。我编写网络服务的方法是使用控制器 class(通过 Net-beans GUI 从 FXML 生成)。我通过名为“transmitter”的私有服务成员将线程逻辑放在此处,并通过 Start/Stop 按钮的事件回调连接了 start/stop 逻辑。
基于网络的线程实现为 javafx 服务 - 我这样做是因为我想在目标地址更改时重新启动 service/thread。这似乎是代替独立任务的推荐方法。

现在的网络服务非常简单,它所做的就是使用一些 GUI 小部件来配置一个数据包每秒传输一次 host/port。仅当 host/port 小部件更改时我才需要重新启动服务,但是如果网络服务是 运行,我想修改没有 interrupting/restarting DatagramSocket 的数据包。我有疑问并需要一些指导的地方是:

  1. 将网络线程线程化的推荐方法是什么 基于 FXML 的应用程序?一个例子将不胜感激。
  2. 我如何安全地传达 GUI 小部件的更改(通过它们的 操作执行回调)到 运行 服务 class?

下面显示的是我的控制器最相关的部分class:

/**
 * FXML Controller class
 *
 * @author johnc
 */
public class OpMessageServerController implements Initializable {
    @FXML
    private Text mCurrentDateTimeText;
    @FXML
    private Label mApplicationStatus;
    @FXML
    private ComboBox<DiscreteStatus> mPofDS;
    @FXML
    private ComboBox<PhaseOfFlightFMS> mPofFMS;
    @FXML
    private ComboBox<DiscreteStatus> mTailNumberDS;
    @FXML
    private ComboBox<DiscreteStatus> mConfigTableDS;
    @FXML
    private ComboBox<DiscreteStatus> mDateTimeDS;
    @FXML
    private TextField mEpicPN;
    @FXML
    private TextField mConfigTablePNHash;
    @FXML
    private TextField mTailNumber;
    @FXML
    private ComboBox<DiscreteStatus> mTopLevelPNDS;
    @FXML
    private Button mStartStopButton;
    @FXML
    private ComboBox<String> mDLMUHostSpec;
    @FXML
    private CheckBox connectionStatusC1;
    @FXML
    private CheckBox wsuConnectionStatus;
    @FXML
    private CheckBox connectionStatusC4;
    @FXML
    private CheckBox connectionStatusC3;
    @FXML
    private CheckBox connectionStatusC2;
    @FXML
    private CheckBox dlmuwConnectionStatus;

    private Service<Void> transmitter;



    /**
     * Initializes the controller class.
     * @param url
     * @param rb
     */
    @Override
    public void initialize(URL url, ResourceBundle rb) {        
        mPofDS.setItems(FXCollections.observableArrayList(DiscreteStatus.values()));
        mPofDS.getSelectionModel().selectFirst();
        mPofFMS.setItems(FXCollections.observableArrayList(PhaseOfFlightFMS.values()));
        mPofFMS.getSelectionModel().selectFirst();
        mTailNumberDS.setItems(FXCollections.observableArrayList(DiscreteStatus.values()));
        mTailNumberDS.getSelectionModel().selectFirst();
        mConfigTableDS.setItems(FXCollections.observableArrayList(DiscreteStatus.values()));
        mConfigTableDS.getSelectionModel().selectFirst();
        mDateTimeDS.setItems(FXCollections.observableArrayList(DiscreteStatus.values()));
        mDateTimeDS.getSelectionModel().selectFirst();
        mTopLevelPNDS.setItems(FXCollections.observableArrayList(DiscreteStatus.values()));
        mTopLevelPNDS.getSelectionModel().selectFirst();
//      mDLMUHostSpec.setItems(FXCollections.observableArrayList(
//          FXCollections.observableArrayList("localhost:1234", "192.168.200.2:1234")));

        // add event handler here to update the current date/time label
        // this should also update the transmit datastructure
        final Timeline timeline = new Timeline(new KeyFrame(
            Duration.seconds(1), (ActionEvent event) -> {
            LocalDateTime currentDateTime = LocalDateTime.now();
            mCurrentDateTimeText.setText(currentDateTime.format(
                DateTimeFormatter.ofPattern("kk:mm:ss uuuu")));  
        }));  

        timeline.setCycleCount(Animation.INDEFINITE);  
        timeline.play();

        // create a service.
        transmitter = new Service() {
            @Override 
            protected Task createTask() {
                return new Task<Void>() {
                    @Override 
                    protected Void call() throws InterruptedException {
                        updateMessage("Running...");
                        updateProgress(0, 10);
                        DatagramSocket sock = null;
                        while (!isCancelled()) {
                            try {
                                if (sock == null) {
                                    DatagramSocket sock = new DatagramSocket();
                                }
                            } catch (SocketException ex) {
                                Logger.getLogger(OpMessageServerController.class.getName()).log(Level.SEVERE, null, ex);
                            }
                            //Block the thread for a short time, but be sure
                            //to check the InterruptedException for cancellation
                            OpSupportMessage opSupportMessage = new OpSupportMessage(
                                DiscreteStatus.NormalOperation, 
                                PhaseOfFlightFMS.Cruise, 
                                DiscreteStatus.NormalOperation, 
                                "TAILNUM",
                                DiscreteStatus.NormalOperation);
                            ByteArrayOutputStream bos = new ByteArrayOutputStream();
                            String[] specParts = mDLMUHostSpec.getValue().split(":");
                            if (specParts.length == 2) {
                                try {
                                    opSupportMessage.write(bos);
                                    byte[] buff = bos.toByteArray();
                                    DatagramPacket packet = new DatagramPacket(
                                        buff, buff.length, InetAddress.getByName(
                                        specParts[0]), Integer.parseInt(specParts[1]));
                                    mSocket.send(packet);
                                    Thread.sleep(1000);
                                } catch (IOException ex) {
                                } catch (InterruptedException interrupted) {
                                    if (isCancelled()) {
                                        updateMessage("Cancelled");
                                        break;
                                    }
                                }
                            }
                        }                        
                        updateMessage("Cancelled");
                        return null;
                    }

                    @Override
                    protected void succeeded() {
                        System.out.println("Scanning completed.");
                    }

                    @Override 
                    protected void failed() {
                        System.out.println("Scanning failed.");
                    }

                    @Override 
                    protected void running() {
                        System.out.println("Scanning started.");
                    }

                    @Override 
                    protected void cancelled() {
                         System.out.println("Scanning cancelled.");
                    }                    

                    private void DatagramSocket() {
                        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
                    }
                };
            }
        };

        mApplicationStatus.textProperty().bind(transmitter.messageProperty());

    };

@FXML
private void startStopButtonAction(ActionEvent event) {
    if (!transmitter.isRunning()) {
        transmitter.reset();
        transmitter.start();
    }                
}

 …

}

I would like to know the best approach to create a network based reusable threading service. The way I coded up the network service was to use a controller class (generated from the FXML via the Net-beans GUI). I put the threading logic here via a private Service member named 'transmitter' and I wired up the start/stop logic via the Start/Stop button's event callback.

我诚挚地建议您将网络服务和 GUI 控制器开发为单独的项目。

我会将网络服务 运行 作为 daemon/background 线程放在自己的容器或虚拟机中。这种组织的优点是它使您的服务器远离 JavaFX 事件循环和应用程序线程的变幻莫测。您需要将服务设计为识别来自控制器的管理命令 and/or 中断请求。您可以将网络服务开发为 REST 或任何您想要的方式,而无需考虑如何将其转入 JavaFX 应用程序线程。

然后我会将 GUI 控制器 运行 作为单独的 GUI 应用程序放在同一进程中,或者如果需要远程管理,则放在单独的 JVM 中(并使用 IPC 来 send/receive 管理消息)。

TL;DR:如果是我,我会抵制将网络服务编程为 JavaFX 应用程序的诱惑。

背景

这个答案是基于对问题的评论的集合,它有点啰嗦,没有提供针对问题中代码的解决方案,也没有解决问题中的一些概念,例如 low基于 UDP 套接字级别的通信系统 - 对此深表歉意。

示例解决方案项目

我使用基于网络套接字的通信对 JavaFX 应用程序进行了概念验证:javafx-websocket-test. Perhaps some of the concepts from there might help you, in particular the client JavaFX Task and Service code and the sample client application and controller 使用它。

该项目确实在可执行实现中展示了 Adam Bien's article on JavaFX Integration Strategies 中概述的一些通信原则 James_D 链接,例如:

  1. 在 JavaFX 服务中设置网络套接字端点。
  2. 将每个通信交互包装在异步 JavaFX 任务中。
  3. 使用异步事件回调将成功和失败结果分流回 UI。

此外,该示例还显示了网络服务与 JavaFX UI 之间的交互,其中 JavaFX UI 向服务发出异步请求并处理异步来自它的回应。

我确实记得看似简单的 Java 网络套接字 API 确实包含一些问题。它只是一个概念证明,因此请谨慎使用它作为强大网络服务的基础。

评论与感想

由于以下原因,这实际上是一个很难回答 IMO 的问题:

  1. 网络通信有多种形式,其中一些适合不同的应用。
  2. (当前)没有将网络服务与 JavaFX 应用程序集成的标准或最佳实践。
  3. 通过 UI 状态监控和异常处理提供强大的网络连接通常并不像看起来的那样 straight-forward,而且很容易出错。

还有很多细微之处需要处理,比如:

  • 通讯失败怎么办?
  • 如果应用程序以比网络或服务器处理速度更快的速度发出请求怎么办?
  • 如果用户在消息未完成时关闭应用程序会怎样?
  • 如何确保UI在出现冗长的通信过程时不被冻结?
  • 如何提供 UI 网络处理冗长 on-going 的反馈?
  • 正在使用什么基础通信技术?
  • 底层通信是有状态的还是无状态的?
  • 通信non-blocking和事件驱动还是阻塞?
  • 如何序列化和反序列化传输数据?

尽管 one-size 适合所有通信模型很困难,但可以调整 "standard" 通信模型以满足许多需求。例如类似于 http ajax calls in the browser based network model or NetConnections for flash 的内容。这些似乎足以满足各种需求。当然,它们并非适用于所有情况,否则就不会创建替代系统,例如网络套接字或 http 直播。

理想情况下,JavaFX 客户端 => 服务器通信应该有一个单一的标准化 API,如 jQuery.ajax(),但我还没有看到有人创建 JavaFX等价于那种API.

与核心 JavaFX APIs 的其余部分不同,这种用于网络通信的标准化 high-level 接口目前不以 off-the-shelf 形式存在.但是,有大量库和函数可用作开发您自己的服务的基本构建块;甚至可能太多而无法合理处理。

请注意,大多数更高级别的网络协议库,例如 Tyrus web socket implementation or the Apache HTTP components underlying a JAX-RS provider have their own internal thread-pools for communication. Systems like netty are based upon nio,都是事件驱动的,而不是线程管理的。您的 JavaFX 网络客户端服务是以下两个之一:

  • 对于 non-blocking I/O it is issuing async calls, hooking into the response events and relaying them back to JavaFX via Platform.runLater
  • 为了阻塞 I/O,它生成一个带有任务或服务的线程,带有隐式或显式执行程序服务池来管理 UI 交互而不是实际的网络通信。

一个关键且令人困惑的事情是 JavaFX 应用程序代码应始终以异步方式执行网络通信。对于 non-blocking I/O 调用已经是异步的,因此不需要包装器任务。对于阻塞 I/O,您不想阻塞 UI 线程,因此它自己的线程中的任务包装器 运行 可以防止这种情况发生。

有人会认为这会使 non-blocking I/O 调用更简单,但事实并非如此,因为 JDK 的 non-blocking I/O API 级别很低,编码起来非常棘手。它不太适合高级应用程序代码。

通常,应用程序代码最好使用更高级别的库,例如 JAX-RS、web 套接字或 akka(或者最好是在它们之上的一层),它们在内部管理通信的细节阻塞或 non-blocking 方式,并提供事件驱动 API 来发送和接收消息。可以将各个消息事件包装在 JavaFX Task 中以进行异步处理。因此,从 JavaFX 应用程序的角度来看,一切都是事件驱动的,没有任何东西是阻塞的,同一个应用程序 API 可以工作,而不管底层通信协议和 blocking/non-blocking 通信基础设施如何。

thanks for the proof of concept application, this will be quite useful, however one thing that is a bit obscure is how one can safely communicate GUI changes to the running service thread safely. It appears that the HelloService uses a 'name' simple string property to communicate changes from the GUI to the service before it is started. I wonder how one might communicate UI changes to a running background service in a thread safe manner. Via some sort or message api perhaps?

一个BlockingQueue with a fixed max-size which rejects additional requests when the queue is full can be used for communication from JavaFX thread based code to a consumer service. It is a reasonably elegant solution to the classic producer-consumer problem

当然,您可以跳过阻塞队列并继续创建异步任务 ad-nauseum,这对于低流量通信来说很好,但可能导致高流量通信的有限线程资源匮乏。一种标准的处理方法是使用 ExecutorService from Executors which manages a thread pool. The thread pool for the executor service can be defined to be bounded to a max number of threads 并在内部使用无界队列,如果所有线程都忙,消息就会堆积起来。这样你就不需要定义自己的阻塞队列,你只需发出异步服务请求,如果可以的话,它们会立即在线程中处理,如果不能的话,请求就会堆积在内部队列中。

这实际上是 JavaFX Service 的工作方式:

The Service by default uses a thread pool Executor with some unspecified default or maximum thread pool size. This is done so that naive code will not completely swamp the system by creating thousands of Threads.

和:

If an Executor is specified on the Service, then it will be used to actually execute the service. Otherwise, a daemon thread will be created and executed. If you wish to create non-daemon threads, then specify a custom Executor (for example, you could use a ThreadPoolExecutor with a custom ThreadFactory).

不适合使用简单的 BlockedQueue 消息传递的更复杂的解决方案将使用基于主题的消息队列样式解决方案,例如,基于 Java 的 STOMP client such as this kaazing example.

获取服务的消息信息只是要求的一部分,本质上是执行异步消息发送。您还需要处理返回的响应。为此,有两种选择:

  1. 您将每个请求建模为单独的任务,onSuccess 和 onError 处理程序处理任务响应。 运行 服务中的任务确保它由具有固定线程池的执行程序处理,该线程池由内部溢出队列支持。
  2. 你用它自己的 API 编写自己的长 运行 服务接口,并为请求封装一个阻塞队列,使用 Platform.runLater 处理返回到 [=180= 的通信结果].

为了使响应处理程序逻辑动态化并由调用者调整,您可以将处理程序函数作为 lambda 函数传递,以便在使用 Platform.runLater 的原始调用成功时执行。

如果将调用包装在任务或服务中,并使用 onSucceeded 函数,则不需要 runLater 调用,因为该实现将确保在 JavaFX 上调用 onSucceeded 处理程序任务完成后线程。

请注意,网络请求和响应通常需要将数据编组和解编组与可序列化流进行某种转换。一些更高级别的网络 API,例如 JAX-RS 或网络套接字提供程序提供接口和实用程序来为您完成某些工作,通常使用特定的库进行不同类型的转换,例如 JAXB for XML serialization of Jackson 用于 JSON 序列化。

稍微相关的信息和进一步的想法

下一个可能有点off-topic,但这是BlockingQueue and Task interaction的示例,它不是网络服务,但它确实演示了producer/consumer中队列的使用情况,具有反应性 UI 和进度监控。

另一件有趣的事情(至少对我而言)是基于 Akka 的 JavaFX 客户端-> 服务器通信解决方案。这似乎是传统 http/rest/soap/rmi 调用或基于消息队列的处理的不错替代方案。 Akka 本质上是 fault-tolerant 异步并发通信的基于事件的解决方案,因此对于 match-up 基于 UI 的框架(例如 JavaFX)来说,它似乎是一个很好的 match-up,允许开发人员在适当的抽象层处理。但是我还没有看到依赖于 Akka 的基于 FX 的消息传递客户端。