在“dataflowrunner”worker上安装kubectl

ggazkfy8  于 2021-06-15  发布在  ElasticSearch
关注(0)|答案(1)|浏览(373)

我想在运行在kubernetes上的elasticsearch集群中播种数据。我的数据在bigquery上,我想使用dataflow(python)加载数据。python-apache-beam版本似乎没有ElasticSearch接收器。我在dataflow中编写了我自己的elasticsearch writer,但是我需要从kubernetes集群转发我的elasticsearch端口。所以我需要安装googlecloudsdk和kubectl,这样我就可以转发端口写我的数据,然后关闭它回来。当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
当我在本地运行作业时,我的代码似乎工作得很好,但我似乎无法在workers上安装googlecloudsdk和kubectl。
这些是在subacces.popen的setup.py中调用的命令

['export', 'CLOUD_SDK_REPO="cloud-sdk-$(lsb_release', '-c', '-s)"'],
['echo', '"deb', 'https://packages.cloud.google.com/apt', '$CLOUD_SDK_REPO', 'main"', '|', 'sudo', 'tee', '-a', '/etc/apt/sources.list.d/google-cloud-sdk.list'],
['sudo', 'rm', '/etc/apt/sources.list.d/partner.list'],
['sudo', 'apt-get', 'install', 'google-cloud-sdk', 'kubectl']

这是我在start\u包中端口转发elasticsearch服务的方法

def _open_connection(self):
    tries = 0
    connected = False
    while tries <= 3 and not connected:
        tries += 1
        try:
            res = requests.get('http://{0}:{1}'.format(self.host, self.port))
            connected = (res.status_code == 200)
        except Exception as e:
            logging.warning(e)
            subprocess.check_call('gcloud container clusters get-credentials {0}'.format(ES_CLUSTER_NAME), shell=True)
            try:
                subprocess.check_call('kubectl version', shell=True)
            except exception as ee:
                logging.warning(ee)
                subprocess.check_call('gcloud components install kubectl', shell=True)
            subprocess.call('kubectl port-forward elasticsearch-0 {0}:{0} & disown'.format(self.port), shell=True)
            time.sleep(3)
    return connected

我期望这些命令(我尝试了一些变体)在每个worker上安装所需的包,但是安装一直失败。

t5fffqht

t5fffqht1#

我修复了这个问题,跳过了端口转发,而是在我的elasticsearch端口上实现了一个内部负载均衡器。这样我的数据流工作者就可以直接连接到内部ip来写入数据。

相关问题