Spring Boot 种子节点无法加入akka群集

cvxl0en2  于 11个月前  发布在  Spring
关注(0)|答案(1)|浏览(114)

我尝试为我的 Akka Typed ActorSystem 引导一个 Akka 集群,但由于某种原因,种子节点无法加入集群。
第一个月
我尝试在我的 OCR 管道中使用 Akka 的 Actor 模型。我有三个 actor 和一个 guardian actor,都定义在同一个 ActorSystem 中。下面是我的 ActorSystem。

package io.tajji.kycpipeline.actor

import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.Behaviors
import akka.cluster.typed.Cluster
import io.tajji.apis.kyc.events.*
import io.tajji.kycpipeline.model.Command.*
import io.tajji.kycpipeline.model.Message
import io.tajji.kycpipeline.model.Message.*
import io.tajji.kycpipeline.service.KYCValidationService
import io.tajji.kycpipeline.service.TextExtractionService
import io.tajji.kycpipeline.service.TextParsingService
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.GenericEventMessage
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

/**
 * Spring-managed entry point of the KYC pipeline.
 *
 * Creates the typed actor system named `KYCSystem`. Its guardian behavior spawns a validator,
 * a text-extractor and a details-parser actor and routes every [Message] to the next stage
 * (validate -> extract -> parse). Final outcomes are published on the Axon [EventBus] as
 * [KYCFailed], [ResidentKYCPassed] or [LandlordKYCPassed].
 */
@Component
class KYCSystem(
    private val eventBus: EventBus,
    private val textExtractor: TextExtractionService,
    private val detailsParser: TextParsingService,
    private val kycValidator: KYCValidationService
) {
    private val system: ActorSystem<Message> = ActorSystem.create(
        Behaviors.setup<Message> { context ->
            // Captured once during setup so the Reactor callbacks below, which may run on a
            // different thread than the actor, never have to touch the ActorContext.
            val log = context.system.log()
            val self = context.self

            val validatorActor = context.spawn(KYCValidator.create(kycValidator), "DocumentValidatorActor")
            val extractorActor = context.spawn(DocumentExtractor.create(textExtractor), "TextExtractorActor")
            val parserActor = context.spawn(DocumentParser.create(detailsParser), "DocumentParserActor")

            Behaviors.receiveMessage { message ->
                when (message) {
                    // Stage 1: incoming requests are handed to the validator.
                    is LandlordKYC -> validatorActor.tell(
                        ValidateLandlordDocuments(message.event.accountId, message.event, self)
                    )
                    is ResidentKYC -> validatorActor.tell(
                        ValidateResidentDocuments(message.event.accountId, message.event, self)
                    )

                    // Stage 2: validated documents are handed to the text extractor.
                    is ValidLandlordDocuments -> extractorActor.tell(
                        ExtractLandlordDetails(
                            message.accountId,
                            message.idFront,
                            message.idBack,
                            message.pinCertificateData,
                            self
                        )
                    )
                    is ValidResidentDocuments -> extractorActor.tell(
                        ExtractResidentDetails(message.accountId, message.idFront, message.idBack, self)
                    )

                    // Stage 3: extracted text is handed to the parser.
                    is ExtractedLandlordDetails -> parserActor.tell(
                        ParseLandlordDetails(message.accountId, message, self)
                    )
                    is ExtractedResidentDetails -> parserActor.tell(
                        ParseResidentDetails(message.accountId, message, self)
                    )

                    // Failure outcomes are published as KYCFailed.
                    is NationalIdInvalid -> publish(KYCFailed(message.accountId, message.reason))
                    is PinCertificateInvalid -> publish(KYCFailed(message.accountId, message.reason))
                    is Invalid -> publish(KYCFailed(message.accountId, message.failureReason))

                    // Success outcomes: the parsed data arrives asynchronously. An explicit error
                    // consumer is supplied so a failed publisher is logged with its account id
                    // instead of falling through to Reactor's default dropped-error handling.
                    // NOTE(review): a failure here is only logged; consider also publishing
                    // KYCFailed so the account does not stay pending. That needs a failure
                    // reason value whose type is not visible in this file.
                    is ParsedResidentDetails -> {
                        val accountId = message.accountId
                        message.nationalIDData.subscribe(
                            { data -> publish(ResidentKYCPassed(accountId, data)) },
                            { error ->
                                log.error("Resident KYC result could not be resolved for account {}", accountId, error)
                            }
                        )
                    }
                    is ParsedLandlordDetails -> {
                        val accountId = message.accountId
                        Mono.zip(message.nationalIDData, message.taxData).subscribe(
                            { parsed -> publish(LandlordKYCPassed(accountId, parsed.t1, parsed.t2)) },
                            { error ->
                                log.error("Landlord KYC result could not be resolved for account {}", accountId, error)
                            }
                        )
                    }
                }
                Behaviors.same()
            }
        },
        "KYCSystem"
    )

    // Looks up the Cluster extension of the actor system. The reference is not read anywhere
    // else in this class; presumably it is kept so the extension is initialized at startup.
    // Confirm before removing.
    private val kycCluster = Cluster.get(system)

    /** Starts the KYC pipeline for a landlord by sending [LandlordKYC] to the guardian actor. */
    fun processLandlordKYC(event: LandlordKYCRequested) {
        system.tell(LandlordKYC(event))
    }

    /** Starts the KYC pipeline for a resident by sending [ResidentKYC] to the guardian actor. */
    fun processResidentKYC(event: ResidentKYCRequested) {
        system.tell(ResidentKYC(event))
    }

    /** Wraps [event] in an Axon event message and publishes it on the [eventBus]. */
    private fun publish(event: Any) {
        eventBus.publish(GenericEventMessage.asEventMessage<Any>(event))
    }
}

字符串
下面是我的集群配置文件

akka {
  cluster {
    seed-nodes = ["akka://KYCSystem@127.0.0.1:25520"]
    shutdown-after-unsuccessful-join-seed-nodes =  20s
    seed-node-timeout = 15s
    log-info-verbose = off
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  actor {
    provider = cluster
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 25520
    }
    artery {
    }
  }
  coordinated-shutdown {
    exit-jvm = on
  }
}

xj3cbfub

xj3cbfub1#

根据给出的错误日志,

Cluster Node [akka://KYCSystem@127.0.1.1:25520] - Joining of seed-nodes [akka://KYCSystem@127.0.0.1:25520]

字符串
从日志可以看出,你启动的 ActorSystem 的实际地址是 akka://KYCSystem@127.0.1.1:25520。
所以种子节点配置中必须使用完全相同的地址:系统名称、主机和端口都必须匹配。

akka {
  cluster {
    # Must be identical to this node's own address: system name, host and port.
    seed-nodes = ["akka://KYCSystem@127.0.1.1:25520"]
    shutdown-after-unsuccessful-join-seed-nodes = 20s
    seed-node-timeout = 15s
    log-info-verbose = off
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
  actor {
    provider = cluster
  }
  remote {
    # Artery is the default remoting transport since Akka 2.6 and takes its address from
    # artery.canonical; settings under the classic netty.tcp section are not used by it.
    artery {
      canonical {
        hostname = "127.0.1.1"
        port = 25520
      }
    }
  }
  coordinated-shutdown {
    exit-jvm = on
  }
}

相关问题