我有一个巨大的gzip csv文件(55gb压缩,660gb扩展),我正试图以一种更有用的格式处理并加载到hive中。
该文件有2.5b个设备事件记录,每行给出一个设备标识、事件标识、时间戳和事件名称,然后还有大约160个列,其中只有少数列对于给定的事件名称是非空的。
最后,我想将数据格式化为几个标识列和一个json列,json列只存储给定事件名称的相关字段,并将其放入按日期、小时和事件名称(以及基于元数据的分区,如时区)划分的配置单元表中,但希望这是一个小问题),这样我们可以方便地查询数据进行进一步分析。然而,文件太大给我带来了麻烦。
我尝试过几种方法,但都没有成功:
将文件直接加载到配置单元中,然后执行“插入覆盖…”。。。使用“python transform.py”from…“选择transform(*)有明显的问题
我使用bash命令将文件拆分为多个gzip文件,每个文件有200万行:gzip-cd file.csv.gz | split-l 2000000-d-a 5--filter='gzip>split/$file.gz'并将其中一个加载到配置单元中进行转换,但我仍然遇到内存问题,尽管我们试图增加内存限制(我必须检查一下我们改变了哪些参数)。
答。尝试了一个使用pandas的转换脚本(因为它可以很容易地按事件名称分组,然后删除每个事件名称不需要的列),并将pandas限制为一次读取10万行,但仍然需要限制配置单元选择以避免内存问题(1000行工作正常,50万行没有)。
b。我还尝试创建一个辅助临时表(存储为orc),其列与csv文件中的列相同,但按事件名称进行分区,然后只将200万行选择到临时表中,但也有内存问题。
我考虑过从按事件名称拆分数据开始。我可以用awk来做这个,但是我不确定这会比仅仅做200万行文件好多少。
我希望有人对如何处理这个文件有一些建议。我对bash、python/pandas、hadoop/hive的任何组合都持开放态度(我也可以考虑其他组合)来完成这项工作,只要它大部分是自动的(我还将处理其他几个类似的文件)。
这是我正在使用的配置单元转换查询:
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=10000;
SET mapreduce.map.memory.mb=8192;
SET mapreduce.reduce.memory.mb=10240;
ADD FILE transform.py;
INSERT OVERWRITE TABLE final_table
PARTITION (date,hour,zone,archetype,event_name)
SELECT TRANSFORM (*)
USING "python transform.py"
AS device_id,timestamp,json_blob,date,hour,zone,archetype,event_name
FROM (
SELECT *
FROM event_file_table eft
JOIN meta_table m ON eft.device_id=m.device_id
WHERE m.name = "device"
) t
;
这是python转换脚本:
import sys
import json
import pandas as pd
INSEP='\t'
HEADER=None
NULL='\\N'
OUTSEP='\t'
cols = [# 160+ columns here]
metaCols =['device_id','meta_blob','zone','archetype','name']
df = pd.read_csv(sys.stdin, sep=INSEP, header=HEADER,
names=cols+metaCols, na_values=NULL, chunksize=100000,
parse_dates=['timestamp'])
for chunk in df:
chunk = chunk.drop(['name','meta_blob'], axis=1)
chunk['date'] = chunk['timestamp'].dt.date
chunk['hour'] = chunk['timestamp'].dt.hour
for name, grp in chunk.groupby('event_name'):
grp = grp.dropna(axis=1, how='all') \
.set_index(['device_id','timestamp','date','hour',
'zone','archetype','event_name']) \
grp = pd.Series(grp.to_dict(orient='records'),grp.index) \
.to_frame().reset_index() \
.rename(columns={0:'json_blob'})
grp = grp[['device_id',timestamp','blob',
'date','hour','zone','archetype','event_name']]
grp.to_csv(sys.stdout, sep=OUTSEP, index=False, header=False)
根据Yarn的配置,以及什么首先达到极限,我收到了错误消息:
错误:java堆空间
超出gc开销限制
超过物理内存限制
更新,以防任何人有类似的问题。我最终成功地完成了这件事。在我们的例子中,似乎最有效的方法是使用awk按日期和时间分割文件。这使我们能够减少分区的内存开销,因为我们可以一次在几个小时内加载,而不是可能有数百个每小时一次的分区(乘以我们想要的所有其他分区)试图保留在内存中并一次加载。通过awk运行该文件需要几个小时,但是可以同时将另一个已经拆分的文件加载到配置单元中。
下面是我用来分割文件的bash/awk:
gzip -cd file.csv.gz |
tail -n +2 |
awk -F, -v MINDATE="$MINDATE" -v MAXDATE="$MAXDATE" \
'{
if ( ($5>=MINDATE) && ($5<MAXDATE) ) {
split($5, ts, "[: ]");
print | "gzip > file_"ts[1]"_"ts[2]".csv.gz"
}
}'
显然,文件的第5列是时间戳,mindate和maxdate用于过滤掉我们不关心的日期。这将时间戳按空格和冒号拆分,因此时间戳的第一部分是日期,第二部分是小时(第三和第四部分是分和秒),并使用它将行定向到相应的输出文件。
一旦文件按小时分割,我就将几个小时一次加载到一个临时配置单元表中,并继续进行上面提到的基本相同的转换。
暂无答案!
目前还没有任何答案,快来回答吧!