""" from: https://bitbucket.org/haypo/misc/src/tip/python/pep418.py Implementation of the PEP 418 in pure Python using ctypes. Functions: - clock() - get_clock_info(name) - monotonic(): not always available - perf_frequency() - process_time() - sleep() - time() Constants: - has_monotonic (bool): True if time.monotonic() is available """ # flake8: noqa # TODO: gethrtime() for Solaris/OpenIndiana # TODO: call GetSystemTimeAdjustment() to get the resolution # TODO: other FIXME import os import sys import time as python_time has_mach_absolute_time = False has_clock_gettime = False has_gettimeofday = False has_ftime = False has_delay = False has_libc_time = False has_libc_clock = False has_libc_sleep = False has_GetTickCount64 = False CLOCK_REALTIME = None CLOCK_MONOTONIC = None CLOCK_PROCESS_CPUTIME_ID = None CLOCK_HIGHRES = None CLOCK_PROF = None try: import ctypes import ctypes.util from ctypes import byref, POINTER except ImportError as err: pass else: def ctypes_oserror(): errno = ctypes.get_errno() message = os.strerror(errno) return OSError(errno, message) time_t = ctypes.c_long if os.name == "nt": from ctypes.wintypes import BOOL, DWORD, HANDLE, FILETIME from ctypes import windll LARGEINTEGER = ctypes.c_int64 LARGEINTEGER_p = POINTER(LARGEINTEGER) FILETIME_p = POINTER(FILETIME) ULONGLONG = ctypes.c_uint64 def ctypes_winerror(): errno = ctypes.get_errno() message = os.strerror(errno) return WindowsError(errno, message) _QueryPerformanceFrequency = windll.kernel32.QueryPerformanceFrequency _QueryPerformanceFrequency.restype = BOOL _QueryPerformanceFrequency.argtypes = (LARGEINTEGER_p,) def QueryPerformanceFrequency(): frequency = LARGEINTEGER() ok = _QueryPerformanceFrequency(byref(frequency)) if not ok: raise ctypes_winerror() return int(frequency.value) _QueryPerformanceCounter = windll.kernel32.QueryPerformanceCounter _QueryPerformanceCounter.restype = BOOL _QueryPerformanceCounter.argtypes = (LARGEINTEGER_p,) def QueryPerformanceCounter(): frequency = LARGEINTEGER() ok = _QueryPerformanceCounter(byref(frequency)) if not ok: raise ctypes_winerror() return int(frequency.value) GetTickCount = windll.kernel32.GetTickCount GetTickCount.restype = DWORD GetTickCount.argtypes = () if hasattr(windll.kernel32, 'GetTickCount64'): GetTickCount64 = windll.kernel32.GetTickCount64 GetTickCount64.restype = ULONGLONG GetTickCount64.argtypes = () has_GetTickCount64 = True GetCurrentProcess = windll.kernel32.GetCurrentProcess GetCurrentProcess.argtypes = () GetCurrentProcess.restype = HANDLE _GetProcessTimes = windll.kernel32.GetProcessTimes _GetProcessTimes.argtypes = (HANDLE, FILETIME_p, FILETIME_p, FILETIME_p, FILETIME_p) _GetProcessTimes.restype = BOOL def filetime2py(obj): return (obj.dwHighDateTime << 32) + obj.dwLowDateTime def GetProcessTimes(handle): creation_time = FILETIME() exit_time = FILETIME() kernel_time = FILETIME() user_time = FILETIME() ok = _GetProcessTimes(handle, byref(creation_time), byref(exit_time), byref(kernel_time), byref(user_time)) if not ok: raise ctypes_winerror() return (filetime2py(creation_time), filetime2py(exit_time), filetime2py(kernel_time), filetime2py(user_time)) _GetSystemTimeAsFileTime = windll.kernel32.GetSystemTimeAsFileTime _GetSystemTimeAsFileTime.argtypes = (FILETIME_p,) _GetSystemTimeAsFileTime.restype = None def GetSystemTimeAsFileTime(): system_time = FILETIME() _GetSystemTimeAsFileTime(byref(system_time)) return filetime2py(system_time) libc_name = ctypes.util.find_library('c') if libc_name: libc = ctypes.CDLL(libc_name, use_errno=True) clock_t = ctypes.c_ulong if sys.platform == 'darwin': mach_absolute_time = libc.mach_absolute_time mach_absolute_time.argtypes = () mach_absolute_time.restype = ctypes.c_uint64 has_mach_absolute_time = True class mach_timebase_info_data_t(ctypes.Structure): _fields_ = ( ('numer', ctypes.c_uint32), ('denom', ctypes.c_uint32), ) mach_timebase_info_data_p = POINTER(mach_timebase_info_data_t) _mach_timebase_info = libc.mach_timebase_info _mach_timebase_info.argtypes = (mach_timebase_info_data_p,) _mach_timebase_info.restype = ctypes.c_int def mach_timebase_info(): timebase = mach_timebase_info_data_t() _mach_timebase_info(byref(timebase)) return (timebase.numer, timebase.denom) _libc_clock = libc.clock _libc_clock.argtypes = () _libc_clock.restype = clock_t has_libc_clock = True if hasattr(libc, 'sleep'): _libc_sleep = libc.sleep _libc_sleep.argtypes = (ctypes.c_uint,) _libc_sleep.restype = ctypes.c_uint has_libc_sleep = True if hasattr(libc, 'gettimeofday'): class timeval(ctypes.Structure): _fields_ = ( ('tv_sec', time_t), ('tv_usec', ctypes.c_long), ) timeval_p = POINTER(timeval) timezone_p = ctypes.c_void_p _gettimeofday = libc.gettimeofday # FIXME: some platforms only expect one argument _gettimeofday.argtypes = (timeval_p, timezone_p) _gettimeofday.restype = ctypes.c_int def gettimeofday(): tv = timeval() err = _gettimeofday(byref(tv), None) if err: raise ctypes_oserror() return tv has_gettimeofday = True time_t_p = POINTER(time_t) if hasattr(libc, 'time'): _libc__time = libc.time _libc__time.argtypes = (time_t_p,) _libc__time.restype = time_t def _libc_time(): return _libc__time(None) has_libc_time = True if sys.platform.startswith(("freebsd", "openbsd")): librt_name = libc_name else: librt_name = ctypes.util.find_library('rt') if librt_name: librt = ctypes.CDLL(librt_name, use_errno=True) if hasattr(librt, 'clock_gettime'): clockid_t = ctypes.c_int class timespec(ctypes.Structure): _fields_ = ( ('tv_sec', time_t), ('tv_nsec', ctypes.c_long), ) timespec_p = POINTER(timespec) _clock_gettime = librt.clock_gettime _clock_gettime.argtypes = (clockid_t, timespec_p) _clock_gettime.restype = ctypes.c_int def clock_gettime(clk_id): ts = timespec() err = _clock_gettime(clk_id, byref(ts)) if err: raise ctypes_oserror() return ts.tv_sec + ts.tv_nsec * 1e-9 has_clock_gettime = True _clock_settime = librt.clock_settime _clock_settime.argtypes = (clockid_t, timespec_p) _clock_settime.restype = ctypes.c_int def clock_settime(clk_id, value): ts = timespec() ts.tv_sec = int(value) ts.tv_nsec = int(float(abs(value)) % 1.0 * 1e9) err = _clock_settime(clk_id, byref(ts)) if err: raise ctypes_oserror() return ts.tv_sec + ts.tv_nsec * 1e-9 _clock_getres = librt.clock_getres _clock_getres.argtypes = (clockid_t, timespec_p) _clock_getres.restype = ctypes.c_int def clock_getres(clk_id): ts = timespec() err = _clock_getres(clk_id, byref(ts)) if err: raise ctypes_oserror() return ts.tv_sec + ts.tv_nsec * 1e-9 if sys.platform.startswith("linux"): CLOCK_REALTIME = 0 CLOCK_MONOTONIC = 1 CLOCK_PROCESS_CPUTIME_ID = 2 elif sys.platform.startswith("freebsd"): CLOCK_REALTIME = 0 CLOCK_PROF = 2 CLOCK_MONOTONIC = 4 elif sys.platform.startswith("openbsd"): CLOCK_REALTIME = 0 CLOCK_MONOTONIC = 3 elif sys.platform.startswith("sunos"): CLOCK_REALTIME = 3 CLOCK_HIGHRES = 4 # clock_gettime(CLOCK_PROCESS_CPUTIME_ID) fails with errno 22 # on OpenSolaris # CLOCK_PROCESS_CPUTIME_ID = 5 def _clock_gettime_info(use_info, clk_id): value = clock_gettime(clk_id) if use_info: name = { CLOCK_MONOTONIC: 'CLOCK_MONOTONIC', CLOCK_PROF: 'CLOCK_PROF', CLOCK_HIGHRES: 'CLOCK_HIGHRES', CLOCK_PROCESS_CPUTIME_ID: 'CLOCK_PROCESS_CPUTIME_ID', CLOCK_REALTIME: 'CLOCK_REALTIME', }[clk_id] try: resolution = clock_getres(clk_id) except OSError: resolution = 1e-9 info = { 'implementation': 'clock_gettime(%s)' % name, 'resolution': resolution, } if clk_id in (CLOCK_MONOTONIC, CLOCK_PROF, CLOCK_HIGHRES, CLOCK_PROCESS_CPUTIME_ID): info['monotonic'] = True info['adjustable'] = False elif clk_id in (CLOCK_REALTIME,): info['monotonic'] = False info['adjustable'] = True else: info = None return (value, info) has_monotonic = False if os.name == 'nt': # GetTickCount64() requires Windows Vista, Server 2008 or later if has_GetTickCount64: def _monotonic(use_info): value = GetTickCount64() * 1e-3 if use_info: info = { 'implementation': "GetTickCount64()", "monotonic": True, "resolution": 1e-3, "adjustable": False, } # FIXME: call GetSystemTimeAdjustment() to get the resolution else: info = None return (value, info) has_monotonic = True else: def _monotonic(use_info): ticks = GetTickCount() if ticks < _monotonic.last: # Integer overflow detected _monotonic.delta += 2**32 _monotonic.last = ticks value = (ticks + _monotonic.delta) * 1e-3 if use_info: info = { 'implementation': "GetTickCount()", "monotonic": True, "resolution": 1e-3, "adjustable": False, } # FIXME: call GetSystemTimeAdjustment() to get the resolution else: info = None return (value, info) _monotonic.last = 0 _monotonic.delta = 0 has_monotonic = True elif has_mach_absolute_time: def _monotonic(use_info): if _monotonic.factor is None: timebase = mach_timebase_info() _monotonic.factor = timebase[0] / timebase[1] * 1e-9 value = mach_absolute_time() * _monotonic.factor if use_info: info = { 'implementation': "mach_absolute_time()", 'resolution': _monotonic.factor, 'monotonic': True, 'adjustable': False, } else: info = None return (value, info) _monotonic.factor = None has_monotonic = True elif has_clock_gettime and CLOCK_HIGHRES is not None: def _monotonic(use_info): return _clock_gettime_info(use_info, CLOCK_HIGHRES) has_monotonic = True elif has_clock_gettime and CLOCK_MONOTONIC is not None: def _monotonic(use_info): return _clock_gettime_info(use_info, CLOCK_MONOTONIC) has_monotonic = True if has_monotonic: def monotonic(): return _monotonic(False)[0] def _perf_counter(use_info): info = None if _perf_counter.use_performance_counter: if _perf_counter.performance_frequency is None: value, info = _win_perf_counter(use_info) if value is not None: return (value, info) if _perf_counter.use_monotonic: # The monotonic clock is preferred over the system time try: return _monotonic(use_info) except (OSError, WindowsError): _perf_counter.use_monotonic = False return _time(use_info) _perf_counter.use_performance_counter = (os.name == 'nt') if _perf_counter.use_performance_counter: _perf_counter.performance_frequency = None _perf_counter.use_monotonic = has_monotonic def perf_counter(): return _perf_counter(False)[0] if os.name == 'nt': def _process_time(use_info): handle = GetCurrentProcess() process_times = GetProcessTimes(handle) value = (process_times[2] + process_times[3]) * 1e-7 if use_info: info = { "implementation": "GetProcessTimes()", "resolution": 1e-7, "monotonic": True, "adjustable": False, # FIXME: call GetSystemTimeAdjustment() to get the resolution } else: info = None return (value, info) else: import os try: import resource except ImportError: has_resource = False else: has_resource = True def _process_time(use_info): info = None if _process_time.clock_id is not None: try: return _clock_gettime_info(use_info, _process_time.clock_id) except OSError: _process_time.clock_id = None if _process_time.use_getrusage: try: usage = resource.getrusage(resource.RUSAGE_SELF) value = usage[0] + usage[1] except OSError: _process_time.use_getrusage = False else: if use_info: info = { "implementation": "getrusage(RUSAGE_SELF)", "resolution": 1e-6, "monotonic": True, "adjustable": False, } return (value, info) if _process_time.use_times: try: times = os.times() value = times[0] + times[1] except OSError: _process_time.use_getrusage = False else: if use_info: try: ticks_per_second = os.sysconf("SC_CLK_TCK") except ValueError: ticks_per_second = 60 # FIXME: get HZ constant info = { "implementation": "times()", "resolution": 1.0 / ticks_per_second, "monotonic": True, "adjustable": False, } return (value, info) return _libc_clock_info(use_info) if has_clock_gettime and CLOCK_PROCESS_CPUTIME_ID is not None: _process_time.clock_id = CLOCK_PROCESS_CPUTIME_ID elif has_clock_gettime and CLOCK_PROF is not None: _process_time.clock_id = CLOCK_PROF else: _process_time.clock_id = None _process_time.use_getrusage = has_resource # On OS/2, only the 5th field of os.times() is set, others are zeros _process_time.use_times = (hasattr(os, 'times') and os.name != 'os2') def process_time(): return _process_time(False)[0] if os.name == "nt": def _time(use_info): value = GetSystemTimeAsFileTime() * 1e-7 if use_info: info = { 'implementation': 'GetSystemTimeAsFileTime', 'resolution': 1e-7, 'monotonic': False, # FIXME: call GetSystemTimeAdjustment() to get the resolution # and adjustable } else: info = None return (value, info) else: def _time(use_info): info = None if has_clock_gettime and CLOCK_REALTIME is not None: try: return _clock_gettime_info(use_info, CLOCK_REALTIME) except OSError: # CLOCK_REALTIME is not supported (unlikely) pass if has_gettimeofday: try: tv = gettimeofday() except OSError: # gettimeofday() should not fail pass else: if use_info: info = { 'monotonic': False, "implementation": "gettimeofday()", "resolution": 1e-6, 'monotonic': False, 'adjustable': True, } value = tv.tv_sec + tv.tv_usec * 1e-6 return (value, info) # FIXME: implement ftime() if has_ftime: if use_info: info = { "implementation": "ftime()", "resolution": 1e-3, 'monotonic': False, 'adjustable': True, } value = ftime() elif has_libc_time: if use_info: info = { "implementation": "time()", "resolution": 1.0, 'monotonic': False, 'adjustable': True, } value = float(_libc_time()) else: if use_info: info = { "implementation": "time.time()", 'monotonic': False, 'adjustable': True, } if os.name == "nt": # On Windows, time.time() uses ftime() info["resolution"] = 1e-3 else: # guess that time.time() uses gettimeofday() info["resolution"] = 1e-6 value = python_time.time() return (value, info) def time(): return _time(False)[0] try: import select except ImportError: has_select = False else: # FIXME: On Windows, select.select([], [], [], seconds) fails with # select.error(10093) has_select = (hasattr(select, "select") and os.name != "nt") if has_select: def _sleep(seconds): return select.select([], [], [], seconds) elif has_delay: def _sleep(seconds): milliseconds = int(seconds * 1000) # FIXME delay(milliseconds) #elif os.name == "nt": # def _sleep(seconds): # milliseconds = int(seconds * 1000) # # FIXME: use ctypes # win32api.ResetEvent(hInterruptEvent); # win32api.WaitForSingleObject(sleep.sigint_event, milliseconds) # # sleep.sigint_event = win32api.CreateEvent(NULL, TRUE, FALSE, FALSE) # # SetEvent(sleep.sigint_event) will be called by the signal handler of SIGINT elif os.name == "os2": def _sleep(seconds): milliseconds = int(seconds * 1000) # FIXME DosSleep(milliseconds) elif has_libc_sleep: def _sleep(seconds): seconds = int(seconds) _libc_sleep(seconds) else: def _sleep(seconds): python_time.sleep(seconds) def sleep(seconds): if seconds < 0: raise ValueError("sleep length must be non-negative") _sleep(seconds) def _libc_clock_info(use_info): if use_info: info = { 'implementation': 'clock()', 'resolution': 1.0, # FIXME: 'resolution': 1.0 / CLOCKS_PER_SEC, 'monotonic': True, 'adjustable': False, } if os.name != "nt": info['monotonic'] = True else: info = None if has_libc_clock: value = _libc_clock() if use_info: info['implementation'] = 'clock()' else: value = python_time.clock() if use_info: info['implementation'] = 'time.clock()' return (value, info) def _win_perf_counter(use_info): if _win_perf_counter.perf_frequency is None: try: _win_perf_counter.perf_frequency = float(QueryPerformanceFrequency()) except WindowsError: # QueryPerformanceFrequency() fails if the installed # hardware does not support a high-resolution performance # counter return (None, None) value = QueryPerformanceCounter() / _win_perf_counter.perf_frequency if use_info: info = { 'implementation': 'QueryPerformanceCounter', 'resolution': 1.0 / _win_perf_counter.perf_frequency, 'monotonic': True, 'adjustable': False, } else: info = None return (value, info) _win_perf_counter.perf_frequency = None if os.name == 'nt': def _clock(use_info): info = None if _clock.use_performance_counter: value, info = _win_perf_counter(use_info) if value is not None: return (value, info) return _libc_clock_info(use_info) _clock.use_performance_counter = True else: def _clock(use_info): return _libc_clock_info(use_info) def clock(): return _clock(False)[0] class clock_info(object): def __init__(self, implementation, monotonic, adjustable, resolution): self.implementation = implementation self.monotonic = monotonic self.adjustable = adjustable self.resolution = resolution def __repr__(self): return ( 'clockinfo(adjustable=%s, implementation=%r, monotonic=%s, resolution=%s' % (self.adjustable, self.implementation, self.monotonic, self.resolution)) def get_clock_info(name): if name == 'clock': info = _clock(True)[1] elif name == 'perf_counter': info = _perf_counter(True)[1] elif name == 'process_time': info = _process_time(True)[1] elif name == 'time': info = _time(True)[1] elif has_monotonic and name == 'monotonic': info = _monotonic(True)[1] else: raise ValueError("unknown clock: %s" % name) return clock_info(**info) if __name__ == "__main__": from errno import EPERM import threading import unittest class TestPEP418(unittest.TestCase): if not hasattr(unittest.TestCase, 'assertIsInstance'): # Python < 2.7 or Python < 3.2 def assertIsInstance(self, obj, klass): self.assertTrue(isinstance(obj, klass)) def assertGreater(self, a, b): self.assertTrue(a > b) def assertLess(self, a, b): self.assertTrue(a < b) def assertLessEqual(self, a, b): self.assertTrue(a <= b) def assertAlmostEqual(self, first, second, delta): self.assertTrue(abs(first - second) <= delta) def test_clock(self): clock() info = get_clock_info('clock') self.assertEqual(info.monotonic, True) self.assertEqual(info.adjustable, False) def test_get_clock_info(self): clocks = ['clock', 'perf_counter', 'process_time', 'time'] if has_monotonic: clocks.append('monotonic') for name in clocks: info = get_clock_info(name) self.assertIsInstance(info.implementation, str) self.assertNotEqual(info.implementation, '') self.assertIsInstance(info.monotonic, bool) self.assertIsInstance(info.resolution, float) # 0 < resolution <= 1.0 self.assertGreater(info.resolution, 0) self.assertLessEqual(info.resolution, 1) self.assertIsInstance(info.adjustable, bool) self.assertRaises(ValueError, get_clock_info, 'xxx') if not has_monotonic: print("Skip test_monotonic: need time.monotonic") else: def test_monotonic(self): t1 = monotonic() python_time.sleep(0.1) t2 = monotonic() dt = t2 - t1 self.assertGreater(t2, t1) self.assertAlmostEqual(dt, 0.1, delta=0.2) info = get_clock_info('monotonic') self.assertEqual(info.monotonic, True) self.assertEqual(info.adjustable, False) if not has_monotonic or not has_clock_gettime: if not has_monotonic: print('Skip test_monotonic_settime: need time.monotonic') elif not has_clock_gettime: print('Skip test_monotonic_settime: need time.clock_settime') else: def test_monotonic_settime(self): t1 = monotonic() realtime = clock_gettime(CLOCK_REALTIME) # jump backward with an offset of 1 hour try: clock_settime(CLOCK_REALTIME, realtime - 3600) except OSError as err: if err.errno == EPERM: if hasattr(unittest, 'SkipTest'): raise unittest.SkipTest(str(err)) else: print("Skip test_monotonic_settime: %s" % err) return else: raise t2 = monotonic() clock_settime(CLOCK_REALTIME, realtime) # monotonic must not be affected by system clock updates self.assertGreaterEqual(t2, t1) def test_perf_counter(self): perf_counter() def test_process_time(self): start = process_time() python_time.sleep(0.1) stop = process_time() self.assertLess(stop - start, 0.01) info = get_clock_info('process_time') self.assertEqual(info.monotonic, True) self.assertEqual(info.adjustable, False) def test_process_time_threads(self): class BusyThread(threading.Thread): def run(self): while not self.stop: pass thread = BusyThread() thread.stop = False t1 = process_time() thread.start() sleep(0.2) t2 = process_time() thread.stop = True thread.join() self.assertGreater(t2 - t1, 0.1) def test_sleep(self): self.assertRaises(ValueError, sleep, -2) self.assertRaises(ValueError, sleep, -1) sleep(1.2) def test_time(self): value = time() self.assertIsInstance(value, float) info = get_clock_info('time') self.assertEqual(info.monotonic, False) self.assertEqual(info.adjustable, True) if True: from pprint import pprint print("clock: %s" % clock()) if has_monotonic: print("monotonic: %s" % monotonic()) else: print("monotonic: ") print("perf_counter: %s" % perf_counter()) print("process_time: %s" % process_time()) print("time: %s" % time()) clocks = ['clock', 'perf_counter', 'process_time', 'time'] if has_monotonic: clocks.append('monotonic') pprint(dict((name, get_clock_info(name)) for name in clocks)) unittest.main()