diff --git a/tests/FuncTimeoutTests/TestUtils.py b/tests/FuncTimeoutTests/TestUtils.py new file mode 100644 index 0000000..8d720c0 --- /dev/null +++ b/tests/FuncTimeoutTests/TestUtils.py @@ -0,0 +1,185 @@ + +# vim: set ts=4 sw=4 expandtab : + +''' + Copyright (c) 2017 Tim Savannah All Rights Reserved. + + Licensed under the Lesser GNU Public License Version 3, LGPLv3. You should have recieved a copy of this with the source distribution as + LICENSE, otherwise it is available at https://github.com/kata198/func_timeout/LICENSE + + TestUtils.py - Common functions and types used across unit tests +''' + +import copy +import sys +import time +import uuid + +__all__ = ('ARG_NO_DEFAULT', 'getSleepLambda', 'getSleepLambdaWithArgs', 'compareTimes') + +class ARG_NO_DEFAULT_TYPE(object): + + def __eq__(self, other): + ''' + __eq__ - Equal operator ( == ). Returns True if both are instances of ARG_NO_DEFAULT_TYPE, + or either is the type itself. + + @param other - The other item to compare + against this item. + + @return - True if both objects are instances of ARG_NO_DEFAULT_TYPE, + or either are the type itself. + ''' + + # Is self == ARG_NO_DEFAULT_TYPE ever going to be True? Just in case... + if issubclass(other.__class__, ARG_NO_DEFAULT_TYPE) or (other == ARG_NO_DEFAULT_TYPE or self == ARG_NO_DEFAULT_TYPE): + return True + + return False + + def __ne__(self, other): + ''' + __ne__ - Not-equal operator ( != ). Equivilant to not ==. + + @see ARG_NO_DEFAULT_TYPE.__eq__ + ''' + + return not self.__eq__(other) + + def __cmp__(self, other): + ''' + __cmp__ - Perform a "cmp" operation between self and other. + + Added for completeness, like python2 sorting etc. + + @param other - Another object, preferably one of ARG_NO_DEFAULT_TYPE. + + @return - Returns 0 if the objects are both + equal (both instances of ARG_NO_DEFAULT_TYPE), + otherwise to prevent recursion in sorting etc, + the id (location in memory) is compared. + + ''' + if self.__eq__(other): + return 0 + + if id(self) > id(other): + return 1 + + return -1 + +ARG_NO_DEFAULT = ARG_NO_DEFAULT_TYPE() + +def getSleepLambda(sleepTime): + ''' + getSleepLambda - Get a lambda that takes two integer arguments (a, b) + and sleeps for a given number of seconds before returning the sum + + @param sleepTime - The number of seconds to sleep + + @return lambda takes two integer argumennts, "a" and "b". + + NOTE: Lambda's are usually to functions, as functions may get their scope/closure overridden + + @see getSleepLambdaWithArgs + ''' + + # Ensure we don't get a strange reference override on somne versions of python + _sleepTime = copy.copy(sleepTime) + + return lambda a, b : int(bool(time.sleep(_sleepTime))) + a + b + + +def getSleepLambdaWithArgs(sleepTime, args): + ''' + getSleepLambdaWithArgs - Get a lambda that takes a variable collection of arguments + and sleeps for a given number of seconds before returning the sum of arguments + + @param sleepTime - The number of seconds to sleep + + @param args list - A list that represents the arguments to + the returned lambda. Should be a list of tuples. + + The first tuple element is the name of the parameter. If a second paramater is present, + it will be used as the default value for that argument. + + Keep in mind order is important ( i.e. no args with default following args without default), + as they will be used in the order provided. + + All arguments should expect integer values. + + @return lambda with the provided arguments + + NOTE: Lambda's are usually to functions, as functions may get their scope/closure overridden + + @see getSleepLambda + ''' + + # Ensure we don't get a strange reference override on somne versions of python + _sleepTime = copy.copy(sleepTime) + + if not args: + raise ValueError('Empty "args" param. See docstring for usage details. Got: ' + repr(args)) + + _args = copy.deepcopy(args) + + argStrs = [] + argNames = [] + for arg in _args: + argNames.append(arg[0]) + + if len(arg) == 1: + argStrs.append(arg[0]) + else: + argStrs.append('%s=%d' %( arg[0], arg[1] ) ) + + argStr = ', '.join(argStrs) + + sumStr = ' + '.join(argNames) + + +# lambdaName = 'tmplambda_' + str(uuid.uuid4().hex) + +# print ( 'Function is: %s' %('''lambda %s : int(bool(time.sleep(%f))) + %s''' %(argStr, sleepTime, sumStr, ) ) ) + return eval('''lambda %s : int(bool(time.sleep(%f))) + %s''' % (argStr, sleepTime, sumStr, ) ) + + +def compareTimes(timeEnd, timeStart, cmpTime, roundTo=None, deltaFixed=.05, deltaPct=None): + ''' + compareTimes - Compare two times, with support for max error + + @param timeEnd - End time + @param timeStart - Start time + + @param cmpTime - Time to compare against + + @param roundTo - Number of digits to round-off to + + @param deltaFixed Default .05, If provided and if difference is within this much, the two values are considered equal + @param deltaPct Default None, if provided and if difference is within this much, the two values are considered equal. 1 = 100%, .5 = 50% + + Example: if trying to determine if function ran for 2 seconds with an error of .05 seconds, + + if compareTimes( timeEnd, timeStart, 2, deltaFixed=.05, deltaPct=None) == 0 + + @return cmp style, < 0 if time delta is less than #cmpTime + = 0 if time delta is equal (taking into account #deltaFixed and #deltaPct) + > 0 if time delta is greater than #cmpTime + ''' + + timeDiff = timeEnd - timeStart + + delta = timeDiff - cmpTime + if roundTo is not None: + delta = round(delta, roundTo) + absDelta = abs(delta) + + if deltaFixed and absDelta <= deltaFixed: + return 0 + + if deltaPct and absDelta <= (cmpTime * float(deltaPct)): + return 0 + + return delta + +# vim: set ts=4 sw=4 expandtab : diff --git a/tests/FuncTimeoutTests/test_TestUtils.py b/tests/FuncTimeoutTests/test_TestUtils.py new file mode 100755 index 0000000..ad8a4d8 --- /dev/null +++ b/tests/FuncTimeoutTests/test_TestUtils.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# vim: set ts=4 sw=4 expandtab : + +''' + Copyright (c) 2017 Tim Savannah All Rights Reserved. + + Licensed under the Lesser GNU Public License Version 3, LGPLv3. You should have recieved a copy of this with the source distribution as + LICENSE, otherwise it is available at https://github.com/kata198/func_timeout/LICENSE +''' + +import copy +import sys +import subprocess +import time + +from func_timeout import func_timeout, FunctionTimedOut, func_set_timeout + +from TestUtils import ARG_NO_DEFAULT, ARG_NO_DEFAULT_TYPE, getSleepLambda, getSleepLambdaWithArgs, compareTimes + +class TestBasic(object): + ''' + TestBasic - Perform tests using the basic func_timeout function + ''' + + + def test_ArgNoDefault(self): + + assert ARG_NO_DEFAULT == ARG_NO_DEFAULT , 'Expected ARG_NO_DEFAULT to equal itself' + assert (ARG_NO_DEFAULT != ARG_NO_DEFAULT) is False , 'Expected ARG_NO_DEFAULT to not not equal itself' + + + assert ARG_NO_DEFAULT == ARG_NO_DEFAULT_TYPE , 'Expected ARG_NO_DEFAULT to equal ARG_NO_DEFAULT_TYPE' + assert ARG_NO_DEFAULT_TYPE == ARG_NO_DEFAULT , '2Expected ARG_NO_DEFAULT to equal ARG_NO_DEFAULT_TYPE' + + otherInstance = ARG_NO_DEFAULT_TYPE() + + assert otherInstance == ARG_NO_DEFAULT , 'Assert ARG_NO_DEFAULT_TYPE instances equal eachother' + assert not (otherInstance != ARG_NO_DEFAULT) , 'Assert ARG_NO_DEFAULT_TYPE instances not not-equal eachother' + + + def test_compareTimes(self): + + startTime = 50.00 + endTime = 52.03 + + assert compareTimes(endTime, startTime, 2, roundTo=2, deltaFixed=.05, deltaPct=None) == 0 , 'Expected deltaFixed to be > abs(delta) to show times equal' + + assert compareTimes(endTime, startTime, 2, roundTo=2, deltaFixed=.01, deltaPct=None) == .03 , 'Expected when deltaFixed is less than the abs delta, actual diff to be returned.' + + assert compareTimes(endTime, startTime, 2, roundTo=2, deltaFixed=None, deltaPct=.2) == 0 , 'Expected deltaPct * cmpTime when greater than abs delta to be equal' + + assert compareTimes(endTime, startTime, 2, roundTo=2, deltaFixed=None, deltaPct=.0002) == .03 , 'Expected deltaPct * cmpTime when less than abs delta to be actual diff' + + +if __name__ == '__main__': + sys.exit(subprocess.Popen('GoodTests.py -n1 "%s" %s' %(sys.argv[0], ' '.join(['"%s"' %(arg.replace('"', '\\"'), ) for arg in sys.argv[1:]]) ), shell=True).wait()) + +# vim: set ts=4 sw=4 expandtab : diff --git a/tests/FuncTimeoutTests/test_TestUtilsSleep.py b/tests/FuncTimeoutTests/test_TestUtilsSleep.py new file mode 100755 index 0000000..48f8719 --- /dev/null +++ b/tests/FuncTimeoutTests/test_TestUtilsSleep.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# vim: set ts=4 sw=4 expandtab : + +''' + Copyright (c) 2017 Tim Savannah All Rights Reserved. + + Licensed under the Lesser GNU Public License Version 3, LGPLv3. You should have recieved a copy of this with the source distribution as + LICENSE, otherwise it is available at https://github.com/kata198/func_timeout/LICENSE +''' + +import copy +import sys +import subprocess +import time + +from func_timeout import func_timeout, FunctionTimedOut, func_set_timeout + +from TestUtils import ARG_NO_DEFAULT, ARG_NO_DEFAULT_TYPE, getSleepLambda, getSleepLambdaWithArgs, compareTimes + +class TestBasicSleep(object): + ''' + TestBasicSleep - Perform test on the sleep generator function. + + Seperate file so runs in separate GoodTests process ( for performance reasons ) + ''' + + + def test_getSleepLambda(self): + + sleepLambda = getSleepLambda(2) + startTime = time.time() + sleepLambda(2, 3) + endTime = time.time() + + assert compareTimes(endTime, startTime, 2, 2, deltaFixed=.1, deltaPct=None) == 0 , 'Expected getSleepLambda(2) to take 2 seconds.' + + sleepLambda = getSleepLambda(3.1) + startTime = time.time() + sleepLambda(2, 3) + endTime = time.time() + + assert compareTimes(endTime, startTime, 3.1, 2, deltaFixed=.1, deltaPct=None) == 0 , 'Expected getSleepLambda(3.1) to take 3.1 seconds.' + +if __name__ == '__main__': + sys.exit(subprocess.Popen('GoodTests.py -n1 "%s" %s' %(sys.argv[0], ' '.join(['"%s"' %(arg.replace('"', '\\"'), ) for arg in sys.argv[1:]]) ), shell=True).wait()) + +# vim: set ts=4 sw=4 expandtab : diff --git a/tests/FuncTimeoutTests/test_TestUtilsSleepWithArgs.py b/tests/FuncTimeoutTests/test_TestUtilsSleepWithArgs.py new file mode 100755 index 0000000..55fe01a --- /dev/null +++ b/tests/FuncTimeoutTests/test_TestUtilsSleepWithArgs.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# vim: set ts=4 sw=4 expandtab : + +''' + Copyright (c) 2017 Tim Savannah All Rights Reserved. + + Licensed under the Lesser GNU Public License Version 3, LGPLv3. You should have recieved a copy of this with the source distribution as + LICENSE, otherwise it is available at https://github.com/kata198/func_timeout/LICENSE +''' + +import copy +import sys +import subprocess +import time + +from func_timeout import func_timeout, FunctionTimedOut, func_set_timeout + +from TestUtils import ARG_NO_DEFAULT, ARG_NO_DEFAULT_TYPE, getSleepLambda, getSleepLambdaWithArgs, compareTimes + +class TestBasicSleepWithArgs(object): + ''' + TestBasicSleepWithArgs - Perform test on the sleep generator with args function. + + Seperate file so runs in separate GoodTests process ( for performance reasons ) + ''' + + + def test_getSleepLambdaWithArgs(self): + + sleepLambda = getSleepLambdaWithArgs(2, [ ('a', ), ('b', ), ('c', 4) ] ) + startTime = time.time() + try: + sleepLambda(1, 2) + except: + raise AssertionError('Expected to have 1 default arg and 2 standard. Tried 3 args') + endTime = time.time() + + assert compareTimes(endTime, startTime, 2, 2, deltaFixed=.1, deltaPct=None) == 0 , 'Expected getSleepLambdaWithArgs(2) to take 2 seconds.' + + try: + sleepLambda(4, 7, 12) + except: + raise AssertionError('Expected to have 1 default arg and 2 standard. Tried 3 args.') + + + + sleepLambda = getSleepLambdaWithArgs(3.1, [ ('a', ), ('xxx', )] ) + startTime = time.time() + try: + sleepLambda(xxx=2, a=3) + except: + raise AssertionError('Expected to be able to use provided field names when calling function') + endTime = time.time() + + assert compareTimes(endTime, startTime, 3.1, 2, deltaFixed=.1, deltaPct=None) == 0 , 'Expected getSleepLambdaWithArgs(3.1) to take 3.1 seconds.' + +if __name__ == '__main__': + sys.exit(subprocess.Popen('GoodTests.py -n1 "%s" %s' %(sys.argv[0], ' '.join(['"%s"' %(arg.replace('"', '\\"'), ) for arg in sys.argv[1:]]) ), shell=True).wait()) + +# vim: set ts=4 sw=4 expandtab : diff --git a/tests/runTests.py b/tests/runTests.py new file mode 100755 index 0000000..17fa3f5 --- /dev/null +++ b/tests/runTests.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +# +# Copyright (c) 2015, 2016, 2017 Tim Savannah under following terms: +# You may modify and redistribe this script with your project +# +# It will download the latest GoodTests.py and use it to execute the tests. +# +# This should be placed in a directory, "tests", at the root of your project. It assumes that ../$MY_PACKAGE_MODULE is the path to your test module, and will create a symlink to it in order to run tests. +# The tests should be found in $MY_TEST_DIRECTORY in given "tests" folder. + + +# NOTE: Since version 1.2.3, you can also import this (like from a graphical application) and call the "main()" function. +# All of the following globals are the defaults, but can be overridden when calling main() (params have the same name as the globals). + +import imp +import os + +import subprocess +import sys + +# URL to current version of GoodTests.py - You only need to change this if you host an internal copy. +GOODTESTS_URL = 'https://raw.githubusercontent.com/kata198/GoodTests/master/GoodTests.py' + +# This should be your module name, and can be any relative or absolute path, or just a module name. +# If just a module name is given, the directory must be in current directory or parent directory. +MY_PACKAGE_MODULE = 'func_timeout' + +# Normally, you want to test the codebase during development, so you don't care about the site-packages installed version. +# If you want to allow testing with any module by @MY_PACKAGE_MODULE in the python path, change this to True. +ALLOW_SITE_INSTALL = False + +# This is the test directory that should contain all your tests. This should be a directory in your "tests" folder +MY_TEST_DIRECTORY = 'FuncTimeoutTests' + +__version__ = '2.1.1' +__version_tuple__ = (2, 1, 1) + +def findGoodTests(): + ''' + findGoodTests - Tries to find GoodTests.py + + @return { + 'path' -> Path to GoodTests.py (for execution) + 'success' -> True/False if we successfully found GoodTests.py + } + ''' + pathSplit = os.environ['PATH'].split(':') + if '.' not in pathSplit: + pathSplit = ['.'] + pathSplit + os.environ['PATH'] = ':'.join(pathSplit) + + result = '' + success = False + for path in pathSplit: + if path.endswith('/'): + path = path[:-1] + guess = path + '/GoodTests.py' + if os.path.exists(guess): + success = True + result = guess + break + + return { + 'path' : result, + "success" : success + } + +def try_pip_install(): + pipe = subprocess.Popen('pip install GoodTests', shell=True) + res = pipe.wait() + + return res + +def download_goodTests(GOODTESTS_URL=None): + ''' + download_goodTests - Attempts to download GoodTests, using the default global url (or one provided). + + @return - 0 on success (program should continue), otherwise non-zero (program should abort with this exit status) + ''' + if GOODTESTS_URL is None: + GOODTESTS_URL = globals()['GOODTESTS_URL'] + + validAnswer = False + while validAnswer == False: + sys.stdout.write('GoodTests not found. Would you like to install it to local folder? (y/n): ') + sys.stdout.flush() + answer = sys.stdin.readline().strip().lower() + if answer not in ('y', 'n', 'yes', 'no'): + continue + validAnswer = True + answer = answer[0] + + if answer == 'n': + sys.stderr.write('Cannot run tests without installing GoodTests. http://pypi.python.org/pypi/GoodTests or https://github.com/kata198/Goodtests\n') + return 1 + try: + import urllib2 as urllib + except ImportError: + try: + import urllib.request as urllib + except: + sys.stderr.write('Failed to import urllib. Trying pip.\n') + res = try_pip_install() + if res != 0: + sys.stderr.write('Failed to install GoodTests with pip or direct download. aborting.\n') + return 1 + try: + response = urllib.urlopen(GOODTESTS_URL) + contents = response.read() + if str != bytes: + contents = contents.decode('ascii') + except Exception as e: + sys.stderr.write('Failed to download GoodTests.py from "%s"\n%s\n' %(GOODTESTS_URL, str(e))) + sys.stderr.write('\nTrying pip.\n') + res = try_pip_install() + if res != 0: + sys.stderr.write('Failed to install GoodTests with pip or direct download. aborting.\n') + return 1 + try: + with open('GoodTests.py', 'w') as f: + f.write(contents) + except Exception as e: + sys.stderr.write('Failed to write to GoodTests.py\n%s\n' %(str(e,))) + return 1 + try: + os.chmod('GoodTests.py', 0o775) + except: + sys.stderr.write('WARNING: Failed to chmod +x GoodTests.py, may not be able to be executed.\n') + + try: + import GoodTests + except ImportError: + sys.stderr.write('Seemed to download GoodTests okay, but still cannot import. Aborting.\n') + return 1 + + return 0 + + +def main(thisDir=None, additionalArgs=[], MY_PACKAGE_MODULE=None, ALLOW_SITE_INSTALL=None, MY_TEST_DIRECTORY=None, GOODTESTS_URL=None): + ''' + Do the work - Try to find GoodTests.py, else prompt to download it, then run the tests. + + @param thisDir - None to use default (directory this test file is in, or if not obtainable, current directory). + @param additionalArgs - Any additional args to pass to GoodTests.py + + Remainder of params take their global (top of file) defaults unless explicitly set here. See top of file for documentation. + + @return - Exit code of application. 0 on success, non-zero on failure. + + TODO: Standardize return codes so external applications can derive failure without parsing error strings. + ''' + + if MY_PACKAGE_MODULE is None: + MY_PACKAGE_MODULE = globals()['MY_PACKAGE_MODULE'] + if ALLOW_SITE_INSTALL is None: + ALLOW_SITE_INSTALL = globals()['ALLOW_SITE_INSTALL'] + if MY_TEST_DIRECTORY is None: + MY_TEST_DIRECTORY = globals()['MY_TEST_DIRECTORY'] + if GOODTESTS_URL is None: + GOODTESTS_URL = globals()['GOODTESTS_URL'] + + + if not thisDir: + thisDir = os.path.dirname(__file__) + + if not thisDir: + thisDir = str(os.getcwd()) + elif not thisDir.startswith('/'): + thisDir = str(os.getcwd()) + '/' + thisDir + + # If GoodTests is in current directory, make sure we find it later + if os.path.exists('./GoodTests.py'): + os.environ['PATH'] = str(os.getcwd()) + ':' + os.environ['PATH'] + + os.chdir(thisDir) + + goodTestsInfo = findGoodTests() + if goodTestsInfo['success'] is False: + downloadRet = download_goodTests(GOODTESTS_URL) + if downloadRet != 0: + return downloadRet + goodTestsInfo = findGoodTests() + if goodTestsInfo['success'] is False: + sys.stderr.write('Could not download or find GoodTests.py. Try to download it yourself using "pip install GoodTests", or wget %s\n' %( GOODTESTS_URL,)) + return 1 + + baseName = os.path.basename(MY_PACKAGE_MODULE) + dirName = os.path.dirname(MY_PACKAGE_MODULE) + + newPath = None + if dirName not in ('.', ''): + if dirName.startswith('.'): + dirName = os.getcwd() + os.sep + dirName + os.sep + newPath = dirName + elif dirName == '': + inCurrentDir = False + try: + imp.find_module(MY_PACKAGE_MODULE) + inCurrentDir = True + except ImportError: + # COMPAT WITH PREVIOUS runTests.py: Try plain module in parent directory + foundIt = False + oldSysPath = sys.path[:] + sys.path = [os.path.realpath(os.getcwd() + os.sep + '..' + os.sep)] + try: + imp.find_module(MY_PACKAGE_MODULE) + foundIt = True + sys.path = oldSysPath + except ImportError as e: + sys.path = oldSysPath + if not ALLOW_SITE_INSTALL: + sys.stderr.write('Cannot find "%s" locally.\n' %(MY_PACKAGE_MODULE,)) + return 2 + else: + try: + __import__(baseName) + except: + sys.stderr.write('Cannot find "%s" locally or in global python path.\n' %(MY_PACKAGE_MODULE,)) + return 2 + + if foundIt is True: + newPath = os.path.realpath(os.getcwd() + os.sep + '..' + os.sep) + if inCurrentDir is True: + newPath = os.path.realpath(os.getcwd() + os.sep + '..' + os.sep) + + if newPath: + newPythonPath = [newPath] + [x for x in os.environ.get('PYTHONPATH', '').split(':') if x] + os.environ['PYTHONPATH'] = ':'.join(newPythonPath) + sys.path = [newPath] + sys.path + + try: + __import__(baseName) + except ImportError as e: + if baseName.endswith(('.py', '.pyc', '.pyo')): + MY_PACKAGE_MODULE = baseName[ : baseName.rindex('.')] + + if e.name != MY_PACKAGE_MODULE: + sys.stderr.write('Error while importing %s: %s\n Likely this is another dependency that needs to be installed\nPerhaps run "pip install %s" or install the providing package.\n\n' %(e.name, str(e), e.name)) + return 1 + sys.stderr.write('Could not import %s. Either install it or otherwise add to PYTHONPATH\n%s\n' %(MY_PACKAGE_MODULE, str(e))) + return 1 + + if not os.path.isdir(MY_TEST_DIRECTORY): + if not os.path.exists(MY_TEST_DIRECTORY): + sys.stderr.write('Cannot find test directory: %s\n' %(MY_TEST_DIRECTORY,)) + else: + sys.stderr.write('Provided test directory, "%s" is not a directory.\n' %(MY_TEST_DIRECTORY,)) + return 3 + + sys.stdout.write('Starting test..\n') + sys.stdout.flush() + sys.stderr.flush() + + + didTerminate = False + + pipe = subprocess.Popen([goodTestsInfo['path']] + additionalArgs + [MY_TEST_DIRECTORY], env=os.environ, shell=False) + while True: + try: + pipe.wait() + break + except KeyboardInterrupt: + if not didTerminate: + pipe.terminate() + didTerminate = True + else: + pipe.kill() + break + + return 0 + + +if __name__ == '__main__': + ret = main(None, sys.argv[1:]) + sys.exit(ret)