import os
from subprocess import PIPE, Popen
def save_to_hdfs(output):
"""
Save a file in local scope to hdfs.
Note, this performs a forced put - any file with the same name will be
overwritten.
"""
hdfs_path = os.path.join(os.sep, 'user', '<your_name>', output)
put = Popen(["hadoop", "fs", "-put", "-f", output, hdfs_path], stdin=PIPE, bufsize=-1)
put.communicate()
# example
df = pd.DataFrame(...)
output_file = 'yourdata.csv'
dataframe.to_csv(output_file)
save_to_hdfs(output_file)
# remove locally created file (so it doesn't pollute nodes)
os.remove(output_file)
4条答案
按热度按时间bvpmtnay1#
这需要一些挖掘,但我能够找到一种方法,使用sqlalchemy直接从一个Dataframe创建一个配置单元表。
huus2vyu2#
要将数据写入配置单元的格式是什么?Parquet/avro/二进制还是简单的csv/文本格式?根据您在创建配置单元表时使用的serde的选择,可以使用不同的python库首先将Dataframe转换为相应的serde,在本地存储文件,然后可以使用类似save\ to\ hdfs的方法(如下面的@jared wilber所回答的)将该文件移动到hdfs配置单元表位置路径中。
创建配置单元表(默认或外部表)时,它从特定的hdfs位置(默认或提供的位置)读取/存储其数据。并且可以直接访问这个hdfs位置来修改数据。如果手动更新配置单元表中的数据,需要记住一些事情-serde、分区、行格式分隔等。
python中一些有用的serde库:
Parquet地板:https://fastparquet.readthedocs.io/en/latest/
avro:https网址:pypi.org/project/fastavro/
gdrx4gfi3#
你可以回信。将df的数据转换成这样的格式,就像您一次在表中插入多行一样。。
insert into table values (first row of dataframe comma separated ), (second row), (third row)
.... 等等;因此,您可以插入。你就完了。
2nbm6dog4#
你可以用这个
subprocess
模块。以下函数适用于您已在本地保存的数据。例如,如果将Dataframe保存到csv,则可以将csv的名称传递到
save_to_hdfs
,它将把它放入hdfs中。我确信有一种方法可以直接抛出Dataframe,但这应该可以让您开始。下面是一个保存本地对象的示例函数,
output
,至user/<your_name>/<output_name>
在hdfs中。