How do I write to Kafka from the Python logging module?

Posted by anhgbhbe on 2021-06-07 in Kafka

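I have a large, complex application that makes heavy use of the Python logging module.
I need to start sending these logs to a Kafka cluster, and I need to be sure the data is not altered along the way.
For me, the ideal solution would be to simply create a new handler for Kafka and let the logs go to both the old logging solution and Kafka for a while, then eventually switch off the old logging handler and send only to Kafka.
However, I don't see any Kafka logging handler, only Kafka clients. Adding a Kafka client would mean tracking down every existing logging call and adding a separate call to the new Kafka client next to it, which makes it hard to guarantee identical results.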
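The handler approach works because a logger fans every record out to all of its attached handlers, so existing logging calls do not have to change. Below is a minimal stdlib-only sketch of that mechanism. ListHandler is a hypothetical stand-in for a Kafka handler and "myapp" is an illustrative logger name. In a real application you would attach the new handler to the root logger (logging.getLogger()) so records from every module's logger reach it through propagation.

```python
import io
import logging


class ListHandler(logging.Handler):
    """Hypothetical stand-in for a Kafka handler: collects formatted records in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


# The "old" destination: an ordinary StreamHandler writing to an in-memory buffer.
old_buffer = io.StringIO()
old_handler = logging.StreamHandler(old_buffer)

# The "new" destination, attached alongside the old one.
new_handler = ListHandler()

app_logger = logging.getLogger("myapp")  # hypothetical application logger name
app_logger.setLevel(logging.INFO)
app_logger.propagate = False  # keep this demo isolated from the root logger
app_logger.addHandler(old_handler)
app_logger.addHandler(new_handler)

# Existing call sites stay exactly as they are; the record goes to both handlers.
app_logger.info("order %s shipped", 42)

# Retiring the old destination later is a one-line change.
app_logger.removeHandler(old_handler)
```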

Answer #1 by wljmcqd8

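Implementing the handler is very simple. Setting up the environment actually took more time than writing the handler.
The handler constructor accepts an optional key argument. If it is provided, the messages written are sent to the single partition selected by that key. If it is not provided, messages are distributed across partitions in round-robin fashion.
I haven't done much testing, but it is simple enough that I can't see what could go wrong here. Hope it's useful.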
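To make the two routing modes concrete, here is a toy model: a fixed key always maps to the same partition, while unkeyed messages cycle through the partitions. It is hypothetical and for illustration only. It hashes keys with zlib.crc32 for brevity, whereas kafka-python's default partitioner (like the Java client) hashes keys with murmur2, and newer kafka-python versions do not necessarily use round-robin for unkeyed messages.

```python
import itertools
import zlib


class ToyPartitioner:
    """Illustrative only: NOT Kafka's real partitioner (which hashes keys with murmur2)."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        # Unkeyed messages simply cycle through partitions 0, 1, ..., n-1, 0, ...
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition(self, key=None):
        if key is None:
            return next(self._round_robin)
        # Keyed messages: a deterministic hash of the key picks one fixed partition.
        return zlib.crc32(key) % self.num_partitions


p = ToyPartitioner(3)
print([p.partition() for _ in range(4)])         # round-robin: [0, 1, 2, 0]
print({p.partition(b"key1") for _ in range(5)})  # a set holding a single partition number
```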

# Written against an early kafka-python release; the SimpleProducer and
# KeyedProducer classes used here were later deprecated in favour of KafkaProducer.
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer, KeyedProducer
import logging

class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            # no key: messages are spread across partitions round-robin
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            # with a key: every message goes to the partition chosen by that key
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        # drop records from the kafka client's own loggers ("kafka" and "kafka.*")
        # to avoid infinite recursion
        if record.name == 'kafka' or record.name.startswith('kafka.'):
            return
        try:
            # use default formatting
            msg = self.format(record)
            # produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except Exception:
            # standard logging behaviour: report the error on stderr
            # (when logging.raiseExceptions is set, the default) instead of raising
            self.handleError(record)

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)

kh = KafkaLoggingHandler("localhost", 9092, "test_log")

# OR

# kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)
logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox",  "dog")
try:
    import math
    math.exp(1000)  # raises OverflowError
except OverflowError:
    logger.exception("Problem with %s", "math.exp")

P.S. The handler uses this Kafka client: https://github.com/mumrah/kafka-python
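The recursion guard in emit() exists because the Kafka client logs through its own loggers (named kafka or kafka.*). With the handler on the root logger, those records would propagate back into the handler, which would produce more client log records, and so on. One alternative, sketched below under that assumption about the logger names, is to express the guard as a logging.Filter. Attach it to the handler (kh.addFilter(ExcludeKafkaLoggers())), not to a logger: a logger's filters are not consulted for records that propagate up from child loggers, while a handler's filters see every record the handler receives.

```python
import logging


class ExcludeKafkaLoggers(logging.Filter):
    """Reject records from the 'kafka' logger and all of its children ('kafka.conn', ...)."""

    def filter(self, record):
        name = record.name
        return not (name == "kafka" or name.startswith("kafka."))


def make_record(name):
    # Build a bare LogRecord by hand, just to exercise the filter.
    return logging.LogRecord(name, logging.INFO, "<example>", 0, "msg", None, None)


f = ExcludeKafkaLoggers()
print(f.filter(make_record("kafka.conn")))  # False: dropped
print(f.filter(make_record("myapp.db")))    # True: kept
```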

Answer #2 by s8vozzvw

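This is a great fix, thanks! The code has had some updates over the past few years, and some of the functions it uses are now deprecated. The overall design of this patch is still very, very useful, though, so thanks again.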

SimpleProducer (deprecated) --> KafkaProducer
SimpleConsumer (deprecated) --> KafkaConsumer
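The list above has no entry for KeyedProducer; with KafkaProducer, the same effect comes from passing key= to send(). Here is a sketch of answer #1's keyed handler ported that way. It assumes kafka-python's KafkaProducer.send(topic, value=..., key=...) and no key_serializer, so keys and values are sent as bytes. FakeProducer is a hypothetical in-memory stand-in so the handler can be tried without a broker. With a real cluster you would pass KafkaProducer(bootstrap_servers=["localhost:9092"]) instead.

```python
import logging


class KeyedKafkaHandler(logging.Handler):
    """Sketch: route log records through a KafkaProducer-like object, optionally keyed.

    `producer` is assumed to expose kafka-python's send(topic, value=..., key=...) shape;
    it is injected so the handler can be exercised without a broker.
    """

    def __init__(self, producer, topic, key=None):
        super().__init__()
        self.producer = producer
        self.topic = topic
        # Without a key_serializer, KafkaProducer expects the key as bytes.
        self.key = key.encode("utf-8") if isinstance(key, str) else key

    def emit(self, record):
        if record.name == "kafka" or record.name.startswith("kafka."):
            return  # avoid feeding the client's own log records back into Kafka
        try:
            value = self.format(record).encode("utf-8")
            # key=None lets the producer's partitioner spread records across partitions.
            self.producer.send(self.topic, value=value, key=self.key)
        except Exception:
            self.handleError(record)


class FakeProducer:
    """Hypothetical in-memory stand-in for KafkaProducer, for trying the handler offline."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, key, value))


fake = FakeProducer()
log = logging.getLogger("keyed_demo")
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(KeyedKafkaHandler(fake, "test_log", key="key1"))
log.info("hello %s", "kafka")
print(fake.sent)  # [('test_log', b'key1', b'hello kafka')]
```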

Here is my modified snippet, using Kafka 1.0.0 and kafka-python 1.4.2. It only covers the producer, since I consume on the other end with Logstash.
Hope this is useful to you!

tester.py (main routine)


# -*- coding: utf-8 -*-

"""Module to test out logging to kafka."""

import logging

from utils.kafka_handler import KafkaHandler

def run_it(logger=None):
    """Run the actual connections."""

    # use the caller's logger if one was passed in, else this module's logger
    logger = logger or logging.getLogger(__name__)
    # enable the debug logger if you want to see ALL of the lines
    #logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)

    kh = KafkaHandler(['localhost:9092'], 'sebtest')
    logger.addHandler(kh)

    logger.info("I'm a little logger, short and stout")
    logger.debug("Don't tase me bro!")

if __name__ == "__main__":
    run_it()

utils/kafka_handler.py (logging utility)


# -*- coding: utf-8 -*-

"""Module to provide kafka handlers for internal logging facility."""

import json
import logging

from kafka import KafkaProducer

class KafkaHandler(logging.Handler):
    """Class to instantiate the kafka logging facility."""

    def __init__(self, hostlist, topic='corp_it_testing', tls=None):
        """Initialize an instance of the kafka handler."""
        logging.Handler.__init__(self)
        self.producer = KafkaProducer(bootstrap_servers=hostlist,
                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                      linger_ms=10)
        self.topic = topic

    def emit(self, record):
        """Emit the provided record to the kafka_client producer."""
        # drop records from the kafka client's own loggers ("kafka" and "kafka.*")
        # to avoid infinite recursion
        if record.name == 'kafka' or record.name.startswith('kafka.'):
            return

        try:
            # apply the logger formatter
            msg = self.format(record)
            self.producer.send(self.topic, {'message': msg})
            self.flush(timeout=1.0)
        except Exception:
            logging.Handler.handleError(self, record)

    def flush(self, timeout=None):
        """Flush the objects."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()

            logging.Handler.close(self)
        finally:
            self.release()
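One thing to be aware of in the handler above: emit() calls self.flush(timeout=1.0) for every record. Each logging call can therefore block the calling thread for up to about a second while the message is delivered, and the batching that linger_ms=10 is meant to enable goes largely unused. If that matters, one option (not part of this answer) is the standard library's QueueHandler/QueueListener pair (Python 3.2+). The application thread only enqueues records, and a background thread runs the slow handler. Below is a minimal sketch in which SlowHandler is a hypothetical stand-in for KafkaHandler.

```python
import logging
import logging.handlers
import queue
import time


class SlowHandler(logging.Handler):
    """Hypothetical stand-in for KafkaHandler whose emit() blocks (e.g. on flush)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        time.sleep(0.05)  # simulate a blocking producer.flush()
        self.messages.append(self.format(record))


log_queue = queue.Queue(-1)  # unbounded
slow = SlowHandler()

# The logger only enqueues records, which is cheap; the listener thread does the slow work.
qlog = logging.getLogger("queued_demo")
qlog.propagate = False
qlog.setLevel(logging.INFO)
qlog.addHandler(logging.handlers.QueueHandler(log_queue))

listener = logging.handlers.QueueListener(log_queue, slow)
listener.start()
for i in range(3):
    qlog.info("event %d", i)
listener.stop()  # processes everything already queued, then joins the thread
print(slow.messages)  # ['event 0', 'event 1', 'event 2']
```

In a real setup you would pass the KafkaHandler instance to QueueListener and call listener.stop() at shutdown (for example via atexit) so queued records are not lost.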
