从外部服务器将pythonDataframe插入配置单元

cigdeys3  于 2021-06-27  发布在  Hive
关注(0)|答案(4)|浏览(431)

我目前正在使用pyhive(python3.6)将数据读取到hive集群之外的服务器,然后使用python执行分析。
在执行分析之后,我想将数据写回配置单元服务器。在寻找解决方案时,大多数帖子都涉及使用pyspark。从长远来看,我们将建立我们的系统来使用pyspark。但是,短期内有没有一种方法可以使用python从集群之外的服务器直接将数据写入配置单元表?
谢谢你的帮助!

bvpmtnay

bvpmtnay1#

这需要一些挖掘,但我能够找到一种方法,使用sqlalchemy直接从一个Dataframe创建一个配置单元表。

from sqlalchemy import create_engine

# Input Information

host = 'username@local-host'
port = 10000
schema = 'hive_schema'
table = 'new_table'

# Execution

engine = create_engine(f'hive://{host}:{port}/{schema}')
engine.execute('CREATE TABLE ' + table + ' (col1 col1-type, col2 col2-type)')
Data.to_sql(name=table, con=engine, if_exists='append')
huus2vyu

huus2vyu2#

要将数据写入配置单元的格式是什么?Parquet/avro/二进制还是简单的csv/文本格式?根据您在创建配置单元表时使用的serde的选择,可以使用不同的python库首先将Dataframe转换为相应的serde,在本地存储文件,然后可以使用类似save\ to\ hdfs的方法(如下面的@jared wilber所回答的)将该文件移动到hdfs配置单元表位置路径中。
创建配置单元表(默认或外部表)时,它从特定的hdfs位置(默认或提供的位置)读取/存储其数据。并且可以直接访问这个hdfs位置来修改数据。如果手动更新配置单元表中的数据,需要记住一些事情-serde、分区、行格式分隔等。
python中一些有用的serde库:
Parquet地板:https://fastparquet.readthedocs.io/en/latest/
avro:https网址:pypi.org/project/fastavro/

gdrx4gfi

gdrx4gfi3#

你可以回信。将df的数据转换成这样的格式,就像您一次在表中插入多行一样。。 insert into table values (first row of dataframe comma separated ), (second row), (third row) .... 等等;因此,您可以插入。

bundle=df.assign(col='('+df[df.col[0]] + ','+df[df.col[1]] +...+df[df.col[n]]+')'+',').col.str.cat(' ')[:-1]

con.cursor().execute('insert into table table_name values'+ bundle)

你就完了。

2nbm6dog

2nbm6dog4#

你可以用这个 subprocess 模块。
以下函数适用于您已在本地保存的数据。例如,如果将Dataframe保存到csv,则可以将csv的名称传递到 save_to_hdfs ,它将把它放入hdfs中。我确信有一种方法可以直接抛出Dataframe,但这应该可以让您开始。
下面是一个保存本地对象的示例函数, output ,至 user/<your_name>/<output_name> 在hdfs中。

import os
  from subprocess import PIPE, Popen

  def save_to_hdfs(output):
      """
      Save a file in local scope to hdfs.
      Note, this performs a forced put - any file with the same name will be 
      overwritten.
      """
      hdfs_path = os.path.join(os.sep, 'user', '<your_name>', output)
      put = Popen(["hadoop", "fs", "-put", "-f", output, hdfs_path], stdin=PIPE, bufsize=-1)
      put.communicate()

  # example
  df = pd.DataFrame(...)
  output_file = 'yourdata.csv'
  dataframe.to_csv(output_file)
  save_to_hdfs(output_file)
  # remove locally created file (so it doesn't pollute nodes)
  os.remove(output_file)

相关问题