Hyperledger Fabric:使用 java-gateway-sdk 的异步/并行事务
Hyperledger Fabric: Async / parallel transaction using java-gateway-sdk
我正在尝试使用 java 网关 sdk 将异步事务发送到我的 Fabric 网络,但我收到错误 Channel [CHANNEL NAME] has been shutdown
。
这里有一些示例代码:
Gateway.Builder builder = Gateway.createBuilder()
.discovery(true)
.identity(wallet, user.getName())
.networkConfig([PATH TO CONNECTION PROFILE]);
try(Gateway gateway = builder.connect()) {
Network channel = gateway.getNetwork(CHANNEL_NAME);
Contract someChaincode = channel.getContract(CHAINCODE_NAME);
int coresNumber = (Runtime.getRuntime().availableProcessors());
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
for(String elemt : elements) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try{
//Exception thrown here
byte[] res = someChaincode.submitTransaction("someFunction", elemt);
return new String(res);
} catch (ContractException e) {
e.printStackTrace();
}
}, executor);
}
} catch (Exception e) {
// Handle Exception
}
这里是个例外:
java.util.concurrent.ExecutionException: org.hyperledger.fabric.gateway.GatewayRuntimeException: org.hyperledger.fabric.sdk.exception.InvalidArgumentException: Channel [CHANNEL NAME] has been shutdown.
准确的说,异常是在方法checkChannelState()
中抛出的。我感觉我没有正确处理多线程。
您看起来并没有在等待您在代码段中创建的期货的完成。因此,您正在安排事务调用以在不同线程上执行,但随后,在执行此代码之前,退出了 try-with-resources 块,该块关闭了您用于连接的 Gateway 实例。关闭网关会导致所有关联的资源和连接关闭,包括底层通道。因此,当您的事务调用实际获得 运行 时,您已经关闭了它们执行所需的连接和资源。
您需要在关闭Gateway 实例之前从您创建的Future 对象中获取结果;换句话说,在退出创建网关的 try-with-resources 块之前。大概是这样的:
Collection<Callable<String>> tasks = elements.stream()
.map(element -> new Callable<String>() {
public String call() throws ContractException, TimeoutException, InterruptedException {
byte[] result = contract.submitTransaction("someFunction", element);
return new String(result);
}
}).collect(Collectors.toList());
try {
Collection<String> results = new ArrayList<>();
Collection<Future<String>> futures = executor.invokeAll(tasks, timeout, timeUnit);
for (Future<String> future : futures) {
try {
String result = future.get(timeout, timeUnit);
results.add(result);
} catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
System.out.println("Results: " + results);
} catch (InterruptedException e ) {
e.printStackTrace();
}
我正在尝试使用 java 网关 sdk 将异步事务发送到我的 Fabric 网络,但我收到错误 Channel [CHANNEL NAME] has been shutdown
。
这里有一些示例代码:
Gateway.Builder builder = Gateway.createBuilder()
.discovery(true)
.identity(wallet, user.getName())
.networkConfig([PATH TO CONNECTION PROFILE]);
try(Gateway gateway = builder.connect()) {
Network channel = gateway.getNetwork(CHANNEL_NAME);
Contract someChaincode = channel.getContract(CHAINCODE_NAME);
int coresNumber = (Runtime.getRuntime().availableProcessors());
ExecutorService executor = Executors.newFixedThreadPool(coresNumber);
for(String elemt : elements) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try{
//Exception thrown here
byte[] res = someChaincode.submitTransaction("someFunction", elemt);
return new String(res);
} catch (ContractException e) {
e.printStackTrace();
}
}, executor);
}
} catch (Exception e) {
// Handle Exception
}
这里是个例外:
java.util.concurrent.ExecutionException: org.hyperledger.fabric.gateway.GatewayRuntimeException: org.hyperledger.fabric.sdk.exception.InvalidArgumentException: Channel [CHANNEL NAME] has been shutdown.
准确的说,异常是在方法checkChannelState()
中抛出的。我感觉我没有正确处理多线程。
您看起来并没有在等待您在代码段中创建的期货的完成。因此,您正在安排事务调用以在不同线程上执行,但随后,在执行此代码之前,退出了 try-with-resources 块,该块关闭了您用于连接的 Gateway 实例。关闭网关会导致所有关联的资源和连接关闭,包括底层通道。因此,当您的事务调用实际获得 运行 时,您已经关闭了它们执行所需的连接和资源。
您需要在关闭Gateway 实例之前从您创建的Future 对象中获取结果;换句话说,在退出创建网关的 try-with-resources 块之前。大概是这样的:
Collection<Callable<String>> tasks = elements.stream()
.map(element -> new Callable<String>() {
public String call() throws ContractException, TimeoutException, InterruptedException {
byte[] result = contract.submitTransaction("someFunction", element);
return new String(result);
}
}).collect(Collectors.toList());
try {
Collection<String> results = new ArrayList<>();
Collection<Future<String>> futures = executor.invokeAll(tasks, timeout, timeUnit);
for (Future<String> future : futures) {
try {
String result = future.get(timeout, timeUnit);
results.add(result);
} catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
}
System.out.println("Results: " + results);
} catch (InterruptedException e ) {
e.printStackTrace();
}