我正在尝试运行一个向Rabbit mq注入消息的负载测试,但是我可以获取Locust用户类来加载我创建的自定义客户端。有人做到了吗?
import os
from locust import HttpUser, task, TaskSet, run_single_user
from kombu import Connection, Exchange, Queue
class KombuClient:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.connection = Connection(
hostname=os.getenv(
"LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
),
)
self.channel = self.connection.channel()
def send_message(self, message):
exchange = Exchange(
name=os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange"),
type="direct",
)
queue = Queue(
name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
exchange=exchange,
routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
)
queue.maybe_bind(self.connection)
queue.declare()
producer = self.connection.Producer(
exchange=exchange,
routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
)
producer.publish(message)
print("INFO: Sent message: '{}'".format(message))
def close_connection(self):
self.connection.release()
class UserBehavior(TaskSet):
@task
def send_hello_message(self):
I make it work by, instead of the line below but that is not 100% correct--> KombuClient().send_message("Hello World!")
self.client.send_message("Hello World!")
class RabbitMQUser(HttpUser):
host = "http://localhost"
tasks = [UserBehavior]
min_wait = 5000
max_wait = 9000
client = KombuClient
if __name__ == "__main__":
run_single_user(RabbitMQUser)
一般来说,我可以注入消息,但Locust不能识别事务,也没有显示任何正在运行的用户。
我试图检查Locust代码以查看客户端期望的类是什么,但我没有发现任何有用的信息。
我猜API改变了,因为我使用Locust2.15.1,不再有HttpClient类
2条答案
按热度按时间xxhby3vn1#
这是更新后的代码,我仍然没有添加@Solowalker建议的更改,但如果你把所有这些放在一起,你就可以得到完整的答案。我会尝试更新代码,一旦一切都是工作。
nx7onnlm2#
这个问题在Locust常见问题解答中有介绍,示例可以在in the docs中找到。您需要做的是触发包含所有请求信息和统计信息的
request
事件。您使用的是HttpUser
,但覆盖了client
,所以您将它变成了一个普通的User
,它不知道您在做什么,您需要告诉它记录什么以及何时记录。示例取自gRPC场景的文档:
用你自己的统计数据来打电话。请参阅链接文档页面以了解参数的含义,但由于您正在做一个自定义用户,您可以使它们成为并意味着任何您想要的。