如何从Python Spark脚本进行日志记录

nimxete2  于 2023-01-04  发布在  Python
关注(0)|答案(8)|浏览(118)

我用spark-submit运行了一个Python Spark程序,我想在里面放一些日志记录语句。

logging.info("This is an informative message.")
logging.debug("This is a debug message.")

我想使用Spark正在使用的日志记录器,这样日志消息就可以以相同的格式输出,并且级别由相同的配置文件控制。
我试过将logging语句放入代码中,并从logging.getLogger()开始。在这两种情况下,我都看到了Spark的日志消息,但没有我的。我一直在查看Python logging documentation,但还没有能够从那里弄清楚它。
不确定这是特定于提交给Spark的脚本的东西,还是只是我不了解日志记录是如何工作的。

sf6xfgos

sf6xfgos1#

您可以从SparkContext对象获取记录器:

log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
uqxowvwt

uqxowvwt2#

你需要得到spark本身的日志记录器,默认情况下getLogger()会返回你自己模块的日志记录器。

logger = logging.getLogger('py4j')
logger.info("My test info statement")

它也可能是'pyspark'而不是'py4j'
如果你在spark程序中使用的函数(它做一些日志记录)和主函数定义在同一个模块中,那么它会产生一些序列化错误。
这在here中进行了说明,并且给出了同一人的示例here
我还在spark 1.3.1上测试了这个
编辑:
要将日志记录从STDERR更改为STDOUT,您必须删除当前的StreamHandler并添加一个新的。
查找现有的流处理程序(完成后可以删除此行)

print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]

可能只会有一个单一的,但如果没有,你将不得不更新的立场。

logger.removeHandler(logger.handlers[0])

sys.stdout添加新处理程序

import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
wgeznvg7

wgeznvg73#

我们需要从执行器记录日志,而不是从驱动程序节点记录日志。
1.我们在所有节点上创建了一个/etc/rsyslog.d/spark.conf(使用Bootstrap方法,通过Amazon Elastic Map Reduce将so that the Core nodes forwarded syslog本地1 '消息发送到主节点)。
1.在主节点上,我们启用了UDP和TCP syslog侦听器,并对其进行了设置,以便所有local消息都记录到/var/log/local1.log
1.我们在map函数中创建了一个Python logging模块Syslog日志记录器。
1.现在我们可以使用logging.info()进行日志记录....
我们发现的一件事是,同一个分区同时在多个执行器上处理,显然Spark在有额外资源的时候一直都是这样做的,它可以处理执行器神秘延迟或失败的情况。
登录map函数教会了我们很多关于Spark如何工作的知识。

aemubtdh

aemubtdh4#

在我的例子中,我只是很高兴将我的日志消息与通常的spark日志消息沿着添加到worker stderr中。
如果这符合您的需要,那么技巧是将特定的Python日志记录器重定向到stderr
例如,受this answer启发,下面的代码对我来说很好用:

def getlogger(name, level=logging.INFO):
    import logging
    import sys

    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # or else, as I found out, we keep adding handlers and duplicate messages
        pass
    else:
        ch = logging.StreamHandler(sys.stderr)
        ch.setLevel(level)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        logger.addHandler(ch)
    return logger

用法:

def tst_log():
    logger = getlogger('my-worker')
    logger.debug('a')
    logger.info('b')
    logger.warning('c')
    logger.error('d')
    logger.critical('e')
    ...

输出(加上几行内容):

17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
yc0p9oo0

yc0p9oo05#

import logging

# Logger

logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)

最简单的方法从pyspark登录!

kjthegm6

kjthegm66#

pyspark和java log4j交互的关键是jvm,下面是python代码,conf缺少url,但这是关于日志的。

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars)
spark = SparkSession\
 .builder\
 .appName("DB2_Test")\
 .config(conf = myconf) \
 .getOrCreate()

Logger= spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")
snvhrwxg

snvhrwxg7#

可以在Spark下的类中实现logging.Handler接口,将日志消息转发给log4j,然后使用logging.root.addHandler()(可选地,logging.root.removeHandler())安装该处理程序。
处理程序应该具有如下所示的方法:

def emit(self, record):
    """Forward a log message for log4j."""
    Logger = self.spark_session._jvm.org.apache.log4j.Logger
    logger = Logger.getLogger(record.name)
    if record.levelno >= logging.CRITICAL:
        # Fatal and critical seem about the same.
        logger.fatal(record.getMessage())
    elif record.levelno >= logging.ERROR:
        logger.error(record.getMessage())
    elif record.levelno >= logging.WARNING:
        logger.warn(record.getMessage())
    elif record.levelno >= logging.INFO:
        logger.info(record.getMessage())
    elif record.levelno >= logging.DEBUG:
        logger.debug(record.getMessage())
    else:
        pass

初始化Spark会话后,应立即安装处理程序:

spark = SparkSession.builder.appName("Logging Example").getOrCreate()
handler = CustomHandler(spark_session)
# Replace the default handlers with the log4j forwarder.
root_handlers = logging.root.handlers[:]
for h in self.root_handlers:
    logging.root.removeHandler(h)
logging.root.addHandler(handler)

# Now you can log stuff.
logging.debug("Installed log4j log handler.")

下面是一个更完整的例子:https://gist.github.com/thsutton/65f0ec3cf132495ef91dc22b9bc38aec

xzabzqsa

xzabzqsa8#

您需要使spark日志对于驱动程序和所有执行器都是可访问的,因此我们创建了日志记录类,并将其作为作业依赖项进行处理,然后将其加载到每个执行器上。

class Log4j:
  def __init__(this, spark_session):
    conf = spark_session.SparkContext.getConf()
    app_id = conf.get('spark.app.id')
    app_name = conf.get('spark.app.name')
    log4jlogger = spark_session._jvm.org.apache.log4j
    prefix_msg = '<'+app_id + ' : ' + app_name +'> '
    print(prefix_msg)
    self.logger = log4jlogger.logManager.getLogger(prefix_msg)
  def warn(this, msg):
    # log warning 
    self.logger.warn(msg)
  def error(this, msg):
    #log error
    self.logger.error(msg)
  def info(this, msg):
    # log information message
    self.logger.info(msg)

相关问题