我正在尝试使用python驱动程序来运行一个迭代mrjob程序。退出标准取决于计数器。
作业本身似乎正在运行。如果我从命令行运行一个迭代,那么 hadoop fs -cat /user/myname/myhdfsdir/part-00000
并查看单个迭代的预期结果。
但是,我需要使用python驱动程序来运行代码并从 runner
. 这是因为它是一个迭代算法,需要计数器的值来确定退出标准。
OUTPUT_PATH = /user/myname/myhdfsdir
!hadoop fs -rm -r {OUTPUT_PATH}
from my_custom_MRjob import my_custom_MRjob
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
"-r", "hadoop",
"--output-dir=hdfs://"+OUTPUT_PATH,
"--no-output"])
while True:
with mr_job.make_runner() as runner:
print runner.get_opts()
runner.run()
with open('localDir/localTextFile.txt', 'w') as f:
for line in runner.stream_output():
key,value = mr_job.parse_output_line(line)
#
f.write(key +'\t'+ value +'\n')
print "End of MRjob iteration. Counters: {}".format(runner.counters())
# read a particular counter
# use counter value to evaluate exit criteria
if exit_criteria_met:
break
这将产生以下错误:
IOErrorTraceback (most recent call last)
<ipython-input-136-aded8ecaa727> in <module>()
25 runner.run()
26 with open('localDir/localTextFile.txt', 'w') as f:
---> 27 for line in runner.stream_output():
28 key,value = mr_job.parse_output_line(line)
29 #
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
391 leftovers = []
392
--> 393 for chunk in chunks:
394 # special case for b'' standing for EOF
395 if chunk == b'':
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
555 yield b'' # EOF of previous file
556
--> 557 for chunk in self.fs._cat_file(filename):
558 yield chunk
559
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
75
76 def _cat_file(self, path):
---> 77 for line in self._do_action('_cat_file', path):
78 yield line
79
/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
272
273 if returncode != 0:
--> 274 raise IOError("Could not stream %s" % filename)
275
276 def mkdir(self, path):
IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000
尤其令人困惑和沮丧的是: hdfs://hdfs:/user/myname/myhdfsdir/part-00000
. 注意有两个 hdfs
但在hdfs的第二个示例中只有一个正斜杠。我尝试添加和删除文字 hdfs://
在mrjob参数中: "--output-dir=hdfs://"+OUTPUT_PATH
. 在这两种情况下我得到相同的错误签名。
如果我以“local”模式而不是hadoop运行驱动程序,我就没有问题了,一个明显而关键的例外是我没有访问hadoop引擎的权限。这很好:
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])
我需要读入初始输入文件,总是从本地文件系统(甚至在hadoop模式下)。然后运行mrjob迭代,其输出覆盖本地文件系统输入文件。然后从运行程序访问计数器并评估退出条件。如果不满足退出条件,请使用本地文件系统的输入再次运行作业,这次使用上次运行时更新的本地输入文件。
1条答案
按热度按时间oxalkeyp1#
只要你有一条包含
hdfs:/
你不会成功的,因为那永远不会有效。在你提到的你试图添加的评论中
hdfs://
手动,这可能是一个不错的黑客,但在你的代码中,我看不到你'清理'错误hdfs:/
. 因此,即使添加了正确的前缀,下一行的内容也将是错误的,代码仍然没有成功的机会。所以,请清理一下。
实用说明:这个问题是很久以前的问题,如果软件本身有一个问题现在可能已经解决了。如果问题仍然存在,则可能是您尝试使用的代码中存在一些奇怪的内容。也许可以从一个可靠来源的小例子开始排除这种可能性。