压缩hadoop序列文件python

nfg76nw0  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(420)

我有一个客户端发送给我snappy压缩hadoop序列文件进行分析。我最终想做的是把这些数据放到一个数据库中。格式如下所示

>>> body_read

b'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\xff\xff\xff\xff\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\x8e\x05^N\x00\x00\x05^\x00\x00\x00F\xde\n\x00\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00r\x01\x00\x04\x00\x00\x00\x00\x8e\x08\x92\x00\x00\x10\x1a\x00\x00\x08\x8a\x9a h\x8e\x02\xd6\x8e\x02\xc6\x8e\x02T\x8e\x02\xd4\x8e\x02\xdb\x8e\x02\xd8\x8e\x02\xdf\x8e\x02\xd9\x8e\x02\xd3\x05\x0c0\xd9\x8e\x02\xcc\x8e\x02\xfc\x8e\x02\xe8\x8e\x02\xd0\x05!\x00\xdb\x05\x06\x0c\xd1\x8e\x02\xd7\x05\'\x04\xde\x8e\x01\x03\x18\xce\x8e\x02\xe7\x8e\x02\xd2\x05<\x00\xd4\x05\x1b\x04\xdc\x8e

我想我需要做的是首先使用pythonsappy解压文件,然后读取序列文件。我不确定用python读取hadoop序列文件的最佳方法是什么。我也得到和错误时,试图解压缩这个文件

>>> body_decomp = snappy.uncompress(body_read)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ec2-user/anaconda3/lib/python3.5/site-packages/snappy/snappy.py", line 91, in uncompress
    return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input

我需要做什么才能读取这些文件?

fd3cxomn

fd3cxomn1#

感谢@cricket\u007的有益评论和更多的挖掘,我能够解决这个问题。pyspark将完成我需要的任务,并且可以直接从s3位置读取hadoop序列文件,这很好。最棘手的部分是设置pyspark,下载了ApacheSpark之后,我发现这个指南非常有用-https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/.
不过,我发现一个奇怪的差异是,我的spark shell会自动解压缩文件:

scala> val fRDD = sc.textFile("s3a://bucket/file_path")
fRDD: org.apache.spark.rdd.RDD[String] = s3a://bucket/file_path MapPartitionsRDD[5] at textFile at <console>:24

scala> fRDD.first()
res4: String = SEQ?!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable??)org.apache.hadoop.io.compress.SnappyCodec???? �Z�f�uAf���- 2D���� �Z�f�uAf���- 2D�?^N???^???F�

但Pypark没有:

>>> from pyspark import SparkContext, SparkConf
>>> sc = SparkContext()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/06 23:00:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

>>> fRDD = sc.textFile("s3a://bucket/file_path")
>>> fRDD.first()
'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b�Z�f�uAf���-\x1d2D����\x0b�Z�f�uAf���-\x1d2D�\x05^N\x00\x00\x05^\x00\x00\x00F�'

你知道我怎么让皮斯帕克这么做吗?
编辑:再次感谢cricket\u 007,我开始改用.sequencefile()。这是一开始给我的错误

>>> textFile = sc.sequenceFile("s3a://bucket/file_path")
18/02/07 18:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.

我按照这个指南解决了这个问题-https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html. 我现在可以读取序列文件并反编译protobuf消息

>>> seqs = sc.sequenceFile("s3a://bucket/file_path").values()
>>> feed = protobuf_message_pb2.feed()
>>> row = bytes(seqs.first())
>>> feed.ParseFromString(row)
>>> feed.user_id_64
3909139888943208259

这正是我需要的。我现在要做的是找到一种有效的方法来反编译整个sequencefile并将其转换为一个Dataframe,而不是像上面所做的那样一次只做一个记录。

相关问题