rabbitmq 我无法将Rabbit MQ客户端附加到Locust用户

1yjd4xko  于 2023-06-23  发布在  RabbitMQ
关注(0)|答案(2)|浏览(140)

我正在尝试运行一个向Rabbit mq注入消息的负载测试,但是我可以获取Locust用户类来加载我创建的自定义客户端。有人做到了吗?

import os
from locust import HttpUser, task, TaskSet, run_single_user
from kombu import Connection, Exchange, Queue

class KombuClient:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.connection = Connection(
            hostname=os.getenv(
                "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
            ),
        )
        self.channel = self.connection.channel()

    def send_message(self, message):
        exchange = Exchange(
            name=os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange"),
            type="direct",
        )
        queue = Queue(
            name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        queue.maybe_bind(self.connection)
        queue.declare()

        producer = self.connection.Producer(
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        producer.publish(message)
        print("INFO: Sent message: '{}'".format(message))

    def close_connection(self):
        self.connection.release()

class UserBehavior(TaskSet):
    @task
    def send_hello_message(self):
        I make it work by, instead of the line below but that is not 100% correct--> KombuClient().send_message("Hello World!")
        self.client.send_message("Hello World!")

class RabbitMQUser(HttpUser):
    host = "http://localhost"
    tasks = [UserBehavior]
    min_wait = 5000
    max_wait = 9000
    client = KombuClient
  

if __name__ == "__main__":
    run_single_user(RabbitMQUser)

一般来说,我可以注入消息,但Locust不能识别事务,也没有显示任何正在运行的用户。
我试图检查Locust代码以查看客户端期望的类是什么,但我没有发现任何有用的信息。
我猜API改变了,因为我使用Locust2.15.1,不再有HttpClient类

xxhby3vn

xxhby3vn1#

这是更新后的代码,我仍然没有添加@Solowalker建议的更改,但如果你把所有这些放在一起,你就可以得到完整的答案。我会尝试更新代码,一旦一切都是工作。

import os
from locust import task, TaskSet, User, run_single_user
from locust.clients import HttpSession
from kombu import Connection, Exchange, Queue

class KombuClient(HttpSession):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.connection = Connection(
            hostname=os.getenv(
                "LOCUST_AMQP_CONFIG", "amqp://guest:guest@localhost:5672//"
            ),
        )
        self.channel = self.connection.channel()

    def send_message(self, message):
        exchange = Exchange(
            name=os.getenv("LOCUST_AMQP_EXCHANGE", "test_exchange"),
            type="direct",
        )
        queue = Queue(
            name=os.getenv("LOCUST_AMQP_QUEUE", "test_queue"),
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        queue.maybe_bind(self.connection)
        queue.declare()

        producer = self.connection.Producer(
            exchange=exchange,
            routing_key=os.getenv("LOCUST_AMQP_KEY", "test_key"),
        )
        producer.publish(message)
        print("INFO: Sent message: '{}'".format(message))

    def close_connection(self):
        self.connection.release()

class UserBehavior(TaskSet):
    @task
    def send_hello_message(self):
        self.client.send_message(message="Hello World!")

class RabbitMQUser(User):
    host = "http://localhost"
    tasks = [UserBehavior]
    min_wait = 5000
    max_wait = 9000
    abstract = True

    def __init__(self, environment):
        super().__init__(environment)
        self.client = KombuClient(self.host, user=self, request_event=environment.events.request)

if __name__ == "__main__":
    run_single_user(RabbitMQUser)
nx7onnlm

nx7onnlm2#

这个问题在Locust常见问题解答中有介绍,示例可以在in the docs中找到。您需要做的是触发包含所有请求信息和统计信息的request事件。您使用的是HttpUser,但覆盖了client,所以您将它变成了一个普通的User,它不知道您在做什么,您需要告诉它记录什么以及何时记录。
示例取自gRPC场景的文档:

self.environment.events.request.fire(
        request_type="grpc",
        name=call_details.method,
        response_time=(time.perf_counter() - start_perf_counter) * 1000,
        response_length=response_length,
        response=response,
        context=None,
        exception=exception,
    )

用你自己的统计数据来打电话。请参阅链接文档页面以了解参数的含义,但由于您正在做一个自定义用户,您可以使它们成为并意味着任何您想要的。

相关问题