我有10 gb的输入文件,我正试图转换为avro使用python-hadoop流,工作是成功的,但我不能读取输出使用avro阅读器。
它给出的是“utf8”编解码器无法解码13924位的字节0xb4:无效的起始字节。
这里的问题是我在hadoop流中使用Map器输出的stdout,如果我在本地使用文件名和脚本,那么avro输出是可读的。
有什么主意,怎么解决?我认为问题在于如何处理流媒体中的键/值。。。。
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
-input "xxx.txt" \
-mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
-reducer NONE \
-output "xxxxx" -file "mapper.py" \
-lazyOutput \
-file "x.avsc"
Map器脚本是
import sys
import re
import os
from avro import schema, datafile
import avro.io as io
import StringIO
schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
rec_writer = io.DatumWriter(SCHEMA)
df_writer = datafile.DataFileWriter(sys.stdout, rec_writer, SCHEMA,)
header = []
for field in SCHEMA.fields:
header.append(field.name)
for line in sys.stdin:
fields = line.rstrip().split("\x01")
data = dict(zip(header, fields))
try:
df_writer.append(data)
except Exception, e:
print "failed with data: %s" % str(data)
print str(e)
df_writer.close()
1条答案
按热度按时间q9yhzks01#
终于可以解决这个问题了。使用output format类,并将avro二进制转换保留到此类。在streaming mapper中,只需发出json记录。
这里是mapper.py