Hyperledger Fabric:使用 java-gateway-sdk 的异步/并行事务

Hyperledger Fabric: Async / parallel transaction using java-gateway-sdk

我正在尝试使用 java 网关 sdk 将异步事务发送到我的 Fabric 网络,但我收到错误 Channel [CHANNEL NAME] has been shutdown

这里有一些示例代码:

    Gateway.Builder builder = Gateway.createBuilder()
              .discovery(true)
              .identity(wallet, user.getName())
              .networkConfig([PATH TO CONNECTION PROFILE]);

    try(Gateway gateway = builder.connect()) {
        Network channel = gateway.getNetwork(CHANNEL_NAME);
        Contract someChaincode = channel.getContract(CHAINCODE_NAME);

        int coresNumber = (Runtime.getRuntime().availableProcessors());

        ExecutorService executor = Executors.newFixedThreadPool(coresNumber);

        for(String elemt : elements) {                                                                          
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try{
                    //Exception thrown here
                    byte[] res = someChaincode.submitTransaction("someFunction", elemt);
                    return new String(res);     
                } catch (ContractException e) {
                    e.printStackTrace();
                }
            }, executor);
        }
    } catch (Exception e) {
       // Handle Exception
    }

这里是个例外:

java.util.concurrent.ExecutionException: org.hyperledger.fabric.gateway.GatewayRuntimeException: org.hyperledger.fabric.sdk.exception.InvalidArgumentException: Channel [CHANNEL NAME] has been shutdown.

准确的说,异常是在方法checkChannelState()中抛出的。我感觉我没有正确处理多线程。

您看起来并没有在等待您在代码段中创建的期货的完成。因此,您正在安排事务调用以在不同线程上执行,但随后,在执行此代码之前,退出了 try-with-resources 块,该块关闭了您用于连接的 Gateway 实例。关闭网关会导致所有关联的资源和连接关闭,包括底层通道。因此,当您的事务调用实际获得 运行 时,您已经关闭了它们执行所需的连接和资源。

您需要在关闭Gateway 实例之前从您创建的Future 对象中获取结果;换句话说,在退出创建网关的 try-with-resources 块之前。大概是这样的:

Collection<Callable<String>> tasks = elements.stream()
    .map(element -> new Callable<String>() {
        public String call() throws ContractException, TimeoutException, InterruptedException {
            byte[] result = contract.submitTransaction("someFunction", element);
            return new String(result);
        }
    }).collect(Collectors.toList());

try {
    Collection<String> results = new ArrayList<>();

    Collection<Future<String>> futures = executor.invokeAll(tasks, timeout, timeUnit);
    for (Future<String> future : futures) {
        try {
            String result = future.get(timeout, timeUnit);
            results.add(result);
        } catch (CancellationException | InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Results: " + results);
} catch (InterruptedException e ) {
    e.printStackTrace();
}