如何使用执行器服务处理实时数据?

how to process real time data with executor service?

我的目标是使用 ScheduledExecutorService 收集实时数据,每 5 次扫描一次数据库 秒,捕获任何新数据点,然后继续进行一些计算。数据在不确定的时间到达数据库(即 2 秒内 3 个数据点或 2 秒内 5 个数据点)

每进来一个数据点,计算时间需要10秒。我的问题是:如何在不干扰计算端的情况下同时更新捕获的数据堆栈。 我也认为:

有什么实现可以参考吗? 我 运行 搞不懂,我没有样本,而是伪格式。我尝试过可怕地使用 while-loop,然后通过 pid 进程将其杀死,摘录如下:

        //how to process real time data
        while(true) {
            if(calculationBegins) { //when first data point arrives
                while(true) {
                    int oldDataSize = RealTimeData.size();
                    for(int i = 0; i < oldDataSize; i++) {
                        //
                        //complex calculation that takes 10 sec
                        //
                    }
                    
                    //if new data arrived, break out and proceed calculation
                    while(true){
                        if(RealTimeData.size() > oldDataSize) break; //ScheduledExecutor will update the new size
                    }
                }
            }
        }

完整代码(两个文件)

  1. Test.java:
package test;

import static test.HikariCPDataSource.calculationBegins;
import static test.HikariCPDataSource.RealTimeData;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


public class Test {

    static List<String> capture = new ArrayList<String>();

    public static void main(String[] args) {
        
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

        Runnable extract = () -> {
            HikariCPDataSource.getData(args[0]);
        };

        service.scheduleAtFixedRate(extract, 0, 5, TimeUnit.SECONDS);
        
        
        //how to process real time data
        while(true) {
            if(calculationBegins) {
                while(true) {
                    int oldDataSize = RealTimeData.size();
                    for(int i = 0; i < oldDataSize; i++) {
                        //
                        //complex calculation that takes 10 sec
                        //
                    }
                    
                    //if new data arrived, break out and proceed calculation
                    while(true){
                        if(RealTimeData.size() > oldDataSize) break;
                    }
                }
            }
        }
        
    }
}
  1. HikariCPDataSource.java
package test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;


public class HikariCPDataSource {
    
    private static HikariConfig config = new HikariConfig();
    private static HikariDataSource ds;
    public static boolean calculationBegins = false; //true: start calculation when the very first data points arrives
    public static List<String> RealTimeData = new ArrayList<>(); //capture real time data from database
    public static int index = 0;
    static {
        config.setJdbcUrl("jdbc:mysql://127.0.0.1/testdb");
        config.setUsername("root");
        config.setPassword("caspo123");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.setMaximumPoolSize(2);
        config.setMinimumIdle(1);
        config.setIdleTimeout(0);
        config.setMaxLifetime(0);
        ds = new HikariDataSource(config);
    }

    public static Connection getConnection() throws SQLException {
        return ds.getConnection();
    }

    private HikariCPDataSource(){}

    public static void setCalculationBegins() {
            calculationBegins = true;
    }
    
    public static int getSizeHikariScout(){
        return(RealTimeData.size());
    }
    
    public static int getIndex() {
        return(index);
    }
    
    public static void resetIndex(int updateIndex) {
        index = index + updateIndex;
    }
    
    public static void getData(String input1) {
        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet resultSet = null;
        String output = null;
        
        try{
            con = ds.getConnection();
            pstmt = con.prepareStatement("SELECT column1, column2 FROM testdb where id = ? and index > ?");
            pstmt.setString(1, input1);
            pstmt.setInt(1, getIndex());
            resultSet = pstmt.executeQuery();
            int indexCount = 0;
            while (resultSet.next()){
                    if(!calculationBegins) setCalculationBegins(); 
                    output = String.join(",", resultSet.getString("column1"), resultSet.getString("column2"));
                    RealTimeData.add(output);
                    indexCount = indexCount + 1;
            }
            resetIndex(indexCount);
        } catch (SQLException e){
            e.printStackTrace();
        } finally {
            if (con != null) {
                try {
                    con.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if(pstmt != null) {
                try {
                    pstmt.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

我在这里看到的一个解决方案(实际上我自己也做过类似的事情)是创建两个 ExecutorService。一个是 ScheduledExecutorService,第二个是常规 FixedThreadPool 执行者。

第一个会定期 运行,假设每 3 秒一次。它只会扫描数据库以寻找新的数据点,如果找到一些,它将 submit/queue 任务交给另一个执行器。

第二个执行器将执行所有计算以及您要对数据执行的任何其他操作。提交的任务将排队等待执行,一旦有一些空闲线程就会完成。

这样所有的线程管理和任务队列处理都由Executors完成,你不用自己操心。