我已经编写了一个kafka消费者,在其中我捕获了类变量中的一些度量,下面是消费者的代码。
class Consumer(Thread):
def __init__(self, kafka_topic, kafka_brokers, group_id):
Thread.__init__(self)
self.consumer = KafkaConsumer(kafka_topic, bootstrap_servers=kafka_brokers, group_id=group_id)
self.metrics = Metrics()
def process_message(self, message):
msg_data = json.loads(message.value)
meeting_id = str(uuid.uuid1())
metric_response = MetricResponse(meeting_id)
#perform some task and update metric response object
self.metrics.add_metric_response(metric_response)
print('\nFinished......')
def get_metrics(self):
return self.metrics
def run(self):
print('Consumer started...')
with ThreadPoolExecutor(max_workers=10) as executor:
for message in self.consumer:
executor.map(self.process_message, (message,))
现在我已经编写了一个flaskapi来检索这个metrics变量,在这里我也开始使用consumer,但是我得到的是空的metrics对象。下面是api代码
app = Flask(__name__)
consumer = Consumer('my-topic', ['localhost:9092'], 'my-group')
consumer.start()
@app.route('/metrics')
def get_metrics():
metrics = consumer.get_metrics()
metrics_response = metrics.toJSON()
return Response(metrics_response, 200, mimetype='application/json')
if __name__ == '__main__':
app.run(debug=True)
我需要帮助从consumer类访问metrics对象。我对Python不熟悉
暂无答案!
目前还没有任何答案,快来回答吧!