Update of /cvsroot/python/python/dist/src/Lib/test
In directory sc8-pr-cvs1:/tmp/cvs-serv13682/Lib/test

Added Files:
    test_dummy_thread.py test_dummy_threading.py
Log Message:
Brett Cannon's dummy_thread and dummy_threading modules (SF patch 622537),
with some nitpicking editorial changes.

--- NEW FILE: test_dummy_thread.py ---
"""Generic thread tests.

Meant to be used by dummy_thread and thread. To allow for different modules
to be used, test_main() can be called with the module to use as the thread
implementation as its sole argument.

"""
import dummy_thread as _thread
import time
import Queue
import random
import unittest
from test import test_support


class LockTests(unittest.TestCase):
    """Test lock objects."""

    def setUp(self):
        # Create a lock; every test method below works on self.lock.
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # Make sure locks start unlocked.
        self.failUnless(not self.lock.locked(),
                        "Lock object is not initialized unlocked.")

    def test_release(self):
        # Test self.lock.release(): acquire, release, then check that the
        # lock reports itself as unlocked again.
        self.lock.acquire()
        self.lock.release()
        self.failUnless(not self.lock.locked(),
                        "Lock object did not release properly.")

    def test_improper_release(self):
        # Make sure release of an unlocked lock raises _thread.error.
        self.failUnlessRaises(_thread.error, self.lock.release)

    def test_cond_acquire_success(self):
        # Make sure the conditional (non-blocking, waitflag 0) acquiring of
        # the lock works.
        self.failUnless(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # Test that conditionally acquiring an already locked lock returns
        # a false value.
        self.lock.acquire(0)
        self.failUnless(not self.lock.acquire(0),
                        "Conditional acquiring of a locked lock incorrectly "
                        "succeeded.")

    def test_uncond_acquire_success(self):
        # Make sure unconditional acquiring of a lock works.
        self.lock.acquire()
        self.failUnless(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # Make sure that an unconditional locking returns True (the identity
        # check requires the True singleton, not merely a true value).
        self.failUnless(self.lock.acquire(1) is True,
                        "Unconditional locking did not return True.")

    def test_uncond_acquire_blocking(self):
        # Make sure that unconditional acquiring of a locked lock blocks.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        delay = 1  # In seconds
        # Timestamps are truncated to whole seconds, so the comparison at
        # the end of this test has one-second granularity.
        start_time = int(time.time())
        _thread.start_new_thread(delay_unlock,(self.lock, delay))
        if test_support.verbose:
            print
            print "*** Waiting for thread to release the lock "\
            "(approx. %s sec.) ***" % delay
        # Second acquire of the already held lock: it can only succeed once
        # delay_unlock() has released it.
        self.lock.acquire()
        end_time = int(time.time())
        if test_support.verbose:
            print "done"
        self.failUnless((end_time - start_time) >= delay,
                        "Blocking by unconditional acquiring failed.")


class MiscTests(unittest.TestCase):
    """Miscellaneous tests."""

    def test_exit(self):
        # Make sure _thread.exit() raises SystemExit.
        self.failUnlessRaises(SystemExit, _thread.exit)

    def test_ident(self):
        # Test sanity of _thread.get_ident(): it must be a non-zero int.
        self.failUnless(isinstance(_thread.get_ident(), int),
                        "_thread.get_ident() returned a non-integer")
        self.failUnless(_thread.get_ident() != 0,
                        "_thread.get_ident() returned 0")

    def test_LockType(self):
        # Make sure _thread.LockType is the type of the objects returned by
        # _thread.allocate_lock().
        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
                        "_thread.LockType is not an instance of what is "
                        "returned by _thread.allocate_lock()")


class ThreadTests(unittest.TestCase):
    """Test thread creation."""

    def test_arg_passing(self):
        # Make sure that parameter passing works: positional tuple only,
        # keyword dict only, and a mix of both.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            queue.put((arg1, arg2))

        # arg1/arg2 default to False, so a true pair read back from the
        # queue shows that the values given to start_new_thread() arrived.
        testing_queue = Queue.Queue(1)
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        result = testing_queue.get()
        self.failUnless(result[0] and result[1],
                        "Argument passing for thread creation using tuple failed")
        _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue,
                                                       'arg1':True, 'arg2':True})
        result = testing_queue.get()
        self.failUnless(result[0] and result[1],
                        "Argument passing for thread creation using kwargs failed")
        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True})
        result = testing_queue.get()
        self.failUnless(result[0] and result[1],
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_creation(self):
        # Make sure multiple threads can be created.
        def queue_mark(queue, delay):
            """Wait for ``delay`` seconds and then put something into ``queue``"""
            time.sleep(delay)
            queue.put(_thread.get_ident())

        thread_count = 5
        delay = 1.5
        testing_queue = Queue.Queue(thread_count)
        if test_support.verbose:
            print
            # NOTE(review): the upper bound printed here is thread_count, not
            # a duration -- presumably it assumes the workers run one after
            # another, each sleeping at most one second; confirm.
            print "*** Testing multiple thread creation "\
            "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
        # Each worker sleeps a random 0.0-1.0 s (rounded to one decimal)
        # before putting its thread ident on the queue.
        for count in xrange(thread_count):
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, round(random.random(), 1)))
        # Wait `delay` seconds, then expect one queue entry per worker.
        time.sleep(delay)
        if test_support.verbose:
            print 'done'
        self.failUnless(testing_queue.qsize() == thread_count,
                        "Not all %s threads executed properly after %s sec."
                        % (thread_count, delay))


def test_main(imported_module=None):
    """Run the lock, misc and thread test cases.

    If ``imported_module`` is given (and true), it replaces the module-level
    ``_thread`` that all the tests above use.  The rebinding is global, so it
    stays in effect after this function returns.
    """
    global _thread
    if imported_module:
        _thread = imported_module
    if test_support.verbose:
        print
        print "*** Using %s as _thread module ***" % _thread
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(LockTests))
    suite.addTest(unittest.makeSuite(MiscTests))
    suite.addTest(unittest.makeSuite(ThreadTests))
    test_support.run_suite(suite)


if __name__ == '__main__':
    test_main()

--- NEW FILE: test_dummy_threading.py ---
# Very rudimentary test of threading module

# Create a bunch of threads, let each do some work, wait until all are done

from test.test_support import verbose
import random
import dummy_threading as _threading
import time


class TestThread(_threading.Thread):
    """Task thread: sleeps for a random time while tracked in ``running``."""

    def run(self):
        # sema, mutex and running are module globals that test_main() sets
        # up before any thread is started.
        global running
        delay = random.random() * 2
        if verbose:
            print 'task', self.getName(), 'will run for', delay, 'sec'
        # sema is held for the whole task; mutex is held only around each
        # update of the running counter.
        sema.acquire()
        mutex.acquire()
        running = running + 1
        if verbose:
            print running, 'tasks are running'
        mutex.release()
        time.sleep(delay)
        if verbose:
            print 'task', self.getName(), 'done'
        mutex.acquire()
        running = running - 1
        if verbose:
            print self.getName(), 'is finished.', running, 'tasks are running'
        mutex.release()
        sema.release()


def starttasks():
    # Create and start numtasks TestThread objects, recording each one in
    # the global threads list so that test_main() can join them.
    for i in range(numtasks):
        t = TestThread(name="<thread %d>"%i)
        threads.append(t)
        t.start()


def test_main():
    # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
    # about 1 second per clump).
    global numtasks
    numtasks = 10

    # no more than 3 of the 10 can run at once
    global sema
    sema = _threading.BoundedSemaphore(value=3)
    # mutex is acquired by TestThread.run() around its updates of running.
    global mutex
    mutex = _threading.RLock()
    global running
    running = 0
    global threads
    threads = []

    starttasks()

    if verbose:
        print 'waiting for all tasks to complete'
    for t in threads:
        t.join()
    if verbose:
        print 'all tasks done'


if __name__ == '__main__':
    test_main()
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4