mrjob: setting up logging on EMR

zdwk9cvp · asked on 2021-06-02 · Hadoop

I am trying to run Hadoop on EMR with mrjob, and I can't figure out how to set up logging (user-generated logs in the map/reduce steps) so that I can access the logs after the cluster terminates.
I have tried the logging module, print, and sys.stderr.write(), with no luck so far. The only option that works for me is to write the logs to a file, then SSH into the machine and read it, but that is cumbersome. I want my logs to go to stderr/stdout/syslog and be collected automatically to S3, so I can view them once the cluster has terminated.
Here is the word_freq example with logging added:

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper_init(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
        self.logger.addHandler(logging.StreamHandler())
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.addHandler(logging.handlers.SysLogHandler())

    def mapper(self, _, line):
        self.logger.info("Test logging: %s", line)
        sys.stderr.write("Test stderr: %s\n" % line)
        print "Test print: %s" % line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

if __name__ == '__main__':
    MRWordFreqCount.run()

vfh0ocws #1

Of all the options, the only ones that really work are writing directly to stderr (sys.stderr.write) or using a logger with a StreamHandler that writes to stderr.
After the job finishes (successfully or with an error), the logs can be retrieved from:
[s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr
Be sure to keep the logs in your runners.emr.cleanup configuration (i.e., don't let mrjob delete the logs when the cluster is cleaned up).
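For example, a mrjob.conf sketch along those lines (the bucket name is a placeholder, and the exact option values are an assumption, not taken from the answer):

```yaml
runners:
  emr:
    cleanup: NONE             # keep everything when the job succeeds
    cleanup_on_failure: NONE  # ...and when it fails
    s3_log_uri: s3://my-bucket/mrjob-logs/
```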


jtw3ybtb #2

Here is an example that logs to stdout (Python 3):

from mrjob.job import MRJob
from mrjob.job import MRStep
from mrjob.util import log_to_stream
import re
import logging

log = logging.getLogger(__name__)

WORD_RE = re.compile(r'[\w]+')

class MostUsedWords(MRJob):

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_get_words,
                   reducer=self.reduce_get_words),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_words(self,  _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_get_words(self, word, counts):
        yield (word, sum(counts))

    def reduce_get_words(self, word, counts):
        counts = list(counts)  # materialize: the generator can only be consumed once
        log.info("%s\t%s", word, counts)
        yield None, (sum(counts), word)

    def reducer_find_max(self, key, value):
        # value is pairs i.e., tuples
        yield max(value)

if __name__ == '__main__':
    MostUsedWords.run()
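The trick here is that log_to_stream attaches a handler to the named logger, so records from that logger reach the chosen stream. A stdlib-only stand-in sketching the same pattern (this is an assumption about how the helper behaves, not mrjob's exact code):

```python
import logging
import sys

def log_to_stream(name=None, stream=None, debug=False):
    # Minimal stand-in for mrjob.util.log_to_stream: attach a
    # StreamHandler for the given logger name.
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
    logger.addHandler(logging.StreamHandler(stream or sys.stderr))

# Route the "demo" logger to stdout, then emit a record through it.
log_to_stream(name="demo", stream=sys.stdout)
logging.getLogger("demo").info("hello from the reducer")
```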
