增加在Azure容器示例上运行的Paho-MQTT发布者(Locust负载测试)

tv6aics1  于 2022-12-24  发布在  其他
关注(0)|答案(1)|浏览(143)

我们正在尝试使用Azure容器示例和Python Paho-MQTT库运行分布式Locust MQTT测试。我们无法在每个工作线程上运行超过340个客户端。

OSError: [Errno 24] Too many open files.

该故障与以下问题有关:

使用Docker,可以使用--ulimit更改软限制和硬限制,但ACI不接受Docker参数。
我们更改了ACI入口点,以增加运行以下bash脚本时的打开文件软限制:

ulimit -Sn 10000

locust

We added to the locustfile.py:

resource.setrlimit(resource.RLIMIT_NOFILE, (200000, 200000))

我们还尝试使用以下命令:

sudo sysctl -w fs.file-max=500000

sysctl -p

但它返回权限被拒绝错误。
你知道吗?

vsaztqbk

vsaztqbk1#

这不是ACI的问题,而是如何构建Paho-MQTT客户端的问题。Paho使用select(..)方法,该方法限制我们打开超过1024个文件描述符。
每个MQTT客户端表示3个打开文件描述符:3 * 340 = 1020。超过340个客户端连接,我们达到了1024个打开文件描述符。
我们使用了一个从Paho客户端继承的MQTT用户。我们重写了以下方法来使用eventfd包。

import eventfd

    [...]
    def loop_start(self) -> Optional[int]:
        [...]
        # self._sockpairR, self._sockpairW = _socketpair_compat()
        self._wake_event = EventFD()
        [...]

    def _reset_sockets(self, sockpair_only: bool = False) -> None:
        [...]
        # if self._sockpairR:
        #   self._sockpairR.close()
        #   self._sockpairR = None
        # if self._sockpairW:
        #   self._sockpairW.close()
        #   self._sockpairW = None
        if self._wake_event:
            self._wake_event = None

    def _packet_queue(
        self,
        command: Literal[48],
        packet: bytearray,
        mid: int,
        qos: int,
        info: Optional[Any] = None,
    ) -> int:
        [...]
        # if self._sockpairW is not None:
        #   try:
        #     self._sockpairW.send(sockpair_data)
        #   except BlockingIOError:
        #     pass
        if self._wake_event is not None:
            with suppress(BlockingIOError):
                self._wake_event.set()
        [...]

    def _loop(self, timeout: float = 1.0) -> int:
        [...]
        # if self._sockpairR is None:
        #   rlist = [self._sock]
        # else:
        #   rlist = [self._sock, self._sockpairR]
        if self._wake_event is None:
            rlist = [self._sock]
        else:
            rlist = [self._sock, self._wake_event]
        [...]
        # if self._sockpairR and self._sockpairR in socklist[0]:
        #   socklist[1].insert(0, self._sock)
        #   try:
        #       self._sockpairR.recv(10000)
        #   except BlockingIOError:
        #       pass
        if self._wake_event and self._wake_event.is_set():
            self._wake_event.clear()
        [...]
    [...]

相关问题