如何使用执行器服务处理实时数据?
how to process real time data with executor service?
我的目标是使用 ScheduledExecutorService
收集实时数据,每 5 次扫描一次数据库
秒,捕获任何新数据点,然后继续进行一些计算。数据在不确定的时间到达数据库(即 2 秒内 3 个数据点或 2 秒内 5 个数据点)
每进来一个数据点,计算时间需要10秒。我的问题是:如何在不干扰计算端的情况下同时更新捕获的数据堆栈。
我也认为:
- 因为我不能使用 for 循环,因为实时数据无法按预期到达并且数据不是 'offline' 格式
- 可以计算完成,但没有新数据到达,如何处理这种情况?
- 我不打算在我的计算部分使用第二个
ScheduledExecutorService
,因为计算比扫描部分花费的时间更长。
有什么实现可以参考吗?
我 运行 搞不懂,我没有样本,而是伪格式。我尝试过可怕地使用 while-loop,然后通过 pid 进程将其杀死,摘录如下:
//how to process real time data
while(true) {
if(calculationBegins) { //when first data point arrives
while(true) {
int oldDataSize = RealTimeData.size();
for(int i = 0; i < oldDataSize; i++) {
//
//complex calculation that takes 10 sec
//
}
//if new data arrived, break out and proceed calculation
while(true){
if(RealTimeData.size() > oldDataSize) break; //ScheduledExecutor will update the new size
}
}
}
}
完整代码(两个文件)
- Test.java:
package test;
import static test.HikariCPDataSource.calculationBegins;
import static test.HikariCPDataSource.RealTimeData;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test {
static List<String> capture = new ArrayList<String>();
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
Runnable extract = () -> {
HikariCPDataSource.getData(args[0]);
};
service.scheduleAtFixedRate(extract, 0, 5, TimeUnit.SECONDS);
//how to process real time data
while(true) {
if(calculationBegins) {
while(true) {
int oldDataSize = RealTimeData.size();
for(int i = 0; i < oldDataSize; i++) {
//
//complex calculation that takes 10 sec
//
}
//if new data arrived, break out and proceed calculation
while(true){
if(RealTimeData.size() > oldDataSize) break;
}
}
}
}
}
}
- HikariCPDataSource.java
package test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
public class HikariCPDataSource {
private static HikariConfig config = new HikariConfig();
private static HikariDataSource ds;
public static boolean calculationBegins = false; //true: start calculation when the very first data points arrives
public static List<String> RealTimeData = new ArrayList<>(); //capture real time data from database
public static int index = 0;
static {
config.setJdbcUrl("jdbc:mysql://127.0.0.1/testdb");
config.setUsername("root");
config.setPassword("caspo123");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.setMaximumPoolSize(2);
config.setMinimumIdle(1);
config.setIdleTimeout(0);
config.setMaxLifetime(0);
ds = new HikariDataSource(config);
}
public static Connection getConnection() throws SQLException {
return ds.getConnection();
}
private HikariCPDataSource(){}
public static void setCalculationBegins() {
calculationBegins = true;
}
public static int getSizeHikariScout(){
return(RealTimeData.size());
}
public static int getIndex() {
return(index);
}
public static void resetIndex(int updateIndex) {
index = index + updateIndex;
}
public static void getData(String input1) {
Connection con = null;
PreparedStatement pstmt = null;
ResultSet resultSet = null;
String output = null;
try{
con = ds.getConnection();
pstmt = con.prepareStatement("SELECT column1, column2 FROM testdb where id = ? and index > ?");
pstmt.setString(1, input1);
pstmt.setInt(1, getIndex());
resultSet = pstmt.executeQuery();
int indexCount = 0;
while (resultSet.next()){
if(!calculationBegins) setCalculationBegins();
output = String.join(",", resultSet.getString("column1"), resultSet.getString("column2"));
RealTimeData.add(output);
indexCount = indexCount + 1;
}
resetIndex(indexCount);
} catch (SQLException e){
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
我在这里看到的一个解决方案(实际上我自己也做过类似的事情)是创建两个 ExecutorService
。一个是 ScheduledExecutorService
,第二个是常规 FixedThreadPool
执行者。
第一个会定期 运行,假设每 3 秒一次。它只会扫描数据库以寻找新的数据点,如果找到一些,它将 submit/queue 任务交给另一个执行器。
第二个执行器将执行所有计算以及您要对数据执行的任何其他操作。提交的任务将排队等待执行,一旦有一些空闲线程就会完成。
这样所有的线程管理和任务队列处理都由Executors完成,你不用自己操心。
我的目标是使用 ScheduledExecutorService
收集实时数据,每 5 次扫描一次数据库
秒,捕获任何新数据点,然后继续进行一些计算。数据在不确定的时间到达数据库(即 2 秒内 3 个数据点或 2 秒内 5 个数据点)
每进来一个数据点,计算时间需要10秒。我的问题是:如何在不干扰计算端的情况下同时更新捕获的数据堆栈。 我也认为:
- 因为我不能使用 for 循环,因为实时数据无法按预期到达并且数据不是 'offline' 格式
- 可以计算完成,但没有新数据到达,如何处理这种情况?
- 我不打算在我的计算部分使用第二个
ScheduledExecutorService
,因为计算比扫描部分花费的时间更长。
有什么实现可以参考吗? 我 运行 搞不懂,我没有样本,而是伪格式。我尝试过可怕地使用 while-loop,然后通过 pid 进程将其杀死,摘录如下:
//how to process real time data
while(true) {
if(calculationBegins) { //when first data point arrives
while(true) {
int oldDataSize = RealTimeData.size();
for(int i = 0; i < oldDataSize; i++) {
//
//complex calculation that takes 10 sec
//
}
//if new data arrived, break out and proceed calculation
while(true){
if(RealTimeData.size() > oldDataSize) break; //ScheduledExecutor will update the new size
}
}
}
}
完整代码(两个文件)
- Test.java:
package test;
import static test.HikariCPDataSource.calculationBegins;
import static test.HikariCPDataSource.RealTimeData;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test {
static List<String> capture = new ArrayList<String>();
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
Runnable extract = () -> {
HikariCPDataSource.getData(args[0]);
};
service.scheduleAtFixedRate(extract, 0, 5, TimeUnit.SECONDS);
//how to process real time data
while(true) {
if(calculationBegins) {
while(true) {
int oldDataSize = RealTimeData.size();
for(int i = 0; i < oldDataSize; i++) {
//
//complex calculation that takes 10 sec
//
}
//if new data arrived, break out and proceed calculation
while(true){
if(RealTimeData.size() > oldDataSize) break;
}
}
}
}
}
}
- HikariCPDataSource.java
package test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
public class HikariCPDataSource {
private static HikariConfig config = new HikariConfig();
private static HikariDataSource ds;
public static boolean calculationBegins = false; //true: start calculation when the very first data points arrives
public static List<String> RealTimeData = new ArrayList<>(); //capture real time data from database
public static int index = 0;
static {
config.setJdbcUrl("jdbc:mysql://127.0.0.1/testdb");
config.setUsername("root");
config.setPassword("caspo123");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.setMaximumPoolSize(2);
config.setMinimumIdle(1);
config.setIdleTimeout(0);
config.setMaxLifetime(0);
ds = new HikariDataSource(config);
}
public static Connection getConnection() throws SQLException {
return ds.getConnection();
}
private HikariCPDataSource(){}
public static void setCalculationBegins() {
calculationBegins = true;
}
public static int getSizeHikariScout(){
return(RealTimeData.size());
}
public static int getIndex() {
return(index);
}
public static void resetIndex(int updateIndex) {
index = index + updateIndex;
}
public static void getData(String input1) {
Connection con = null;
PreparedStatement pstmt = null;
ResultSet resultSet = null;
String output = null;
try{
con = ds.getConnection();
pstmt = con.prepareStatement("SELECT column1, column2 FROM testdb where id = ? and index > ?");
pstmt.setString(1, input1);
pstmt.setInt(1, getIndex());
resultSet = pstmt.executeQuery();
int indexCount = 0;
while (resultSet.next()){
if(!calculationBegins) setCalculationBegins();
output = String.join(",", resultSet.getString("column1"), resultSet.getString("column2"));
RealTimeData.add(output);
indexCount = indexCount + 1;
}
resetIndex(indexCount);
} catch (SQLException e){
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
我在这里看到的一个解决方案(实际上我自己也做过类似的事情)是创建两个 ExecutorService
。一个是 ScheduledExecutorService
,第二个是常规 FixedThreadPool
执行者。
第一个会定期 运行,假设每 3 秒一次。它只会扫描数据库以寻找新的数据点,如果找到一些,它将 submit/queue 任务交给另一个执行器。
第二个执行器将执行所有计算以及您要对数据执行的任何其他操作。提交的任务将排队等待执行,一旦有一些空闲线程就会完成。
这样所有的线程管理和任务队列处理都由Executors完成,你不用自己操心。