我尝试为我的akka类型的actorsystem引导一个akka集群,但由于某种原因,种子节点无法加入集群。
第一个月
我尝试在我的OCR管道中使用akka的演员模型。我有三个演员和一个监护人演员,都在演员系统中定义。下面是我的演员系统。
package io.tajji.kycpipeline.actor
import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.Behaviors
import akka.cluster.typed.Cluster
import io.tajji.apis.kyc.events.*
import io.tajji.kycpipeline.model.Command.*
import io.tajji.kycpipeline.model.Message
import io.tajji.kycpipeline.model.Message.*
import io.tajji.kycpipeline.service.KYCValidationService
import io.tajji.kycpipeline.service.TextExtractionService
import io.tajji.kycpipeline.service.TextParsingService
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.GenericEventMessage
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class KYCSystem(
private val eventBus: EventBus,
private val textExtractor: TextExtractionService,
private val detailsParser: TextParsingService,
private val kycValidator: KYCValidationService
) {
private val system: ActorSystem<Message> = ActorSystem.create(
Behaviors.setup<Message> { context ->
val validator =
context.spawn(KYCValidator.create(kycValidator),
"DocumentValidatorActor")
val documentExtractorActor =
context.spawn(DocumentExtractor.create(textExtractor),
"TextExtractorActor")
val detailsParserActor =
context.spawn(DocumentParser.create(detailsParser),
"DocumentParserActor")
Behaviors.receiveMessage { message ->
when(message) {
is LandlordKYC -> {
val validateLandlordDocuments = ValidateLandlordDocuments(
message.event.accountId,
message.event,
context.self
)
validator.tell(validateLandlordDocuments)
}
is ResidentKYC -> {
val validateResidentDocuments = ValidateResidentDocuments(
message.event.accountId,
message.event,
context.self
)
validator.tell(validateResidentDocuments)
}
is ValidLandlordDocuments -> {
val extractLandlordDetails = ExtractLandlordDetails(
message.accountId,
message.idFront,
message.idBack,
message.pinCertificateData,
context.self
)
documentExtractorActor.tell(extractLandlordDetails)
}
is ValidResidentDocuments -> {
val extractResidentDetails = ExtractResidentDetails(
message.accountId,
message.idFront,
message.idBack,
context.self
)
documentExtractorActor.tell(extractResidentDetails)
}
is ExtractedLandlordDetails -> {
val parseLandlordDetails = ParseLandlordDetails(
message.accountId,
message,
context.self
)
detailsParserActor.tell(parseLandlordDetails)
}
is ExtractedResidentDetails -> {
val parseResidentDetails = ParseResidentDetails(
message.accountId,
message,
context.self
)
detailsParserActor.tell(parseResidentDetails)
}
is NationalIdInvalid -> {
eventBus.publish(GenericEventMessage
.asEventMessage<KYCFailed>(KYCFailed(
message.accountId,
message.reason
)))
}
is PinCertificateInvalid -> {
eventBus.publish(GenericEventMessage
.asEventMessage<KYCFailed>(KYCFailed(
message.accountId,
message.reason
)))
}
is ParsedResidentDetails -> {
message.nationalIDData.subscribe { data ->
eventBus.publish(GenericEventMessage
.asEventMessage<ResidentKYCPassed>(ResidentKYCPassed(
message.accountId,
data
)))
}
}
is ParsedLandlordDetails -> {
val accountId = message.accountId
Mono.zip(
message.nationalIDData,
message.taxData
).subscribe { tuple ->
val nationalId = tuple.t1
val taxData = tuple.t2
eventBus.publish(GenericEventMessage
.asEventMessage<LandlordKYCPassed>(LandlordKYCPassed(
accountId,
nationalId,
taxData
)))
}
}
is Invalid -> {
eventBus.publish(GenericEventMessage
.asEventMessage<KYCFailed>(KYCFailed(
message.accountId,
message.failureReason
)))
}
}
Behaviors.same()
}
}, "KYCSystem"
)
private val kycCluster = Cluster.get(system)
fun processLandlordKYC(event: LandlordKYCRequested) {
system.tell(LandlordKYC(event))
}
fun processResidentKYC(event: ResidentKYCRequested) {
system.tell(ResidentKYC(event))
}
}
字符串
下面是我的集群配置文件
akka {
cluster {
seed-nodes = ["akka://[email protected]:25520"]
shutdown-after-unsuccessful-join-seed-nodes = 20s
seed-node-timeout = 15s
log-info-verbose = off
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
actor {
provider = cluster
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 25520
}
artery {
}
}
coordinated-shutdown {
exit-jvm = on
}
}
型
1条答案
按热度按时间xj3cbfub1#
根据给出的错误日志,
字符串
这里是你启动的ActorSystemakka://email protected(https://stackoverflow.com/cdn-cgi/l/email-protection):25520
所以你必须在种子节点配置中配置相同的配置。并且主机和端口必须匹配。
型