我使用hadoopcli将二进制数据传输到hadoop集群上的python脚本。二进制数据有终止符,用于标识新文档的开始位置。记录按唯一标识符排序,该标识符从1000000001开始,增量为1。
我试图只保存字典中这些id的一个子集的数据。
我当前的过程是使用以下命令从cli中选择数据:
hadoop select "Database" "Collection" | cut -d$'\t' -f2 | python script.py
然后在script.py中处理它,如下所示:
import json
import sys
member_mapping = json.load(open('member_mapping.json'))
output = []
for line in sys.stdin:
person = json.loads(line)
if member_mapping.get(person['personId']):
output.append({person['personId']: person})
if len(output) == len(member_mapping):
break
问题是这个二进制数据中有6.5m个ID,扫描几乎需要2个小时。我知道字典中的min()和max()id,你可以在我的代码中看到,当我保存了n个文档时,我很早就停止了,其中n是Map文件的长度。
我想通过跳过尽可能多的读取来提高这个过程的效率。如果id从1000000001开始,我想保存的第一个id是100001000001,我可以跳过10000行吗?
由于系统问题,我目前无法使用spark或任何其他可能改进此过程的工具,因此我现在需要坚持使用python和hadoop cli的解决方案。
1条答案
按热度按时间xqk2d5yq1#
你可以试着用
enumerate
和一个阈值,然后跳过任何你不关心的输入。这不是一个直接的解决方案,但是应该运行得更快,并且很快就将前10000行代码扔掉。