使用hadoop流式处理python读取/写入包含节俭记录的序列文件

dnph8jn4  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(348)

关闭。这个问题需要更加突出重点。它目前不接受答案。
**想改进这个问题吗?**通过编辑这篇文章更新这个问题,使它只关注一个问题。

6年前关门了。
改进这个问题
我想读/写序列文件包含节俭记录使用hadoop流与python。我已经看了下面的内容,看起来这在hadoop-1722之后是可能的,但是如果有人已经这样做了,并且可以举个例子,那就太好了。
http://mojodna.net/2013/12/27/binary-streaming-with-hadoop-and-nodejs.html
如何在hadoop流媒体中使用“typedbytes”或“rawbytes”?
http://static.last.fm/johan/huguk-20090414/klaas-hadoop-1722.pdf
https://issues.apache.org/jira/browse/hadoop-1722
关键是能够在python中从stdin读取节俭对象。

nbnkbykc

nbnkbykc1#

我终于用hadoopy完成了。
这是我简单的节俭目标。

struct Test {
    1: required string foo;
    2: required string bar;
}

我使用命令行工具生成python定义,并压缩目录。然后我生成了一些数据,并使用loadtb将其推送到hdfs中。
下面是反序列化数据并将其作为字符串写出的代码。

import hadoopy
from thrift.protocol import TBinaryProtocol
from thrift.TSerialization import deserialize
import sys

class Mapper(object):
    def __init__(self):
        sys.path.append('lib/test.zip/gen-py')
        from Test.ttypes import Test
        self.protocol_factory = TBinaryProtocol.TBinaryProtocolFactory
        self.test = Test()
    def map(self, key, value):
        deserialize(self.test, key, protocol_factory=self.protocol_factory())
        yield 1, self.test.foo + " "  + self.test.bar

if __name__ == '__main__':
    hadoopy.run(Mapper)

这是驱动程序脚本。

import hadoopy

    hadoopy.launch('/data/fuscala', '/data/fuscala-output', 'fuscala1.py', remove_output = True, files = ["test.zip"])
    data = hadoopy.readtb('/data/fuscala-output')
    for i, j in data:
        print i,j

相关问题