使用pyspark将数据从hadoop文件写入dbf非常耗时

vhmi4jdf  于 12个月前  发布在  Spark
关注(0)|答案(1)|浏览(152)

我们需要查询hadoop(hive)中的数据并将其保存到dbf文件中。为了实现这一点,我们使用spark作为我们的处理引擎,特别是pyspark(python 3.4).我们使用dbf包作为包dbf writer,https://pypi.org/project/dbf/经过几次测试,我们注意到这个过程花费了很多时间,有时达到20分钟。它不如我们写成另一种文件格式,如csv,兽人等等。

基本语法(约20分钟)

import dbf
from datetime import datetime

collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()

filename2="/home/sak202208_"+str(datetime.now())+"_tes.dbf"

header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"

new_table2 = dbf.Table(filename2, header2)

new_table2.open(dbf.READ_WRITE)

for row in collections:
    new_table2.append(row)

new_table2.close

字符串

开启多线程(类似结果)

import dbf
from datetime import datetime

collections = spark.sql("SELECT JENISKEGIA, JUMLAHUM_A, ... , URUTAN, WEIGHT FROM silastik.sakernas_2022_8").collect()

filename2="/home/sak202208_"+str(datetime.now())+"_tes.dbf"

header2 = "JENISKEGIA N(8,0); JUMLAHUM_A N(8,0); ... , URUTAN N(7,0); WEIGHT N(8,0)"

new_table2 = dbf.Table(filename2, header2)

new_table2.open(dbf.READ_WRITE)

def append_row(table, record):
    table.append(record)

with concurrent.futures.ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) + 4)) as executor:
    for row in collections:
        executor.submit(append_row(new_table2, row))

new_table2.close


Spark驱动程序内存已被设置为7GB,但当我们使用top命令检查时,它只是在写入dbf文件时使用约1GB
我们如何有效地将数据写入dbf文件?是否有任何我们错过的调优或其他选择?

zzlelutf

zzlelutf1#

造成缓慢的主要原因有两个:
1.每个记录必须在Python数据类型和dbf存储数据类型之间转换;

  1. dbf文件必须随每个记录(写入新行、写入有关dbf的元数据等)进行调整。
    一种加速是一次创建所有行(如果你知道有多少行的话),然后用实际数据替换每一行:
new_table2.open(dbf.READ_WRITE)

new_table2.append(multiple=<number_of_rows>)

for rec, row in zip(new_table2, collections):
    dbf.write(rec, **row)

字符串
请注意,每个row必须是一个Map才能正常工作。

相关问题