在循环中读取文件使用更改的列名创建一个新的df在另一个dir的append模式中写入新df将这个新dir移动到read dir
cmd=['hdfs', 'dfs', '-ls', OutDir]
process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
for i in process.communicate():
if i:
for j in i.decode('utf-8').strip().split():
if j.endswith('snappy.parquet'):
print('reading file ',j)
mydf = spark.read.format("parquet").option("inferSchema","true")\
.option("header", "true")\
.load(j)
print('df built on bad file ')
mydf.createOrReplaceTempView("dtl_rev")
ssql="""select old-name AS new_name,
old_col AS new_col from dtl_rev"""
newdf=spark.sql(ssql)
print('df built on renamed file ')
aggdf.write.format("parquet").mode("append").save(newdir)
3条答案
按热度按时间qxsslcnc1#
我们不能在现有文件中重命名列名,parquet将schema存储在数据文件中,我们可以使用下面的命令parquet tools schema part-m-00000.parquet检查schema
我们必须将备份放到临时表中,并重新接收历史数据。
vngu2lb82#
尝试使用,改变表格
7eumitmz3#
在循环中读取文件使用更改的列名创建一个新的df在另一个dir的append模式中写入新df将这个新dir移动到read dir