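I found that CollectSignaturesFlow throws an UnsupportedOperationException because, at some point, Kryo cannot serialize one of the collections used in the flow (the stack trace shows the failure while the checkpoint is read back). The point-to-point CollectSignatureFlow that it uses internally works fine, without interruption. I also implemented signature collection in a simpler way (yes, with far fewer checks, but it is only for testing purposes), and it does the job.
Code: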
@InitiatingFlow
@StartableByRPC
class CheckoutClientFlow(
    private val person: Person = ClientAccountFlows.defaultPerson,
    private val partiesAndAmounts: Map<CordaX500Name, Long>
) : FlowLogic<SignedTransaction>() {
    override val progressTracker: ProgressTracker = tracker()
    private var selfIncluded = false

    @Suspendable
    override fun call(): SignedTransaction {
        progressTracker.currentStep = NotaryInfo
        val notary: Party = getPreferredNotary(serviceHub)

        progressTracker.currentStep = LocateOtherParties
        val parties = locateParties(partiesAndAmounts.keys).toMutableSet()
        if (parties.contains(ourIdentity)) {
            selfIncluded = true
            parties.remove(ourIdentity)
        }
        val inputs = subFlow(GatherStateAndRefFlow(person, parties))
        val otherPartiesSessions = parties.map { initiateFlow(it) }

        progressTracker.currentStep = CreateTransaction
        val inputsWithParties = inputs.mapKeys { entry -> locateParty(entry.key) }
        val txBuilder = createTxBuilder(notary, inputsWithParties)
        var onceSignedTransaction = serviceHub.signInitialTransaction(txBuilder)

        progressTracker.currentStep = SigningByOtherParty
        // This call fails with the UnsupportedOperationException shown in the stack trace.
        val signedTx = subFlow(CollectSignaturesFlow(onceSignedTransaction, otherPartiesSessions))
        // Simpler alternative that works: collect the signatures one session at a time.
        // otherPartiesSessions.forEach {
        //     onceSignedTransaction += subFlow(CollectSignatureFlow(onceSignedTransaction, it))
        // }

        progressTracker.currentStep = Finalize
        // Note: signedTx is not used here; onceSignedTransaction is what gets finalized.
        return subFlow(FinalityFlow(transaction = onceSignedTransaction, sessions = otherPartiesSessions))
    }
}
} // closes an enclosing declaration that is not shown (presumably ExchangeValueMultipleParties, see the test)
Test:
@Test
fun `subtract if both parties are present other`() {
    val person = defaultPerson
    val partiesAndAmounts = mapOf(Pair(aNodeName, defaultMovableAmount), Pair(bNodeName, defaultMovableAmount))
    val resultFuture = a.startFlow(ExchangeValueMultipleParties.CheckoutClientFlow(
        person = defaultPerson,
        partiesAndAmounts = partiesAndAmounts
    ))
    network.runNetwork()
    val res1 = resultFuture.get()
    assertNotNull(res1)
    assertNotEquals(0, res1.inputs.size)
    assertEquals(partiesAndAmounts.size, res1.tx.outputs.size)
    assertEquals(partiesAndAmounts.size, res1.sigs.size)
    val stateA = a.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
    assertNotNull(stateA)
    // assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateA.state.data.balance))
    val stateB = b.services.vaultService.queryBy<ClientAccountState>(accountQueryCriteria).states.first { it.state.data.owner == person }
    assertNotNull(stateB)
    // assertEquals(defaultAmount - defaultMovableAmount, getTrueAmount(stateB.state.data.balance))
}
Stack trace:
[ERROR] 19:03:34+0000 [FiberDeserializationChecker] interceptors.FiberDeserializationChecker. - Encountered unrestorable checkpoint! [errorCode=aixjez, moreInformationAt=https://errors.corda.net/OS/4.4/aixjez]
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
dataObject (co.paralleluniverse.fibers.Stack)
stack (net.corda.node.services.statemachine.FlowStateMachineImpl)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer.read(CompatibleFieldSerializer.java:145)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:782)
at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObjectOrNull(ReplaceableObjectKryo.java:107)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2156)
at co.paralleluniverse.fibers.Fiber$FiberSerializer.read(Fiber.java:2086)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readClassAndObject(ReplaceableObjectKryo.java:112)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:83)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1$1.invoke(KryoCheckpointSerializer.kt:33)
at net.corda.node.serialization.kryo.KryoStreams.kryoInput(KryoStreams.kt:20)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:72)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer$deserialize$1.invoke(KryoCheckpointSerializer.kt:33)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer$kryo$1.execute(KryoCheckpointSerializer.kt:61)
at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:58)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer.kryo(KryoCheckpointSerializer.kt:57)
at net.corda.node.serialization.kryo.KryoCheckpointSerializer.deserialize(KryoCheckpointSerializer.kt:71)
at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:101)
at net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker$start$2.invoke(FiberDeserializationCheckingInterceptor.kt:51)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Caused by: java.lang.UnsupportedOperationException
at java.util.AbstractCollection.add(AbstractCollection.java:262)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at co.paralleluniverse.io.serialization.kryo.ReplaceableObjectKryo.readObject(ReplaceableObjectKryo.java:92)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 21 more
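It seems that Kryo, while trying to rebuild one of the collections used in the flow, calls the add() method, which AbstractCollection does not support, even though the method is implemented.
This is the implementation in AbstractCollection:
public boolean add(E var1) {
    throw new UnsupportedOperationException();
}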
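To illustrate the mechanism, here is a minimal standalone sketch in plain Kotlin with no Corda or Kryo involved. It shows that a collection which inherits add() from AbstractCollection without overriding it, such as the key-set view of a LinkedHashMap, rejects add(), while a defensive copy of the same elements accepts it. The helper name rejectsAdd and the party names are made up for the example. Whether a view like partiesAndAmounts.keys is what actually ends up in the failing checkpoint is only an assumption; the stack trace does not identify the collection.

```kotlin
// Returns true if calling add() on the given collection throws
// UnsupportedOperationException (hypothetical helper for this example).
fun rejectsAdd(target: MutableCollection<String>): Boolean =
    try {
        target.add("probe")
        false
    } catch (e: UnsupportedOperationException) {
        true
    }

fun main() {
    val amounts = LinkedHashMap<String, Long>()
    amounts["PartyA"] = 10L
    amounts["PartyB"] = 10L

    // keys is a live view backed by the map; it inherits AbstractCollection.add().
    println(rejectsAdd(amounts.keys))                 // prints "true"

    // toMutableSet() copies the elements into a regular set that supports add().
    println(rejectsAdd(amounts.keys.toMutableSet()))  // prints "false"
}
```

If a view of this kind is the culprit, copying it into an ordinary list or set before the flow suspends would be one thing to try.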
In addition, when running in debug mode, thread starvation can be observed:
[WARN] 19:55:47,111 [HikariPool-3 housekeeper] pool.HikariPool. - HikariPool-3 - Thread starvation or clock leap detected (housekeeper delta=1m35s902ms765µs600ns).
[WARN] 19:55:47,111 [HikariPool-1 housekeeper] pool.HikariPool. - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m49s546ms362µs501ns).
[WARN] 19:55:47,113 [HikariPool-2 housekeeper] pool.HikariPool. - HikariPool-2 - Thread starvation or clock leap detected (housekeeper delta=1m37s789ms188µs200ns).
[WARN] 19:55:47,114 [HikariPool-4 housekeeper] pool.HikariPool. - HikariPool-4 - Thread starvation or clock leap detected (housekeeper delta=1m33s803ms330µs901ns).