文章40 | 阅读 30614 | 点赞0
本文主要分享 Collector Naming Server 命名服务。主要包含如下部分:
Collector Agent Server ( 包括 Jetty 和 gRPC ),提供上传调用链路,JVM Metric 等等 API 给 Agent 调用。
Agent 通过 Collector Naming Server 调用 Collector Agent Server 的 API ,查询 Collector Agent Server 最新的集群地址。
Naming Server 在 SkyWalking 架构图处于如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
Collector Naming Server 通过 apm-collector-naming
项目实现,其中:
collector-naming-define
项目:定义了 Naming Server 的接口。collector-naming-jetty-provider
项目:基于 Jetty Server 的 Naming Server 实现。org.skywalking.apm.collector.cluster.ClusterModule
,实现 Module 抽象类,集群管理 Module 。
#name()
实现方法,返回模块名为 "naming"
。
#services()
实现方法,返回 Service 类名:NamingHandlerRegisterService 。
org.skywalking.apm.collector.naming.jetty.NamingModuleJettyProvider
,实现 ModuleProvider 抽象类,基于 Jetty 的命名组件服务提供者实现类。
#name()
实现方法,返回组件服务提供者名为 "jetty"
。
module()
实现方法,返回组件类为 NamingModule 。
#requiredModules()
实现方法,返回依赖组件为 ClusterModule 、JettyManagerModule。
#prepare(Properties)
实现方法,执行准备阶段逻辑。
#registerServiceImplementation()
父类方法,注册到 services
。#start()
实现方法,执行启动阶段逻辑。
JettyManagerService#createIfAbsent(host, port, contextPath)
方法,创建 Jetty Server ,此时不会启动 Jetty Server。在 JettyManagerProvider#notifyAfterCompleted()
方法,统一启动所有 Jetty Server,在 《SkyWalking 源码分析 —— Collector Jetty Server Manager》「3. JettyManagerProvider」 有详细解析。#notifyAfterCompleted()
实现方法,执行启动完成逻辑。目前是个空方法。
org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService
,继承 Service 接口,命名处理器注册服务接口。
#register(ServerHandler)
接口方法,注册 Server 请求处理器。Collector Agent Server 会调用该方法,将其实现的 用于 Naming 的 ServerHandler 进行注册。如下图所示:
org.skywalking.apm.collector.naming.jetty.service.service.NamingJettyHandlerRegisterService
,基于 Jetty 的命名处理器注册服务实现类。
#register(moduleName, providerName, registration)
实现方法,调用 JettyManagerService#addHandler(path, registration)
方法,注册 Jetty Server 请求处理器。
org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler
,实现 JettyHandler 抽象类,Collector Agent Jetty Server 实现的命名处理器。
#pathSpec()
实现方法,获得请求路径为 "/agent/jetty"
。
#doGet()
实现方法,调用 AgentJettyNamingListener#getAddresses()
方法,获得 Collector Agent Jetty Server 集群地址。
org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener
基于 Collector Cluster 组件,实现了集群地址变化的发现,在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler
,实现 JettyHandler 抽象类,Collector Agent gRPC Server 实现的命名处理器。
#pathSpec()
实现方法,获得请求路径为 "/agent/gRPC"
。
#doGet()
实现方法,调用 AgentGRPCNamingListener#getAddresses()
方法,获得 Collector Agent gRPC Server 集群地址。
org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener
基于 Collector Cluster 组件,实现了集群地址变化的发现,在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。配置文件如下 :
org.skywalking.apm.agent.core.remote.CollectorDiscoveryService
, 实现 Agent 的 BootService 接口,Collector Agent Server 地址发现服务。
#boot()
实现方法,调用 ScheduledExecutorService#scheduleAtFixedRate(...)
方法,创建定时任务。该定时任务无初始化延迟,每 Config.Collector.DISCOVERY_CHECK_INTERVAL
( 默认:60 s ) 执行一次。
java.lang.Runnable
接口,即创建的任务。org.skywalking.apm.agent.core.remote.CollectorDiscoveryService
,实现 java.lang.Runnable
接口,Collector 服务发现客户端,基于 Rest 方式通信。
构造方法 ,首先随机选择一个 Collector Naming Server ,用于下面 #findServerList()
方法,首次获取 Collector Agent Server 集群地址。
#run()
实现方法,调用 #findServerList()
方法,获取 Collector Agent Server 集群地址。
#findServerList()
方法,获取 Collector Agent Server 集群地址。
org.apache.http.impl.client.CloseableHttpClient
对象。目前使用 HttpClient 4.5.3
版本。#buildGet()
方法,创建 org.apache.http.client.methods.HttpGet
对象。目前 Agent 查询的是 Collector Agent gRPC Server 集群地址,因为 gRPC 的性能相比 HTTP 更优秀。200
时,调用 #findBackupServer()
方法,顺序选择 Collector Naming Server 列表的下一个。注意,此时不会再发起请求,需要等下一次执行。RemoteDownstreamConfig.Collector.GRPC_SERVERS
。#findBackupServer()
方法,顺序选择 Collector Naming Server 列表的下一个。CloseableHttpClient#close()
方法,进行关闭。配置文件如下 :
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_42073629/article/details/119702851
内容来源于网络,如有侵权,请联系作者删除!