如何使用python跳过二进制stdin的n行?

rdlzhqv9  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(430)

我使用hadoopcli将二进制数据传输到hadoop集群上的python脚本。二进制数据有终止符,用于标识新文档的开始位置。记录按唯一标识符排序,该标识符从1000000001开始,增量为1。
我试图只保存字典中这些id的一个子集的数据。
我当前的过程是使用以下命令从cli中选择数据:

hadoop select "Database" "Collection" | cut -d$'\t' -f2 | python script.py

然后在script.py中处理它,如下所示:

import json
import sys

member_mapping = json.load(open('member_mapping.json'))

output = []

for line in sys.stdin:
    person = json.loads(line)
    if member_mapping.get(person['personId']):
        output.append({person['personId']: person})
    if len(output) == len(member_mapping):
        break

问题是这个二进制数据中有6.5m个ID,扫描几乎需要2个小时。我知道字典中的min()和max()id,你可以在我的代码中看到,当我保存了n个文档时,我很早就停止了,其中n是Map文件的长度。
我想通过跳过尽可能多的读取来提高这个过程的效率。如果id从1000000001开始,我想保存的第一个id是100001000001,我可以跳过10000行吗?
由于系统问题,我目前无法使用spark或任何其他可能改进此过程的工具,因此我现在需要坚持使用python和hadoop cli的解决方案。

xqk2d5yq

xqk2d5yq1#

你可以试着用 enumerate 和一个阈值,然后跳过任何你不关心的输入。这不是一个直接的解决方案,但是应该运行得更快,并且很快就将前10000行代码扔掉。

for lineNum, line in enumerate(sys.stdin):
    if(lineNum < 10000):
         continue
    person = json.loads(line)
    if member_mapping.get(person['personId']):
        output.append({person['personId']: person})
    if len(output) == len(member_mapping):
        break

相关问题