如何并行而不是顺序执行多个查询?
How to execute multiple queries in parallel instead of sequentially?
我正在查询所有 10 个 table 以从中获取用户 ID,并将所有用户 ID 加载到 HashSet 中,这样我就可以拥有唯一的用户 ID。
目前是顺序的。我们转到一个 table 并从中提取所有 user_id 并将其加载到哈希集中,然后是第二个和第三个 table 并继续。
private Set<String> getRandomUsers() {
Set<String> userList = new HashSet<String>();
// is there any way to make this parallel?
for (int table = 0; table < 10; table++) {
String sql = "select * from testkeyspace.test_table_" + table + ";";
try {
SimpleStatement query = new SimpleStatement(sql);
query.setConsistencyLevel(ConsistencyLevel.QUORUM);
ResultSet res = session.execute(query);
Iterator<Row> rows = res.iterator();
while (rows.hasNext()) {
Row r = rows.next();
String user_id = r.getString("user_id");
userList.add(user_id);
}
} catch (Exception e) {
System.out.println("error= " + ExceptionUtils.getStackTrace(e));
}
}
return userList;
}
有没有什么方法可以使这个多线程,以便每个 table 他们从我的 table 并行获取数据?最后,我需要 userList
哈希集,它应该具有来自所有 10 table 的所有唯一用户 ID。
我正在使用 Cassandra 数据库,并且只建立一次连接,因此我不需要创建多个连接。
您也许可以使其成为多线程,但由于线程创建和多个连接的开销,您可能不会有太大的好处。相反,在 mysql 中使用 UNION 语句并一次获取它们。让数据库引擎弄清楚如何有效地获取它们:
String sql = "select user_id from testkeyspace.test_table_1 UNION select user_id from testkeyspace.test_table_2 UNION select user_id from testkeyspace.test_table_3 ...."
当然,您必须以编程方式创建 sql 查询字符串。实际上不要在查询中输入“....”。
如果您能够使用 Java 8,您可能可以针对 table 列表使用 parallelStream
来执行此操作,并使用 lambda 来扩展 table 名称放入每个 table 的相应唯一 ID 列表中,然后将结果连接到一个散列中。
如果没有 Java 8,我会使用 Google Guava 的可听期货和类似这样的执行程序服务:
public static Set<String> fetchFromTable(int table) {
String sql = "select * from testkeyspace.test_table_" + table + ";";
Set<String> result = new HashSet<String>();
// populate result with your SQL statements
// ...
return result;
}
public static Set<String> fetchFromAllTables() throws InterruptedException, ExecutionException {
// Create a ListeningExecutorService (Guava) by wrapping a
// normal ExecutorService (Java)
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
List<ListenableFuture<Set<String>>> list =
new ArrayList<ListenableFuture<Set<String>>>();
// For each table, create an independent thread that will
// query just that table and return a set of user IDs from it
for (int i = 0; i < 10; i++) {
final int table = i;
ListenableFuture<Set<String>> future = executor.submit(new Callable<Set<String>>() {
public Set<String> call() throws Exception {
return fetchFromTable(table);
}
});
// Add the future to the list
list.add(future);
}
// We want to know when ALL the threads have completed,
// so we use a Guava function to turn a list of ListenableFutures
// into a single ListenableFuture
ListenableFuture<List<Set<String>>> combinedFutures = Futures.allAsList(list);
// The get on the combined ListenableFuture will now block until
// ALL the individual threads have completed work.
List<Set<String>> tableSets = combinedFutures.get();
// Now all we have to do is combine the individual sets into a
// single result
Set<String> userList = new HashSet<String>();
for (Set<String> tableSet: tableSets) {
userList.addAll(tableSet);
}
return userList;
}
Executors和Futures的使用都是核心Java。 Guava 唯一做的就是让我把 Futures 变成 ListenableFutures。有关后者为何更好的讨论,请参阅 here。
可能仍有方法可以提高这种方法的并行性,但如果您的大部分时间花在等待数据库响应或处理网络流量上,那么这种方法可能会有所帮助。
我正在查询所有 10 个 table 以从中获取用户 ID,并将所有用户 ID 加载到 HashSet 中,这样我就可以拥有唯一的用户 ID。
目前是顺序的。我们转到一个 table 并从中提取所有 user_id 并将其加载到哈希集中,然后是第二个和第三个 table 并继续。
private Set<String> getRandomUsers() {
Set<String> userList = new HashSet<String>();
// is there any way to make this parallel?
for (int table = 0; table < 10; table++) {
String sql = "select * from testkeyspace.test_table_" + table + ";";
try {
SimpleStatement query = new SimpleStatement(sql);
query.setConsistencyLevel(ConsistencyLevel.QUORUM);
ResultSet res = session.execute(query);
Iterator<Row> rows = res.iterator();
while (rows.hasNext()) {
Row r = rows.next();
String user_id = r.getString("user_id");
userList.add(user_id);
}
} catch (Exception e) {
System.out.println("error= " + ExceptionUtils.getStackTrace(e));
}
}
return userList;
}
有没有什么方法可以使这个多线程,以便每个 table 他们从我的 table 并行获取数据?最后,我需要 userList
哈希集,它应该具有来自所有 10 table 的所有唯一用户 ID。
我正在使用 Cassandra 数据库,并且只建立一次连接,因此我不需要创建多个连接。
您也许可以使其成为多线程,但由于线程创建和多个连接的开销,您可能不会有太大的好处。相反,在 mysql 中使用 UNION 语句并一次获取它们。让数据库引擎弄清楚如何有效地获取它们:
String sql = "select user_id from testkeyspace.test_table_1 UNION select user_id from testkeyspace.test_table_2 UNION select user_id from testkeyspace.test_table_3 ...."
当然,您必须以编程方式创建 sql 查询字符串。实际上不要在查询中输入“....”。
如果您能够使用 Java 8,您可能可以针对 table 列表使用 parallelStream
来执行此操作,并使用 lambda 来扩展 table 名称放入每个 table 的相应唯一 ID 列表中,然后将结果连接到一个散列中。
如果没有 Java 8,我会使用 Google Guava 的可听期货和类似这样的执行程序服务:
public static Set<String> fetchFromTable(int table) {
String sql = "select * from testkeyspace.test_table_" + table + ";";
Set<String> result = new HashSet<String>();
// populate result with your SQL statements
// ...
return result;
}
public static Set<String> fetchFromAllTables() throws InterruptedException, ExecutionException {
// Create a ListeningExecutorService (Guava) by wrapping a
// normal ExecutorService (Java)
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
List<ListenableFuture<Set<String>>> list =
new ArrayList<ListenableFuture<Set<String>>>();
// For each table, create an independent thread that will
// query just that table and return a set of user IDs from it
for (int i = 0; i < 10; i++) {
final int table = i;
ListenableFuture<Set<String>> future = executor.submit(new Callable<Set<String>>() {
public Set<String> call() throws Exception {
return fetchFromTable(table);
}
});
// Add the future to the list
list.add(future);
}
// We want to know when ALL the threads have completed,
// so we use a Guava function to turn a list of ListenableFutures
// into a single ListenableFuture
ListenableFuture<List<Set<String>>> combinedFutures = Futures.allAsList(list);
// The get on the combined ListenableFuture will now block until
// ALL the individual threads have completed work.
List<Set<String>> tableSets = combinedFutures.get();
// Now all we have to do is combine the individual sets into a
// single result
Set<String> userList = new HashSet<String>();
for (Set<String> tableSet: tableSets) {
userList.addAll(tableSet);
}
return userList;
}
Executors和Futures的使用都是核心Java。 Guava 唯一做的就是让我把 Futures 变成 ListenableFutures。有关后者为何更好的讨论,请参阅 here。
可能仍有方法可以提高这种方法的并行性,但如果您的大部分时间花在等待数据库响应或处理网络流量上,那么这种方法可能会有所帮助。