from __future__ import division
from __future__ import print_function

import cProfile
import gc
import pstats
import sys
import time
import traceback
from math import ceil

from .compat import INT
from .compat import XRANGE
from .timers import compute_timer_precision
from .utils import NameWrapper
from .utils import format_time

try:
    import statistics
except (ImportError, SyntaxError):
    statistics_error = traceback.format_exc()
    statistics = None
else:
    statistics_error = None
    from .stats import Metadata


class FixtureAlreadyUsed(Exception):
    pass


class BenchmarkFixture(object):
    _precisions = {}

    def __init__(self, node, disable_gc, timer, min_rounds, min_time, max_time, warmup, warmup_iterations,
                 calibration_precision, add_stats, logger, warner, disabled, cprofile, group=None):
        self.name = node.name
        self.fullname = node._nodeid
        self.disabled = disabled
        if hasattr(node, 'callspec'):
            self.param = node.callspec.id
            self.params = node.callspec.params
        else:
            self.param = None
            self.params = None
        self.group = group
        self.has_error = False
        self.extra_info = {}
        self.skipped = False

        self._disable_gc = disable_gc
        self._timer = timer.target
        self._min_rounds = min_rounds
        self._max_time = float(max_time)
        self._min_time = float(min_time)
        self._add_stats = add_stats
        self._calibration_precision = calibration_precision
        self._warmup = warmup and warmup_iterations
        self._logger = logger
        self._warner = warner
        self._cleanup_callbacks = []
        self._mode = None
        self.cprofile = cprofile
        self.cprofile_stats = None
        self.stats = None

    @property
    def enabled(self):
        return not self.disabled

    def _get_precision(self, timer):
        if timer in self._precisions:
            timer_precision = self._precisions[timer]
        else:
            timer_precision = self._precisions[timer] = compute_timer_precision(timer)
            self._logger.debug("")
            self._logger.debug("Computing precision for %s ... %ss." % (
                NameWrapper(timer), format_time(timer_precision)), blue=True, bold=True)
        return timer_precision
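
    # Note: the timer precision is cached in the class-level `_precisions` mapping, so it
    # is computed at most once per timer object for all fixture instances sharing this class.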

    def _make_runner(self, function_to_benchmark, args, kwargs):
        def runner(loops_range, timer=self._timer):
            gc_enabled = gc.isenabled()
            if self._disable_gc:
                gc.disable()
            tracer = sys.gettrace()
            sys.settrace(None)
            try:
                if loops_range:
                    start = timer()
                    for _ in loops_range:
                        function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start
                else:
                    start = timer()
                    result = function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start, result
            finally:
                sys.settrace(tracer)
                if gc_enabled:
                    gc.enable()

        return runner
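
    # Note on the runner contract: with a non-empty `loops_range` the runner returns the
    # total elapsed time for all iterations; with a falsy `loops_range` it returns a
    # (duration, result) tuple for a single call, so the benchmarked value can be reused.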

    def _make_stats(self, iterations):
        bench_stats = Metadata(self, iterations=iterations, options={
            "disable_gc": self._disable_gc,
            "timer": self._timer,
            "min_rounds": self._min_rounds,
            "max_time": self._max_time,
            "min_time": self._min_time,
            "warmup": self._warmup,
        })
        self._add_stats(bench_stats)
        self.stats = bench_stats
        return bench_stats

    def __call__(self, function_to_benchmark, *args, **kwargs):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark(...)'
            return self._raw(function_to_benchmark, *args, **kwargs)
        except Exception:
            self.has_error = True
            raise
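
    # Usage sketch (assumed test code, not part of this module): the fixture instance is
    # called directly with the function to benchmark, e.g.
    #
    #     def test_sort(benchmark):
    #         result = benchmark(sorted, [3, 1, 2])
    #         assert result == [1, 2, 3]
    #
    # The benchmarked callable's return value is passed through, so assertions can run on it.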

    def pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark.pedantic(...)'
            return self._raw_pedantic(target, args=args, kwargs=kwargs, setup=setup, rounds=rounds,
                                      warmup_rounds=warmup_rounds, iterations=iterations)
        except Exception:
            self.has_error = True
            raise
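
    # Usage sketch (assumed test code): pedantic mode gives explicit control over rounds,
    # iterations and per-round setup instead of automatic calibration, e.g.
    #
    #     def test_slow(benchmark):
    #         benchmark.pedantic(compute, args=(100,), rounds=5, iterations=10)
    #
    # where `compute` is a hypothetical function under test.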

    def _raw(self, function_to_benchmark, *args, **kwargs):
        if self.enabled:
            runner = self._make_runner(function_to_benchmark, args, kwargs)

            duration, iterations, loops_range = self._calibrate_timer(runner)

            # Choose how many times we must repeat the test
            rounds = int(ceil(self._max_time / duration))
            rounds = max(rounds, self._min_rounds)
            rounds = min(rounds, sys.maxsize)

            stats = self._make_stats(iterations)

            self._logger.debug(" Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)
            run_start = time.time()
            if self._warmup:
                warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))
                self._logger.debug(" Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))
                for _ in XRANGE(warmup_rounds):
                    runner(loops_range)
            for _ in XRANGE(rounds):
                stats.update(runner(loops_range))
            self._logger.debug(" Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)
        if self.enabled and self.cprofile:
            profile = cProfile.Profile()
            function_result = profile.runcall(function_to_benchmark, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)
        else:
            function_result = function_to_benchmark(*args, **kwargs)
        return function_result
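
    # Worked example (illustrative numbers only): if calibration measured a round duration
    # of 0.001s and max_time is 1.0s, then rounds = ceil(1.0 / 0.001) = 1000, clamped below
    # by min_rounds and above by sys.maxsize.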

    def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if kwargs is None:
            kwargs = {}

        has_args = bool(args or kwargs)

        if not isinstance(iterations, INT) or iterations < 1:
            raise ValueError("Must have positive int for `iterations`.")

        if not isinstance(rounds, INT) or rounds < 1:
            raise ValueError("Must have positive int for `rounds`.")

        if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:
            raise ValueError("Must have non-negative int for `warmup_rounds`.")

        if iterations > 1 and setup:
            raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")

        def make_arguments(args=args, kwargs=kwargs):
            if setup:
                maybe_args = setup()
                if maybe_args:
                    if has_args:
                        raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")
                    args, kwargs = maybe_args
            return args, kwargs

        if self.disabled:
            args, kwargs = make_arguments()
            return target(*args, **kwargs)

        stats = self._make_stats(iterations)
        loops_range = XRANGE(iterations) if iterations > 1 else None
        for _ in XRANGE(warmup_rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            runner(loops_range)

        for _ in XRANGE(rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            if loops_range:
                duration = runner(loops_range)
            else:
                duration, result = runner(loops_range)
            stats.update(duration)

        if loops_range:
            args, kwargs = make_arguments()
            result = target(*args, **kwargs)

        if self.cprofile:
            profile = cProfile.Profile()
            args, kwargs = make_arguments()
            profile.runcall(target, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)

        return result
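
    # Setup sketch (assumed test code): when `setup` returns (args, kwargs), those are used
    # for each round and explicit `args`/`kwargs` must not also be passed, e.g.
    #
    #     def setup():
    #         return (make_payload(),), {}   # make_payload is hypothetical
    #
    #     benchmark.pedantic(process, setup=setup, rounds=10)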

    def weave(self, target, **kwargs):
        try:
            import aspectlib
        except ImportError as exc:
            raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")

        def aspect(function):
            def wrapper(*args, **kwargs):
                return self(function, *args, **kwargs)

            return wrapper

        self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)

    patch = weave
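
    # Usage sketch (assumed test code): `weave`/`patch` wraps an existing callable so that
    # calls made elsewhere in the code under test are routed through the benchmark, e.g.
    #
    #     def test_indirect(benchmark):
    #         benchmark.weave(SomeClass.method)   # SomeClass is hypothetical
    #         run_code_that_calls_it()            # hypothetical driver
    #
    # Keyword arguments are forwarded to aspectlib.weave; the weave is rolled back during
    # fixture cleanup via the stored rollback callback.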

    def _cleanup(self):
        while self._cleanup_callbacks:
            callback = self._cleanup_callbacks.pop()
            callback()
        if not self._mode and not self.skipped:
            self._logger.warn("Benchmark fixture was not used at all in this test!",
                              warner=self._warner, suspend=True)

    def _calibrate_timer(self, runner):
        timer_precision = self._get_precision(self._timer)
        min_time = max(self._min_time, timer_precision * self._calibration_precision)
        min_time_estimate = min_time * 5 / self._calibration_precision
        self._logger.debug("")
        self._logger.debug(" Calibrating to target round %ss; will estimate when reaching %ss "
                           "(using: %s, precision: %ss)." % (
                               format_time(min_time),
                               format_time(min_time_estimate),
                               NameWrapper(self._timer),
                               format_time(timer_precision)
                           ), yellow=True, bold=True)

        loops = 1
        while True:
            loops_range = XRANGE(loops)
            duration = runner(loops_range)
            if self._warmup:
                warmup_start = time.time()
                warmup_iterations = 0
                warmup_rounds = 0
                while time.time() - warmup_start < self._max_time and warmup_iterations < self._warmup:
                    duration = min(duration, runner(loops_range))
                    warmup_rounds += 1
                    warmup_iterations += loops
                self._logger.debug(" Warmup: %ss (%s x %s iterations)." % (
                    format_time(time.time() - warmup_start),
                    warmup_rounds, loops
                ))

            self._logger.debug(" Measured %s iterations: %ss." % (loops, format_time(duration)), yellow=True)
            if duration >= min_time:
                break

            if duration >= min_time_estimate:
                # coarse estimation of the number of loops
                loops = int(ceil(min_time * loops / duration))
                self._logger.debug(" Estimating %s iterations." % loops, green=True)
                if loops == 1:
                    # If we got a single loop then bail early - nothing to calibrate if the
                    # test function is 100 times slower than the timer resolution.
                    loops_range = XRANGE(loops)
                    break
            else:
                loops *= 10
        return duration, loops, loops_range
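
    # Calibration example (illustrative numbers, not defaults): with a timer precision of
    # 1e-6s and calibration_precision of 10, rounds are calibrated toward at least
    # max(min_time, 1e-5)s. The loop count starts at 1 and is multiplied by 10 each pass;
    # once a measured round reaches min_time * 5 / calibration_precision, the loop count is
    # instead estimated proportionally as ceil(min_time * loops / duration).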