How do I read from and write to HDFS remotely using Python?

bvjveswy · 2021-05-29 · posted in Hadoop

HDFS is on a remote server (hdfs_server). I can ssh user@hdfs_server and use cat and put to read and write respectively, but I have been asked not to touch the HDFS machine itself (other than writing files to it remotely). I need to read from and write to HDFS from my local machine.
How can I do this with Python? I found the code below, which seems to do exactly that, but I cannot get it to run on my local machine against the remote HDFS:

import requests
import json
import os
import kerberos
import sys

node = os.getenv("namenode").split(",")
print (node)

local_file_path = sys.argv[1]
remote_file_path = sys.argv[2]
read_or_write = sys.argv[3]
print (local_file_path,remote_file_path)

def check_node_status(node):
        for name in node:
                print (name)
                request = requests.get("%s/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"%name,
                                       verify=False).json()
                status = request["beans"][0]["State"]
                print (name)
                print (status)
                if status =="active":
                        break
        return status,name

def kerberos_auth():
        __, krb_context = kerberos.authGSSClientInit("HTTP@hdfs_server")
        kerberos.authGSSClientStep(krb_context, "")
        negotiate_details = kerberos.authGSSClientResponse(krb_context)
        headers = {"Authorization": "Negotiate " + negotiate_details,
                    "Content-Type":"application/binary"}
        return headers

def kerberos_hdfs_upload(status,name,headers):
        if status =="active":
                data=open('%s'%local_file_path, 'rb').read()
                write_req = requests.put("%s/webhdfs/v1%s?op=CREATE&overwrite=true"%(name,remote_file_path),
                                         headers=headers,
                                         verify=False, 
                                         allow_redirects=True,
                                         data=data)
                print(write_req.text)

def kerberos_hdfs_read(status,name,headers):
        print(status)
        if status == "active":
                read = requests.get("%s/webhdfs/v1%s?op=OPEN"%(name,remote_file_path),
                                    headers=headers,
                                    verify=False,
                                    allow_redirects=True)

                if read.status_code == 200:
                        data=open('%s'%local_file_path, 'wb')
                        data.write(read.content)
                        data.close()
                else : 
                        print(read.content)

status, name= check_node_status(node)
headers = kerberos_auth()
if read_or_write == "write":
        kerberos_hdfs_upload(status,name,headers)
elif read_or_write == "read":
        print("fun")
        kerberos_hdfs_read(status,name,headers)

When I run it on my local machine, it throws an error at line 7 (node = os.getenv("namenode").split(",")), obviously because there is no namenode on my local system. So how do I modify this code so that it can read from and write to HDFS? This is only my second day working with HDFS, so I have no idea what to do. Any help is much appreciated.
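For reference, a minimal sketch of what that failure looks like: when the variable is not exported, os.getenv returns None, so the .split(",") call is what actually blows up.

import os

# With no "namenode" variable exported, os.getenv returns None,
# and calling .split(",") on None raises
# AttributeError: 'NoneType' object has no attribute 'split'.
node = os.getenv("namenode")
print(node)  # None when the variable is not set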
Edit: after running export namenode=hdfs_server, the namenode error goes away. Running the script again, I get a new error:

Traceback (most recent call last):
  File "read_write_hdfs.py", line 9, in <module>
    local_file_path = sys.argv[1]
IndexError: list index out of range
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
    from apport.fileutils import likely_packaged, get_recent_crashes
  File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
    from apport.report import Report
  File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
    import apport.fileutils
  File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
    from apport.packaging_impl import impl as packaging
  File "/usr/lib/python3/dist-packages/apport/packaging_impl.py", line 23, in <module>
    import apt
  File "/usr/lib/python3/dist-packages/apt/__init__.py", line 23, in <module>
    import apt_pkg
ModuleNotFoundError: No module named 'apt_pkg'

Original exception was:
Traceback (most recent call last):
  File "read_write_hdfs.py", line 9, in <module>
    local_file_path = sys.argv[1]
IndexError: list index out of range

So I tried running the script with three arguments (one each for sys.argv[1], sys.argv[2], and sys.argv[3]). I now get the following error:

$ python3 read_write_hdfs.py /home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt read
['hdfs_server']
/home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt
/home/embs/Desktop/hdfs_test/ /home/edhuser/testdata.txt
hdfs_server
Traceback (most recent call last):
  File "read_write_hdfs.py", line 64, in <module>
    status, name= check_node_status(node)
  File "read_write_hdfs.py", line 22, in check_node_status
    verify=False).json()
  File "/usr/lib/python3/dist-packages/requests/api.py", line 67, in get
    return request('get', url, params=params,**kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 53, in request
    return session.request(method=method, url=url,**kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 468, in request
    resp = self.send(prep,**send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 570, in send
    adapter = self.get_adapter(url=request.url)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 644, in get_adapter
    raise InvalidSchema("No connection adapters were found for '%s'" % url)
requests.exceptions.InvalidSchema: No connection adapters were found for 'hdfs_server/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/apport_python_hook.py", line 63, in apport_excepthook
    from apport.fileutils import likely_packaged, get_recent_crashes
  File "/usr/lib/python3/dist-packages/apport/__init__.py", line 5, in <module>
    from apport.report import Report
  File "/usr/lib/python3/dist-packages/apport/report.py", line 30, in <module>
    import apport.fileutils
  File "/usr/lib/python3/dist-packages/apport/fileutils.py", line 23, in <module>
    from apport.packaging_impl import impl as packaging
  File "/usr/lib/python3/dist-packages/apport/packaging_impl.py", line 23, in <module>
    import apt
  File "/usr/lib/python3/dist-packages/apt/__init__.py", line 23, in <module>
    import apt_pkg
ModuleNotFoundError: No module named 'apt_pkg'

Original exception was:
Traceback (most recent call last):
  File "read_write_hdfs.py", line 66, in <module>
    status, name= check_node_status(node)
  File "read_write_hdfs.py", line 24, in check_node_status
    verify=False).json()
  File "/usr/lib/python3/dist-packages/requests/api.py", line 67, in get
    return request('get', url, params=params,**kwargs)
  File "/usr/lib/python3/dist-packages/requests/api.py", line 53, in request
    return session.request(method=method, url=url,**kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 468, in request
    resp = self.send(prep,**send_kwargs)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 570, in send
    adapter = self.get_adapter(url=request.url)
  File "/usr/lib/python3/dist-packages/requests/sessions.py", line 644, in get_adapter
    raise InvalidSchema("No connection adapters were found for '%s'" % url)
requests.exceptions.InvalidSchema: No connection adapters were found for 'hdfs_server/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'

Since the error originates in the function check_node_status(node), I suspect it cannot connect to hdfs_server. How can I fix this?


nqwrtyyt1#

os.getenv("namenode") 正在查找环境变量 namenode .
所以导出env var并再次尝试运行脚本。

export namenode=hdfs_server

I assume hdfs_server is not the actual server name. If that is the actual command you typed, then it is not a hostname but an ssh alias; you need to check ~/.ssh/config for the actual hostname.
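Beyond that, the later InvalidSchema traceback ("No connection adapters were found for ...") is raised by requests before any connection is attempted, because the URL it was handed has no http:// or https:// prefix. Whatever the real hostname turns out to be, the namenode variable needs to be a full WebHDFS base URL including the NameNode web port (commonly 50070 on Hadoop 2.x or 9870 on Hadoop 3.x; confirm against your cluster), for example export namenode=https://namenode-host:50070, where namenode-host is a placeholder. A minimal sketch of an early sanity check along those lines (this check is not part of the original script):

import os
import sys
from urllib.parse import urlparse

# Hypothetical early check: fail fast with a clear message when the
# namenode variable is missing or is not a full URL with a scheme and host.
namenode = os.getenv("namenode")
if not namenode:
    sys.exit("environment variable 'namenode' is not set")

for name in namenode.split(","):
    parsed = urlparse(name)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        sys.exit("namenode entry %r is not a full URL, expected something like "
                 "https://namenode-host:50070" % name)

With a properly formed base URL in the variable, check_node_status builds URLs that requests can actually send.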
