将 Kotlin Lambda 传递给 Corda Flow 时的 Quasar NPE
Quasar NPE when passing a Kotlin Lambda to a Corda Flow
将 lambda 传递给 CollectSignaturesFlow
Quasar 无提示地崩溃并且无法继续处理。问题出在 others.map { initiateFlow(it as Party) }.toSet()
。将其移到外部并将该集合建立为局部变量可解决问题。
关于什么可以和不能用 Quasar 序列化是否有一组已知问题?
@InitiatingFlow
@StartableByRPC
class IOUIssueFlow(val state: IOUState) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val txCommand = Command(
IOUContract.Commands.Issue(),
state.participants.map { it.owningKey })
val builder = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first())
.withItems(
StateAndContract(
state,
IOUContract.IOU_CONTRACT_ID),
txCommand)
builder.verify(serviceHub)
val others = state.participants - ourIdentity
return subFlow(
FinalityFlow(
subFlow(
CollectSignaturesFlow(
serviceHub.signInitialTransaction(builder),
others.map { initiateFlow(it as Party) }.toSet()))))
}
}
该代码将 运行 无限期地没有结束并产生此警告:
[WARN ] 16:38:13,655 [Mock network] (FlowStateMachineImpl.kt:127) flow.[49f2ebbd-be62-462d-af87-24f5ae40c7d6].run - Terminated by unexpected exception
java.lang.NullPointerException: null
at net.corda.node.services.statemachine.FlowStateMachineImpl.suspend(FlowStateMachineImpl.kt:468) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.sendInternal(FlowStateMachineImpl.kt:332) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.initiateSession(FlowStateMachineImpl.kt:396) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.sendAndReceive(FlowStateMachineImpl.kt:200) ~[corda-node-1.0.0.jar:?]
at net.corda.core.internal.FlowStateMachine$DefaultImpls.sendAndReceive$default(FlowStateMachine.kt:27) ~[corda-core-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowSessionImpl.sendAndReceive(FlowSessionImpl.kt:25) ~[corda-node-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.sendPayloadAndReceiveDataRequest(SendTransactionFlow.kt:70) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.call(SendTransactionFlow.kt:48) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.call(SendTransactionFlow.kt:31) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignatureFlow.call(CollectSignaturesFlow.kt:140) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignatureFlow.call(CollectSignaturesFlow.kt:134) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignaturesFlow.call(CollectSignaturesFlow.kt:113) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignaturesFlow.call(CollectSignaturesFlow.kt:64) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.training.flow.IOUIssueFlow.call(IOUIssueFlow.kt:39) ~[classes/:?]
at net.corda.training.flow.IOUIssueFlow.call(IOUIssueFlow.kt:22) ~[classes/:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.run(FlowStateMachineImpl.kt:112) [corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.run(FlowStateMachineImpl.kt:40) [corda-node-1.0.0.jar:?]
at co.paralleluniverse.fibers.Fiber.run1(Fiber.java:1092) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.Fiber.exec(Fiber.java:788) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.RunnableFiberTask.doExec(RunnableFiberTask.java:100) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.RunnableFiberTask.run(RunnableFiberTask.java:91) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
at net.corda.node.utilities.AffinityExecutor$ServiceAffinityExecutor$thread.run(AffinityExecutor.kt:69) [corda-node-1.0.0.jar:?]
[WARN ] 16:38:13,670 [Mock network] (StateMachineManager.kt:93) flow.[49f2ebbd-be62-462d-af87-24f5ae40c7d6].uncaughtException - Caught exception from flow
java.lang.IllegalStateException: No transaction in context
.
问题是 lambda 捕获了对整个堆栈的引用。如果此时尝试暂停流程,它将尝试通过 lambda 序列化整个堆栈,流程暂停将失败。
解决办法是创建一个局部变量,或者在函数内部使用lambda。然后lambda会在函数returns时从栈中丢弃,所以在挂起时不会有问题。
对我来说,同样的问题是由于缺少 @Suspendable
调用链中函数的注释引起的。
猜猜 lambda 函数可能也是如此。
将 lambda 传递给 CollectSignaturesFlow
Quasar 无提示地崩溃并且无法继续处理。问题出在 others.map { initiateFlow(it as Party) }.toSet()
。将其移到外部并将该集合建立为局部变量可解决问题。
关于什么可以和不能用 Quasar 序列化是否有一组已知问题?
@InitiatingFlow
@StartableByRPC
class IOUIssueFlow(val state: IOUState) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val txCommand = Command(
IOUContract.Commands.Issue(),
state.participants.map { it.owningKey })
val builder = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first())
.withItems(
StateAndContract(
state,
IOUContract.IOU_CONTRACT_ID),
txCommand)
builder.verify(serviceHub)
val others = state.participants - ourIdentity
return subFlow(
FinalityFlow(
subFlow(
CollectSignaturesFlow(
serviceHub.signInitialTransaction(builder),
others.map { initiateFlow(it as Party) }.toSet()))))
}
}
该代码将 运行 无限期地没有结束并产生此警告:
[WARN ] 16:38:13,655 [Mock network] (FlowStateMachineImpl.kt:127) flow.[49f2ebbd-be62-462d-af87-24f5ae40c7d6].run - Terminated by unexpected exception
java.lang.NullPointerException: null
at net.corda.node.services.statemachine.FlowStateMachineImpl.suspend(FlowStateMachineImpl.kt:468) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.sendInternal(FlowStateMachineImpl.kt:332) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.initiateSession(FlowStateMachineImpl.kt:396) ~[corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.sendAndReceive(FlowStateMachineImpl.kt:200) ~[corda-node-1.0.0.jar:?]
at net.corda.core.internal.FlowStateMachine$DefaultImpls.sendAndReceive$default(FlowStateMachine.kt:27) ~[corda-core-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowSessionImpl.sendAndReceive(FlowSessionImpl.kt:25) ~[corda-node-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.sendPayloadAndReceiveDataRequest(SendTransactionFlow.kt:70) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.call(SendTransactionFlow.kt:48) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.DataVendingFlow.call(SendTransactionFlow.kt:31) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignatureFlow.call(CollectSignaturesFlow.kt:140) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignatureFlow.call(CollectSignaturesFlow.kt:134) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignaturesFlow.call(CollectSignaturesFlow.kt:113) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.CollectSignaturesFlow.call(CollectSignaturesFlow.kt:64) ~[corda-core-1.0.0.jar:?]
at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:212) ~[corda-core-1.0.0.jar:?]
at net.corda.training.flow.IOUIssueFlow.call(IOUIssueFlow.kt:39) ~[classes/:?]
at net.corda.training.flow.IOUIssueFlow.call(IOUIssueFlow.kt:22) ~[classes/:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.run(FlowStateMachineImpl.kt:112) [corda-node-1.0.0.jar:?]
at net.corda.node.services.statemachine.FlowStateMachineImpl.run(FlowStateMachineImpl.kt:40) [corda-node-1.0.0.jar:?]
at co.paralleluniverse.fibers.Fiber.run1(Fiber.java:1092) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.Fiber.exec(Fiber.java:788) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.RunnableFiberTask.doExec(RunnableFiberTask.java:100) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.RunnableFiberTask.run(RunnableFiberTask.java:91) [quasar-core-0.7.9-jdk8.jar:0.7.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_151]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
at net.corda.node.utilities.AffinityExecutor$ServiceAffinityExecutor$thread.run(AffinityExecutor.kt:69) [corda-node-1.0.0.jar:?]
[WARN ] 16:38:13,670 [Mock network] (StateMachineManager.kt:93) flow.[49f2ebbd-be62-462d-af87-24f5ae40c7d6].uncaughtException - Caught exception from flow
java.lang.IllegalStateException: No transaction in context
.
问题是 lambda 捕获了对整个堆栈的引用。如果此时尝试暂停流程,它将尝试通过 lambda 序列化整个堆栈,流程暂停将失败。
解决办法是创建一个局部变量,或者在函数内部使用lambda。然后lambda会在函数returns时从栈中丢弃,所以在挂起时不会有问题。
对我来说,同样的问题是由于缺少 @Suspendable
调用链中函数的注释引起的。
猜猜 lambda 函数可能也是如此。