我们需要查询hadoop(hive)中的数据并将其保存到dbf文件中。为了实现这一点,我们使用spark作为我们的处理引擎,特别是pyspark(python 3.4).我们使用dbf包作为包dbf writer,https://pypi.org/project/dbf/经过几次测试,我们注意到这个过程花费了很多时间,有时达到20分钟。它不如我们写成另一种文件格式,如csv,兽人等等。
基本语法(约20分钟)
import dbf
from datetime import datetime
collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()
filename2="/home/sak202208_"+str(datetime.now())+"_tes.dbf"
header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"
new_table2 = dbf.Table(filename2, header2)
new_table2.open(dbf.READ_WRITE)
for row in collections:
new_table2.append(row)
new_table2.close
字符串
开启多线程(类似结果)
import dbf
from datetime import datetime
collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()
filename2="/home/sak202208_"+str(datetime.now())+"_tes.dbf"
header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"
new_table2 = dbf.Table(filename2, header2)
new_table2.open(dbf.READ_WRITE)
def append_row(table, record):
table.append(record)
with concurrent.futures.ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) + 4)) as executor:
for row in collections:
executor.submit(append_row(new_table2, row))
new_table2.close
型
Spark驱动程序内存已被设置为7GB,但当我们使用top命令检查时,它只是在写入dbf文件时使用约1GB
我们如何有效地将数据写入dbf文件?是否有任何我们错过的调优或其他选择?
1条答案
按热度按时间zzlelutf1#
造成缓慢的主要原因有两个:
1.每个记录必须在Python数据类型和dbf存储数据类型之间转换;
一种加速是一次创建所有行(如果你知道有多少行的话),然后用实际数据替换每一行:
字符串
请注意,每个
row
必须是一个Map才能正常工作。