使用 ResultSetFuture 修改集合
Modify collection with ResultSetFuture
我尝试使用
java-driver-async-queries。我正在修改 FutureCallback 中的列表,但它似乎不起作用 -
List<Product> products = new ArrayList<Product>();
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
@Override public void onFailure(Throwable t) {
// log error
}
},
MoreExecutors.sameThreadExecutor()
);
}
System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank
还有其他方法吗?
基于我实施的 Mikhail Baksheev 回答,现在得到了正确的结果。
只是一个转折。我需要实现一些额外的逻辑。我想知道我是否可以使用 List<MyClass>
而不是 List<ResultSetFuture>
和 MyClass 作为 -
public class MyClass {
private Integer productCount;
private Integer stockCount;
private ResultSetFuture result;
}
然后在迭代时将 FutureList
设置为 -
ResultSetFuture result = session.executeAsync(query, key.get());
MyClass allResult = new MyClass();
allResult.setInCount(inCount);
allResult.setResult(result);
allResult.setSohCount(values.size() - inCount);
futuresList.add(allResult);
如@RussS 所述,代码不会等待所有期货完成。
同步异步代码的方式有很多种。例如,使用 CountDownLatch:
编辑:
另外请使用单独的线程进行回调,并使用产品的并发收集。
ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>();
final Executor callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/);
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
doneSignal.countDown();
}
@Override public void onFailure(Throwable t) {
// log error
doneSignal.countDown();
}
},
callbackExecutor
);
}
doneSignal.await(); // wait for all async requests to finish
System.out.println("Product List : " + products);
另一种方法是将所有期货收集在一个列表中,并等待所有结果作为番石榴 Futures.allAsList 的单一期货,例如:
List<ResultSetFuture> futuresList = new ArrayList<>( /*Map size*/);
for (/* iterating over a Map*/) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
futuresList.add( session.executeAsync( query, key ) );
}
ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList( futuresList );
List<Product> products = new ArrayList<>();
try {
final List<ResultSet> resultSets = allFuturesResult.get();
for ( ResultSet rs : resultSets ) {
if ( null != rs ) {
Row row = rs.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
}
} catch ( InterruptedException | ExecutionException e ) {
System.out.println(e);
}
System.out.println("Product List : " + products);
编辑 2
I am wondering if I can use List instead of List and MyClass as
技术上是的,但在这种情况下你不能在 Futures.allAsList
中传递 List<MyClass>
或者 MyClass
应该实现 ListenableFuture
接口
我尝试使用 java-driver-async-queries。我正在修改 FutureCallback 中的列表,但它似乎不起作用 -
List<Product> products = new ArrayList<Product>();
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
@Override public void onFailure(Throwable t) {
// log error
}
},
MoreExecutors.sameThreadExecutor()
);
}
System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank
还有其他方法吗?
基于我实施的 Mikhail Baksheev 回答,现在得到了正确的结果。
只是一个转折。我需要实现一些额外的逻辑。我想知道我是否可以使用 List<MyClass>
而不是 List<ResultSetFuture>
和 MyClass 作为 -
public class MyClass {
private Integer productCount;
private Integer stockCount;
private ResultSetFuture result;
}
然后在迭代时将 FutureList
设置为 -
ResultSetFuture result = session.executeAsync(query, key.get());
MyClass allResult = new MyClass();
allResult.setInCount(inCount);
allResult.setResult(result);
allResult.setSohCount(values.size() - inCount);
futuresList.add(allResult);
如@RussS 所述,代码不会等待所有期货完成。
同步异步代码的方式有很多种。例如,使用 CountDownLatch:
编辑: 另外请使用单独的线程进行回调,并使用产品的并发收集。
ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>();
final Executor callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/);
for (// iterating over a Map) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
ResultSetFuture future = session.executeAsync(query, key);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
Row row = result.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
doneSignal.countDown();
}
@Override public void onFailure(Throwable t) {
// log error
doneSignal.countDown();
}
},
callbackExecutor
);
}
doneSignal.await(); // wait for all async requests to finish
System.out.println("Product List : " + products);
另一种方法是将所有期货收集在一个列表中,并等待所有结果作为番石榴 Futures.allAsList 的单一期货,例如:
List<ResultSetFuture> futuresList = new ArrayList<>( /*Map size*/);
for (/* iterating over a Map*/) {
key = entry.getKey();
String query = "SELECT id,desc,category FROM products where id=?";
futuresList.add( session.executeAsync( query, key ) );
}
ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList( futuresList );
List<Product> products = new ArrayList<>();
try {
final List<ResultSet> resultSets = allFuturesResult.get();
for ( ResultSet rs : resultSets ) {
if ( null != rs ) {
Row row = rs.one();
if (row != null) {
Product product = new Product();
product.setId(row.getString("id"));
product.setDesc(row.getString("desc"));
product.setCategory(row.getString("category"));
products.add(product);
}
}
}
} catch ( InterruptedException | ExecutionException e ) {
System.out.println(e);
}
System.out.println("Product List : " + products);
编辑 2
I am wondering if I can use List instead of List and MyClass as
技术上是的,但在这种情况下你不能在 Futures.allAsList
中传递 List<MyClass>
或者 MyClass
应该实现 ListenableFuture
接口