django 与-parallel一起使用时,无法覆盖unittest `startTest`,并且`stopTest`在自定义测试运行器中无法正常工作

0ve6wy6x  于 2023-06-25  发布在  Go
关注(0)|答案(2)|浏览(85)

bounty明天到期。此问题的答案有资格获得+50声望奖励。Abhinav Prakash正在寻找一个答案从一个有信誉的来源

我已经覆盖了unittest TextTestResult类中的startTest和stopTest,并在我的自定义测试运行器中使用了它。它在正常情况下正常工作,但在使用--parallel标志时无法正常工作。
我试着调试,发现b/w startTest和stopTest所用的时间非常非常小。例如4.8000000000492093e-05,这是不正确的。
有没有人能告诉我这是正确的钩子--平行旗还是我必须用另一个钩子?
复制步骤:
假设django的项目名是project,应用程序是app
1.在你的django项目目录中创建custom_test_runner.py,并添加以下代码:

from time import perf_counter
from unittest import TextTestResult, TextTestRunner
from django.test.runner import DiscoverRunner

class CustomTestRunnerTextTestResult(TextTestResult):
    def __init__(self, stream, descriptions, verbosity):
        super(CustomTestRunnerTextTestResult, self).__init__(stream, descriptions, verbosity)

    def startTest(self, test):
        self.start_time = perf_counter()
        super(CustomTestRunnerTextTestResult, self).startTest(test)

    def stopTest(self, test):
        super(CustomTestRunnerTextTestResult, self).stopTest(test)
        print(f"Time elapsed {str(test)} for {perf_counter() - self.start_time}")

class CustomTestRunnerTextTestRunner(TextTestRunner):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    resultclass = CustomTestRunnerTextTestResult

class CustomTestRunnerTestRunner(DiscoverRunner):
    def __init__(self, **kwargs):
        super(CustomTestRunnerTestRunner, self).__init__(**kwargs)
    test_runner = CustomTestRunnerTextTestRunner

1.在tests文件夹中创建test_custom_test_runner.py并添加以下代码:

import time
from django.test import TestCase

class Test1(TestCase):
    def test_1(self):
        time.sleep(2)

class Test2(TestCase):
    def test_2(self):
        time.sleep(2)

1.在您settings.py中添加以下代码

TEST_RUNNER = "project.custom_test_runner.CustomTestRunnerTestRunner"

1.现在,运行python manage.py test app.tests.test_custom_test_runner --parallel
你会得到这样一个很小的时间间隔:

.Time elapsed test_1 (app.tests.test_custom_test_runner.Test1) for 0.00034683300000004635
.Time elapsed test_2 (app.tests.test_custom_test_runner.Test2) for 0.0002126670000004438

----------------------------------------------------------------------
Ran 2 tests in 2.095s

但是如果你不使用--parallel flag,你会得到:

.Time elapsed test_1 (app.tests.test_custom_test_runner.Test1) for 2.005363042
.Time elapsed test_2 (app.tests.test_custom_test_runner.Test2) for 2.0043795000000006

----------------------------------------------------------------------
Ran 2 tests in 4.027s

Django版本:3.2.14

hsgswve4

hsgswve41#

看看“Django Writing and running tests¶”和它的unittest框架,它的主要目标通常是确定测试用例是通过还是失败。然而,测量每个测试所花费的时间似乎并不是默认Django测试运行器的内置功能。默认的Django测试运行器不提供直接的钩子来测量单个测试的执行时间。
当测试并行运行时,它们在单独的进程中执行。startTeststopTest方法是在每个进程的上下文中调用的,因此这些进程之间的时序信息并不同步。
一种可能的方法(在并行运行测试时精确测量每个测试所花费的时间)是使用共享变量或文件来存储跨进程的计时信息。这里:一个多处理Manager,用于在进程间共享字典。字典用于在custom_test_runner.py文件中存储每个测试的开始时间,以测试名称为键:

import time
import unittest
from multiprocessing import Manager, Lock
from django.test.runner import DiscoverRunner
from unittest.runner import TextTestResult

manager = Manager()

# Shared memory resources
shared_data = manager.Namespace()

lock = Lock()

class CustomTestRunnerTextTestResult(TextTestResult):
    def __init__(self, stream, descriptions, verbosity, temp_file):
        super(CustomTestRunnerTextTestResult, self).__init__(stream, descriptions, verbosity)
        self.temp_file = temp_file

    def startTest(self, test):
        with lock:
            shared_data.start_time = time.time()
        super(CustomTestRunnerTextTestResult, self).startTest(test)

    def stopTest(self, test):
        super(CustomTestRunnerTextTestResult, self).stopTest(test)
        with lock:
            elapsed_time = time.time() - shared_data.start_time
            # You can now use the elapsed_time, and if needed, save it using your custom service
            CustomTestRunnerService.save_duration(test=test, elapsed_time=elapsed_time, temp_file=self.temp_file)

class CustomTestRunner(DiscoverRunner):
    def get_resultclass(self):
        return CustomTestRunnerTextTestResult

多处理器Manager用于创建共享字典。当测试开始时,它的开始时间被记录在共享字典中。当测试停止时,将使用共享字典中的开始时间和当前时间计算时间。
锁(lock)用于确保一次只有一个进程可以访问此共享数据。startTeststopTest方法与锁同步,以确保持续更新和访问开始时间和运行时间。
这种方法应该在并行测试环境中更准确地工作,因为它使用了可以跨进程访问的共享数据结构。
添加到您的settings.py

# settings.py

TEST_RUNNER = 'path_to_your_module.custom_test_runner.TimeLoggingTestRunner'

并与manage.py并行启动测试:

./manage.py test --parallel
wvt8vs2t

wvt8vs2t2#

如果使用--parallel标志,则不会实时调用test_runner.resultclassCustomTestRunnerTextTestResult)方法。
而是调用parallel_test_suite.runnerclass.resultclassRemoteTestResult)方法,这些方法将events(对test_runner.resultclass的方法调用)排队,以便在每次测试完成后由parallel_test_suiteParallelTestSuite)一起调度。
Django门票,2020年10月:https://code.djangoproject.com/ticket/32140(关闭为needsinfo)
以下是支持您需求的一种方法:
1.不直接调用perf_counter(),而是实现并调用self._get_time(),它将尝试返回由RemoteTestResult子类标记的时间。
1.为RemoteTestResult子类提供一个方法,通过events标记时间。

class CustomTestRunnerTextTestResult(TextTestResult):
    def __init__(self, stream, descriptions, verbosity):
        super(CustomTestRunnerTextTestResult, self).__init__(stream, descriptions, verbosity)
        self._marked_remote_time = None  # Add this

    def mark_remote_time(self, _test, remote_time):  # For CustomTestRunnerRemoteTestResult events  # Add this method
        self._marked_remote_time = remote_time

    def _get_time(self):  # Add this method
        return self._marked_remote_time or perf_counter()

    def startTest(self, test):
        # self.start_time = perf_counter()  # Change this
        self.start_time = self._get_time()  # to this
        super(CustomTestRunnerTextTestResult, self).startTest(test)

    def stopTest(self, test):
        super(CustomTestRunnerTextTestResult, self).stopTest(test)
        # print(f"Time elapsed {str(test)} for {perf_counter() - self.start_time}")  # Change this
        print(f"Time elapsed {str(test)} for {self._get_time() - self.start_time}")  # to this

1.实现RemoteTestResult子类,通过事件标记时间。

from django.test.runner import RemoteTestResult

class CustomTestRunnerRemoteTestResult(RemoteTestResult):
    def startTest(self, test):
        self.events.append(("mark_remote_time", self.test_index, perf_counter()))
        super().startTest(test)  # Does self.events.append(("startTest", self.test_index))

    def stopTest(self, test):
        self.events.append(("mark_remote_time", self.test_index, perf_counter()))
        super().stopTest(test)  # Does self.events.append(("stopTest", self.test_index))

1.将RemoteTestResult子类传递给parallel_test_suite.runnerclass构造函数。

from functools import partial

class CustomTestRunnerTestRunner(DiscoverRunner):
    def __init__(self, **kwargs):
        super(CustomTestRunnerTestRunner, self).__init__(**kwargs)
        self.parallel_test_suite.runner_class = partial(  # Add this
            self.parallel_test_suite.runner_class,
            resultclass=CustomTestRunnerRemoteTestResult,
        )
    test_runner = CustomTestRunnerTextTestRunner

相关问题