从flask端点访问示例变量中的kafka数据

7tofc5zh  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(281)

我已经编写了一个kafka消费者,在其中我捕获了类变量中的一些度量,下面是消费者的代码。

class Consumer(Thread):

    def __init__(self, kafka_topic, kafka_brokers, group_id):
        Thread.__init__(self)
        self.consumer = KafkaConsumer(kafka_topic, bootstrap_servers=kafka_brokers, group_id=group_id)
        self.metrics = Metrics()

    def process_message(self, message):
        msg_data = json.loads(message.value)
        meeting_id = str(uuid.uuid1())
        metric_response = MetricResponse(meeting_id)

        #perform some task and update metric response object

        self.metrics.add_metric_response(metric_response)
        print('\nFinished......')

    def get_metrics(self):
        return self.metrics

    def run(self):
        print('Consumer started...')
        with ThreadPoolExecutor(max_workers=10) as executor:
            for message in self.consumer:
                executor.map(self.process_message, (message,))

现在我已经编写了一个flaskapi来检索这个metrics变量,在这里我也开始使用consumer,但是我得到的是空的metrics对象。下面是api代码

app = Flask(__name__)
consumer = Consumer('my-topic', ['localhost:9092'], 'my-group')
consumer.start()

@app.route('/metrics')
def get_metrics():
    metrics = consumer.get_metrics()
    metrics_response = metrics.toJSON()
    return Response(metrics_response, 200, mimetype='application/json')

if __name__ == '__main__':
    app.run(debug=True)

我需要帮助从consumer类访问metrics对象。我对Python不熟悉

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题