# microproduct/deformation-sentiral/ISCEApp/site-packages/pytest_benchmark/fixture.py
#
# 307 lines
# 11 KiB
# Python
# Raw Permalink Normal View History
#
# 2023-08-28 10:17:29 +00:00
from __future__ import division
from __future__ import print_function
import cProfile
import gc
import pstats
import sys
import time
import traceback
from math import ceil
from .compat import INT
from .compat import XRANGE
from .timers import compute_timer_precision
from .utils import NameWrapper
from .utils import format_time
# ``statistics`` is treated as optional: a failed import must not break
# importing this module.
try:
    import statistics
except (ImportError, SyntaxError):
    # Record the formatted traceback instead of propagating the failure, and
    # leave ``statistics`` bound to None so the name always exists.
    statistics_error = traceback.format_exc()
    statistics = None
else:
    statistics_error = None
from .stats import Metadata
class FixtureAlreadyUsed(Exception):
    """Raised when a benchmark fixture is used for a second timed call."""
class BenchmarkFixture(object):
    """Single-use callable that times a function and records the measurements.

    One instance supports one timed call, made through one of:

    * ``fixture(func, *args, **kwargs)`` -- calibrates the number of
      iterations per round automatically, then times ``func``.
    * ``fixture.pedantic(...)`` -- times ``target`` with caller-chosen rounds,
      warmup rounds and iterations.
    * ``fixture.weave(...)`` / ``fixture.patch(...)`` -- wraps a target via
      ``aspectlib`` so that calling it is routed through ``fixture(...)``.

    A second timed call raises :class:`FixtureAlreadyUsed`.
    """

    # Cache of timer -> precision. It lives on the class, so every fixture
    # instance shares it and each timer is measured only once.
    _precisions = {}

    def __init__(self, node, disable_gc, timer, min_rounds, min_time, max_time, warmup, warmup_iterations,
                 calibration_precision, add_stats, logger, warner, disabled, cprofile, group=None):
        """Store the benchmark configuration.

        :param node: Test item; ``name`` and ``_nodeid`` are read, plus
            ``callspec.id`` / ``callspec.params`` when ``callspec`` exists.
        :param disable_gc: If true, garbage collection is disabled while timing.
        :param timer: Object whose ``target`` attribute is the timer callable.
        :param min_rounds: Lower bound for the number of rounds in ``_raw``.
        :param min_time: Minimum duration of one round (converted to float).
        :param max_time: Time budget used to derive the round count
            (converted to float).
        :param warmup: Whether warmup is requested.
        :param warmup_iterations: Warmup iteration budget; only kept when
            ``warmup`` is truthy.
        :param calibration_precision: Factor applied to the timer precision
            during calibration.
        :param add_stats: Callable invoked with each new ``Metadata`` object.
        :param logger: Logger providing ``debug`` and ``warn``.
        :param warner: Passed through to ``logger.warn`` as ``warner=``.
        :param disabled: If true, functions are called but not timed.
        :param cprofile: If true, one extra profiled call is made.
        :param group: Optional group label stored on the fixture.
        """
        self.name = node.name
        self.fullname = node._nodeid
        self.disabled = disabled
        if hasattr(node, 'callspec'):
            self.param = node.callspec.id
            self.params = node.callspec.params
        else:
            self.param = None
            self.params = None
        self.group = group
        self.has_error = False
        self.extra_info = {}
        self.skipped = False

        self._disable_gc = disable_gc
        self._timer = timer.target
        self._min_rounds = min_rounds
        self._max_time = float(max_time)
        self._min_time = float(min_time)
        self._add_stats = add_stats
        self._calibration_precision = calibration_precision
        # Falsy when warmup is off, otherwise the warmup iteration budget.
        self._warmup = warmup and warmup_iterations
        self._logger = logger
        self._warner = warner
        self._cleanup_callbacks = []
        # Name of the entry point that consumed this fixture (None = unused).
        self._mode = None
        self.cprofile = cprofile
        self.cprofile_stats = None
        self.stats = None

    @property
    def enabled(self):
        """Return True when the fixture is not disabled."""
        return not self.disabled

    def _get_precision(self, timer):
        """Return the precision of ``timer``, computing and caching it on first use."""
        if timer in self._precisions:
            timer_precision = self._precisions[timer]
        else:
            timer_precision = self._precisions[timer] = compute_timer_precision(timer)
            self._logger.debug("")
            self._logger.debug("Computing precision for %s ... %ss." % (
                NameWrapper(timer), format_time(timer_precision)), blue=True, bold=True)
        return timer_precision

    def _make_runner(self, function_to_benchmark, args, kwargs):
        """Build the closure that performs one timed round.

        The returned ``runner(loops_range, timer=...)`` behaves as follows:

        * truthy ``loops_range``: calls the function once per element and
          returns the elapsed time for the whole loop;
        * falsy ``loops_range``: calls the function once and returns
          ``(elapsed, result)``.

        While running, garbage collection is disabled if configured and the
        active trace function is removed; both are restored afterwards.
        """
        def runner(loops_range, timer=self._timer):
            gc_enabled = gc.isenabled()
            if self._disable_gc:
                gc.disable()
            tracer = sys.gettrace()
            sys.settrace(None)
            try:
                if loops_range:
                    start = timer()
                    for _ in loops_range:
                        function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start
                else:
                    start = timer()
                    result = function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start, result
            finally:
                sys.settrace(tracer)
                # Only re-enable collection if it was on before this round.
                if gc_enabled:
                    gc.enable()

        return runner

    def _make_stats(self, iterations):
        """Create the ``Metadata`` record, register it and store it on ``self.stats``."""
        bench_stats = Metadata(self, iterations=iterations, options={
            "disable_gc": self._disable_gc,
            "timer": self._timer,
            "min_rounds": self._min_rounds,
            "max_time": self._max_time,
            "min_time": self._min_time,
            "warmup": self._warmup,
        })
        self._add_stats(bench_stats)
        self.stats = bench_stats
        return bench_stats

    def __call__(self, function_to_benchmark, *args, **kwargs):
        """Benchmark ``function_to_benchmark(*args, **kwargs)`` and return its result.

        :raises FixtureAlreadyUsed: If the fixture has already been used.
        """
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark(...)'
            return self._raw(function_to_benchmark, *args, **kwargs)
        except Exception:
            # Flag the failure on the fixture, then let it propagate unchanged.
            self.has_error = True
            raise

    def pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        """Benchmark ``target`` with explicit rounds, warmup rounds and iterations.

        Arguments are validated and applied by :meth:`_raw_pedantic`.

        :raises FixtureAlreadyUsed: If the fixture has already been used.
        """
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark.pedantic(...)'
            return self._raw_pedantic(target, args=args, kwargs=kwargs, setup=setup, rounds=rounds,
                                      warmup_rounds=warmup_rounds, iterations=iterations)
        except Exception:
            # Flag the failure on the fixture, then let it propagate unchanged.
            self.has_error = True
            raise

    def _raw(self, function_to_benchmark, *args, **kwargs):
        """Run the calibrated benchmark and return the function's result.

        When enabled: calibrate, derive the round count, optionally warm up,
        then feed every round's duration into the stats object. The returned
        value always comes from one additional call made after timing; that
        call runs under ``cProfile`` when profiling is requested.
        """
        if self.enabled:
            runner = self._make_runner(function_to_benchmark, args, kwargs)

            duration, iterations, loops_range = self._calibrate_timer(runner)

            # Choose how many times we must repeat the test.
            rounds = int(ceil(self._max_time / duration))
            rounds = max(rounds, self._min_rounds)
            rounds = min(rounds, sys.maxsize)

            stats = self._make_stats(iterations)

            self._logger.debug(" Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)
            run_start = time.time()
            if self._warmup:
                warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))
                self._logger.debug(" Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))
                for _ in XRANGE(warmup_rounds):
                    runner(loops_range)
            for _ in XRANGE(rounds):
                stats.update(runner(loops_range))
            self._logger.debug(" Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)

        if self.enabled and self.cprofile:
            profile = cProfile.Profile()
            function_result = profile.runcall(function_to_benchmark, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)
        else:
            function_result = function_to_benchmark(*args, **kwargs)
        return function_result

    def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        """Run ``target`` with caller-specified rounds and iterations.

        :param target: Callable to benchmark.
        :param args: Positional arguments for ``target``.
        :param kwargs: Keyword arguments for ``target`` (``None`` means none).
        :param setup: Optional callable invoked before each call batch. A
            truthy return value must unpack to ``(args, kwargs)`` and replaces
            the static arguments.
        :param rounds: Number of timed rounds (int >= 1).
        :param warmup_rounds: Number of untimed rounds run first (int >= 0).
        :param iterations: Calls per round (int >= 1); must be 1 when
            ``setup`` is given.
        :returns: The result of ``target``.
        :raises ValueError: On invalid ``iterations``, ``rounds`` or
            ``warmup_rounds``, or when ``setup`` is combined with
            ``iterations > 1``.
        :raises TypeError: When ``setup`` returns arguments and ``args`` or
            ``kwargs`` were also supplied.
        """
        if kwargs is None:
            kwargs = {}

        has_args = bool(args or kwargs)

        if not isinstance(iterations, INT) or iterations < 1:
            raise ValueError("Must have positive int for `iterations`.")

        if not isinstance(rounds, INT) or rounds < 1:
            raise ValueError("Must have positive int for `rounds`.")

        if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:
            # Zero is accepted here, so the requirement is "non-negative".
            raise ValueError("Must have non-negative int for `warmup_rounds`.")

        if iterations > 1 and setup:
            raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")

        def make_arguments(args=args, kwargs=kwargs):
            # Defaults bind the static arguments as locals, so values returned
            # by ``setup`` can override them for this call only.
            if setup:
                maybe_args = setup()
                if maybe_args:
                    if has_args:
                        raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")
                    args, kwargs = maybe_args
            return args, kwargs

        if self.disabled:
            args, kwargs = make_arguments()
            return target(*args, **kwargs)

        stats = self._make_stats(iterations)
        # None selects the runner's single-call mode, which also returns the result.
        loops_range = XRANGE(iterations) if iterations > 1 else None
        for _ in XRANGE(warmup_rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            runner(loops_range)

        for _ in XRANGE(rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            if loops_range:
                duration = runner(loops_range)
            else:
                duration, result = runner(loops_range)
            stats.update(duration)

        if loops_range:
            # Looping rounds discard return values; call once more for the result.
            args, kwargs = make_arguments()
            result = target(*args, **kwargs)

        if self.cprofile:
            profile = cProfile.Profile()
            args, kwargs = make_arguments()
            profile.runcall(target, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)

        return result

    def weave(self, target, **kwargs):
        """Weave ``target`` with ``aspectlib`` so calls to it go through this fixture.

        The rollback of the weave is queued and executed by :meth:`_cleanup`.

        :raises ImportError: If ``aspectlib`` cannot be imported.
        """
        try:
            import aspectlib
        except ImportError as exc:
            raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")

        def aspect(function):
            def wrapper(*args, **kwargs):
                return self(function, *args, **kwargs)

            return wrapper

        self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)

    # Alias: ``patch`` is the same function as ``weave``.
    patch = weave

    def _cleanup(self):
        """Run queued cleanup callbacks (last added first) and warn if the fixture was never used."""
        while self._cleanup_callbacks:
            callback = self._cleanup_callbacks.pop()
            callback()
        if not self._mode and not self.skipped:
            self._logger.warn("Benchmark fixture was not used at all in this test!",
                              warner=self._warner, suspend=True)

    def _calibrate_timer(self, runner):
        """Find how many iterations one round needs to reach the minimum round time.

        Starting at one iteration, the count is multiplied by ten until a
        round lasts at least ``min_time_estimate``; from there the count is
        estimated directly from the measured duration.

        :param runner: Closure produced by :meth:`_make_runner`.
        :returns: ``(duration, loops, loops_range)`` -- the last measured
            duration, the chosen iteration count and the matching range.
        """
        timer_precision = self._get_precision(self._timer)
        min_time = max(self._min_time, timer_precision * self._calibration_precision)
        min_time_estimate = min_time * 5 / self._calibration_precision
        self._logger.debug("")
        self._logger.debug(" Calibrating to target round %ss; will estimate when reaching %ss "
                           "(using: %s, precision: %ss)." % (
                               format_time(min_time),
                               format_time(min_time_estimate),
                               NameWrapper(self._timer),
                               format_time(timer_precision)
                           ), yellow=True, bold=True)

        loops = 1
        while True:
            loops_range = XRANGE(loops)
            duration = runner(loops_range)
            if self._warmup:
                warmup_start = time.time()
                warmup_iterations = 0
                warmup_rounds = 0
                # Repeat the round until the time or iteration budget is spent,
                # keeping the fastest duration seen.
                while time.time() - warmup_start < self._max_time and warmup_iterations < self._warmup:
                    duration = min(duration, runner(loops_range))
                    warmup_rounds += 1
                    warmup_iterations += loops
                self._logger.debug(" Warmup: %ss (%s x %s iterations)." % (
                    format_time(time.time() - warmup_start),
                    warmup_rounds, loops
                ))

            self._logger.debug(" Measured %s iterations: %ss." % (loops, format_time(duration)), yellow=True)
            if duration >= min_time:
                break

            if duration >= min_time_estimate:
                # coarse estimation of the number of loops
                loops = int(ceil(min_time * loops / duration))
                self._logger.debug(" Estimating %s iterations." % loops, green=True)
                if loops == 1:
                    # If we got a single loop then bail early - nothing to calibrate if the
                    # test function is 100 times slower than the timer resolution.
                    loops_range = XRANGE(loops)
                    break
            else:
                loops *= 10
        return duration, loops, loops_range