How to save files to Hadoop with Python

nzrxty8p · posted 2021-06-02 · in Hadoop

Question:
I am starting to learn Hadoop, but I need to save a lot of files into Hadoop using Python. I cannot seem to figure out what I am doing wrong. Can anybody help me with this?
My code is below. I think HDFS_PATH is correct, as I did not change it in the settings while installing. The pythonfile.txt is on my desktop (so is the Python code that is run from the command line).
Code:

import hadoopy
import os
hdfs_path = 'hdfs://localhost:9000/python'

def main():
    hadoopy.writetb(hdfs_path, [('pythonfile.txt',open('pythonfile.txt').read())])

main()

When I run the code above, all I get is this single entry for /python itself:

iMac-van-Brian:desktop Brian$ $HADOOP_HOME/bin/hadoop dfs -ls /python

DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

14/10/28 11:30:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 Brian supergroup        236 2014-10-28 11:30 /python

vtwuwzda1#

I have a feeling that you are writing into a file called '/python', while you intend it to be the directory in which the file is stored.
What does

hdfs dfs -cat /python

show you?
If it shows the file contents, all you need to do is edit hdfs_path to include the file name (you should delete /python with -rm first); a sketch of that option follows the Pydoop snippet below. Otherwise, use Pydoop (pip install pydoop) and do this:

import pydoop.hdfs as hdfs

from_path = '/tmp/infile.txt'
to_path = 'hdfs://localhost:9000/python/outfile.txt'
hdfs.put(from_path, to_path)
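If you would rather stay with hadoopy, the first option could look roughly like this (a sketch assuming the same local pythonfile.txt as in the question; the -rm call and the new target path are illustrative, not taken from the answer):

import subprocess
import hadoopy

# remove the file that the earlier run left at /python (illustrative path)
subprocess.call(['hdfs', 'dfs', '-rm', '/python'])

# write under a path that includes the file name, so /python can act as a directory
hdfs_path = 'hdfs://localhost:9000/python/pythonfile.txt'
hadoopy.writetb(hdfs_path, [('pythonfile.txt', open('pythonfile.txt').read())])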

tag5nh1u2#

I found the answer here:

import subprocess

def run_cmd(args_list):
    """
    Run a system command (e.g. an hdfs command) and return its exit code, stdout and stderr.
    """
    print('Running system command: {0}'.format(' '.join(args_list)))
    # universal_newlines=True returns stdout/stderr as str rather than bytes,
    # so the out.split('\n') call below also works under Python 3
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    s_output, s_err = proc.communicate()
    s_return = proc.returncode
    return s_return, s_output, s_err

# Run Hadoop ls command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', 'hdfs_file_path'])
lines = out.split('\n')

# Run Hadoop get command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-get', 'hdfs_file_path', 'local_path'])

# Run Hadoop put command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-put', 'local_file', 'hdfs_file_path'])

# Run Hadoop copyFromLocal command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-copyFromLocal', 'local_file', 'hdfs_file_path'])

# Run Hadoop copyToLocal command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-copyToLocal', 'hdfs_file_path', 'local_file'])

# hdfs dfs -rm -skipTrash /path/to/file/you/want/to/remove/permanently

# Run Hadoop remove file command in Python

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', 'hdfs_file_path'])
(ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-skipTrash', 'hdfs_file_path'])

# rm -r

# HDFS Command to remove the entire directory and all of its content from #HDFS.

# Usage: hdfs dfs -rm -r <path>

(ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-r', 'hdfs_file_path'])
(ret, out, err)= run_cmd(['hdfs', 'dfs', '-rm', '-r', '-skipTrash', 'hdfs_file_path'])

# Check if a file exist in HDFS

# Usage: hadoop fs -test -[defsz] URI

# Options:

# -d: if the path is a directory, return 0.

# -e: if the path exists, return 0.

# -f: if the path is a file, return 0.

# -s: if the path is not empty, return 0.

# -z: if the file is zero length, return 0.

# Example:

# hadoop fs -test -e filename

hdfs_file_path = '/tmpo'
cmd = ['hdfs', 'dfs', '-test', '-e', hdfs_file_path]
ret, out, err = run_cmd(cmd)
print(ret, out, err)
if ret:
    print('file does not exist')
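Putting a couple of these together, a small end-to-end use of run_cmd might look like this (the local and HDFS paths are placeholders of my own, not from the original answer):

local_file = 'pythonfile.txt'
hdfs_target = '/user/brian/pythonfile.txt'

# upload the local file
(ret, out, err) = run_cmd(['hdfs', 'dfs', '-put', local_file, hdfs_target])
if ret != 0:
    print('put failed: {0}'.format(err))

# verify that it arrived
(ret, out, err) = run_cmd(['hdfs', 'dfs', '-test', '-e', hdfs_target])
print('file exists' if ret == 0 else 'file does not exist')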

2ledvvac3#

This is a pretty typical task for the subprocess module. The solution looks like this:

put = Popen(["hadoop", "fs", "-put", <path/to/file>, <path/to/hdfs/file], stdin=PIPE, bufsize=-1)
put.communicate()
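Popen does not raise an error when the underlying command fails, so it can be worth checking the return code after communicate(). A minimal sketch (the local and HDFS paths here are illustrative, not from the original answer):

from subprocess import PIPE, Popen

put = Popen(["hadoop", "fs", "-put", "local_file.csv", "/user/someuser/local_file.csv"],
            stdin=PIPE, bufsize=-1)
put.communicate()
if put.returncode != 0:
    raise RuntimeError("hadoop fs -put failed with return code {0}".format(put.returncode))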

Complete example
Assume you are on a server and have an authenticated connection to HDFS (e.g. you have already authenticated with your .keytab).
You have just saved a CSV from a pandas.DataFrame and want to put it into HDFS.
You can then upload the file to HDFS as follows:

import os
import pandas as pd
from subprocess import PIPE, Popen

# define path to saved file
file_name = "saved_file.csv"

# create a pandas.DataFrame
sales = {'account': ['Jones LLC', 'Alpha Co', 'Blue Inc'], 'Jan': [150, 200, 50]}
df = pd.DataFrame.from_dict(sales)

# save your pandas.DataFrame to csv (this could be anything, not necessarily a pandas.DataFrame)
df.to_csv(file_name)

# create path to your username on hdfs
hdfs_path = os.path.join(os.sep, 'user', '<your-user-name>', file_name)

# put csv into hdfs
put = Popen(["hadoop", "fs", "-put", file_name, hdfs_path], stdin=PIPE, bufsize=-1)
put.communicate()

The CSV file will then live at /user/<your-user-name>/saved_file.csv.
Note - if you created this file from a Python script that is itself called within Hadoop, the intermediate CSV file may be stored on some random node. Since this file is (presumably) no longer needed, it is best practice to remove it so that you do not pollute the nodes every time the script is called. You can simply add os.remove(file_name) as the last line of the above script to solve this.
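If you want that cleanup built in, the end of the script above could be wrapped like this (a sketch reusing the file_name and hdfs_path from the complete example; the try/finally is my addition so that the local CSV is removed even if the upload fails):

try:
    # put csv into hdfs
    put = Popen(["hadoop", "fs", "-put", file_name, hdfs_path], stdin=PIPE, bufsize=-1)
    put.communicate()
finally:
    # remove the intermediate csv so repeated runs do not clutter the local machine
    os.remove(file_name)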
