如何跟踪k8s集群中创建的有状态pod?

vql8enpb  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(538)

设置
我采用以下方式设置了k8s群集:
1主节点
2个工作节点
集群是使用kubeadm和flannel设置的。
我有两种不同的豆荚类型:
java代理服务器
java tcp服务器
java代理服务器pod是我最初创建的statefulset。每个java代理服务器都有自己的状态(当前连接的客户机),但是它们都应该共享一个公共状态。
这个公共状态是javatcp服务器pod及其相关ip地址的最新列表。我在这里的目标是确保每个代理服务器都有一个可以代理连接的tcp服务器的当前列表。
javatcp服务器的每个示例都有自己独特的状态,并且还作为statefulset部署。tcp服务器pod之间唯一的共同点是它们可以从代理服务器接收连接。
代理服务器必须知道tcp服务器pod何时出现或出现故障,以便知道哪些pod可用于代理连接。
tcp服务器由代理服务器委派连接。代理服务器随机给tcp服务器一个连接,并且它们没有负载平衡,这种情况永远不会发生。
尝试
我尝试使用java kubernetes客户端,并在代理服务器上实现了一个监视,如下所示:

ApiClient apiClient = Config.defaultClient();
apiClient.setReadTimeout(0);
System.out.println(apiClient.getBasePath());
Configuration.setDefaultApiClient(apiClient);

CoreV1Api api = new CoreV1Api();
V1PodList pods = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
V1ListMeta podsMeta = pods.getMetadata();
if (podsMeta != null) {
    String resourceVersion = podsMeta.getResourceVersion();

    Watch<V1Pod> watch = Watch.createWatch(
            apiClient,
            api.listPodForAllNamespacesCall(null, null, null, null, null, null, resourceVersion, null, true, null),
            new TypeToken<Watch.Response<V1Pod>>(){}.getType());

    while (watch.hasNext()) {
        Watch.Response<V1Pod> response = watch.next();
        V1Pod pod = response.object;
        V1PodStatus status = pod.getStatus();
        if (status != null) {
            System.out.printf("Pod IP: %s\n", status.getPodIP());
            System.out.printf("Pod Reason: %s\n", status.getReason());
        }
    }

    watch.close();
}

这样做效果相对较好。对我来说最大的问题是,对于这个简单的过程,它为我的最终jar文件添加了一个巨大的40mb。
我知道40mb对某些人来说可能并不多。我只是觉得有一种更轻量级的方法来实现我想做的事情?
有没有更好的方法来追踪这些在我所忽略的集群中创建和销毁的豆荚?

dohp0rv5

dohp0rv51#

我已经想出了我自己的解决方案(现在),虽然不是很漂亮。我决定使用侧车模式,并将另一个容器与我的服务器代理吊舱一起装运。它是用go编写的,二进制文件被剥离并在alpinelinux上运行。
现在,我只是使用一个到java代理服务器的简单udp连接,让它知道pod何时被删除或添加。
我引用了kubernetes文档,这些文档描述了如何使用kubeapiserver服务。就我而言,我只是使用默认的服务帐户。
我将随附一些示例代码以给出实现的想法。基本上,所有api凭证都是由kubernetes通过文件提供的。
/var/run/secrets/kubernetes.io/serviceaccount/token /var/run/secrets/kubernetes.io/serviceaccount/ca.crt 然后,我们可以使用这些凭据访问kubernetes通过默认dns配置提供的api。 https://kubernetes.default.svc/api/v1/ 下面是我如何在集群中跟踪豆荚的要点:

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

func main() {
    backGroundContext := context.Background()

    accessTokenData, accessTokenFileError := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
    if accessTokenFileError != nil {
        log.Fatalln(accessTokenFileError)
    }

    accessToken := string(accessTokenData)

    k8sCertificate, certificateFileError := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
    if certificateFileError != nil {
        log.Fatalln(certificateFileError)
    }

    certificateAuthorityPool := x509.NewCertPool()
    certificateAuthorityPool.AppendCertsFromPEM(k8sCertificate)

    client := &http.Client{
        Transport: &http.Transport{
            TLSClientConfig: &tls.Config{
                RootCAs: certificateAuthorityPool,
            },
        },
        Timeout: 0, // disable timeout for the watch request
    }

    request, requestError := http.NewRequestWithContext(backGroundContext, "GET", "https://kubernetes.default.svc/api/v1/namespaces/default/pods", nil)
    if requestError != nil {
        log.Fatalln(requestError)
    }
    request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", accessToken))

    response, responseError := client.Do(request)
    if responseError != nil {
        log.Fatalln(responseError)
    }
    defer response.Body.Close()

    type PodListMetaData struct {
        ResourceVersion string `json:"resourceVersion"`
    }

    type PodStatus struct {
        IP string `json:"podIP"`
    }

    type PodMetaData struct {
        Name string `json:"name"`
    }

    type PodResult struct {
        MetaData PodMetaData `json:"metadata"`
        Status   PodStatus   `json:"status"`
    }

    type PodListResult struct {
        MetaData PodListMetaData `json:"metadata"`
        Items    []PodResult     `json:"items"`
    }

    var list PodListResult

    decoder := json.NewDecoder(response.Body)
    decodeError := decoder.Decode(&list)
    if decodeError != nil {
        log.Fatalln(decodeError)
    }

    resourceVersion := list.MetaData.ResourceVersion
    log.Printf("Resource Version: %s\n", resourceVersion)

    for _, item := range list.Items {
        log.Printf("Found Pod: %s with IP of %s\n", item.MetaData.Name, item.Status.IP)
    }

    watchRequest, watchRequestError := http.NewRequestWithContext(backGroundContext, "GET", fmt.Sprintf("https://kubernetes.default.svc/api/v1/namespaces/default/pods?watch=1&resourceVersion=%s&allowWatchBookmarks=true", resourceVersion), nil)
    if watchRequestError != nil {
        log.Fatalln(watchRequestError)
    }
    watchRequest.Header.Add("Authorization", fmt.Sprintf("Bearer %s", accessToken))

    response1, response1Error := client.Do(watchRequest)
    if response1Error != nil {
        log.Fatalln(response1Error)
    }
    defer response1.Body.Close()

    type PodListWatchResult struct {
        Type   string    `json:"type"`
        Object PodResult `json:"object"`
    }

    decoder1 := json.NewDecoder(response1.Body)

    for decoder1.More() {
        var podResult PodListWatchResult
        decodeError1 := decoder1.Decode(&podResult)
        if decodeError1 != nil {
            log.Fatalln(decodeError1)
        }

        log.Printf("Found Pod: %s with IP of %s\n", podResult.Object.MetaData.Name, podResult.Object.Status.IP)
    }
}

这里完全有改进的余地,但这是我解决方案的要点。每次流式传输一个新的json对象时,我都会对它进行处理,以确保它是我感兴趣的对象,然后通过udp将它转发到java代理服务器。
在阿尔卑斯山上构建的二进制文件大约有10MB,非常轻!
我怀疑对于“真正”的解决方案,我唯一的选择是使用正式的kubernetes go客户机或java客户机。它将使我的容器更大,但我的解决方案没有太多的保证,充其量看起来像是一个黑客。
我仍然希望有一些我忽略了的东西可以简化这一切,并且不需要大的客户端库。

相关问题