Tim Peters wrote:
>
> BTW, if you want to run threads with unittest, I expect you'll have to
> ensure that only the thread that starts unittest reports errors to unittest.
> I'll call that "the main thread".  You should be aware that if a non-main
> thread dies, unittest won't know that.  A common problem in the threaded
> tests PLabs has written is that a thread dies an ignoble death but unittest
> goes on regardless and says "ok!" at the end; if you didn't stare at all the
> output, you never would have known something went wrong.
>
> So wrap the body of your thread's work in a catch-all try/except, and if
> anything goes wrong communicate that back to the main thread.  For example,
> a Queue object (one or more) could work nicely for this.
>

Thanks for the good tip, Tim <wink>!  I will add those to my test case.

The main challenge in socket testing, however, lies in thread
synchronization.  I have made some progress on that front, Michael.  See
if the following code fragment helps:

    import socket
    import threading
    import unittest

    class socketObjTestCase(unittest.TestCase):
        """Test Case for Socket Object"""

        def setUp(self):
            self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.__server_addr = ('127.0.0.1', 25339)
            self.__ready = threading.Event()
            self.__done = threading.Event()
            self.__quit = threading.Event()
            # The thread target is the server class itself, so the whole
            # accept loop runs inside server.__init__.
            self.__server = threading.Thread(target=server,
                                             args=(self.__server_addr,
                                                   self.__ready,
                                                   self.__done,
                                                   self.__quit))
            self.__server.start()
            # Block until the server is listening.
            self.__ready.wait()

        def tearDown(self):
            self.__s.close()
            # Ask the server loop to stop, then wait for the thread to end.
            self.__quit.set()
            self.__server.join()
            del self.__server
            # server.__del__ sets this once the server object is collected.
            self.__done.wait()
            self.__ready.clear()
            self.__done.clear()
            self.__quit.clear()

    class server:
        def __init__(self, addr, ready, done, quit):
            self.__ready = ready
            self.__dead = done
            self.__quit = quit
            self.__s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.__s.setblocking(0)
            self.__s.bind(addr)
            self.__s.listen(1)
            self.getclient()

        def __del__(self):
            self.__dead.set()

        def getclient(self):
            self.__ready.set()
            # Poll the non-blocking socket until the test asks us to quit.
            while not self.__quit.is_set():
                try:
                    _client, _addr = self.__s.accept()
                    self.serveclient(_client, _addr)
                except socket.error:
                    # No pending connection yet; try again.
                    pass
            # close() rather than shutdown(2): shutting down a listening
            # socket that was never connected fails on some platforms.
            self.__s.close()

        def serveclient(self, sock, addr):
            print(sock, addr)

Bernie
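
P.S. Tim's suggestion quoted above (a catch-all try/except in the worker thread, with failures passed back to the main thread through a Queue) could be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the test suite under discussion: the names guarded, run_in_thread and WorkerReportingTest are hypothetical, and the sketch uses the Python 3 module name queue.

```python
import queue
import threading
import unittest


def guarded(work, errors):
    """Hypothetical helper: run work() and report any exception via the queue."""
    try:
        work()
    except Exception as exc:  # catch-all, so the thread never dies silently
        errors.put(exc)


def run_in_thread(work, timeout=5.0):
    """Run work() in a worker thread and return the exceptions it reported."""
    errors = queue.Queue()
    worker = threading.Thread(target=guarded, args=(work, errors), daemon=True)
    worker.start()
    worker.join(timeout)
    if worker.is_alive():
        return [RuntimeError("worker thread did not finish in time")]
    # The worker has finished, so draining the queue here is race-free.
    collected = []
    while not errors.empty():
        collected.append(errors.get())
    return collected


class WorkerReportingTest(unittest.TestCase):
    def test_worker_failure_is_reported(self):
        def failing_work():
            raise ValueError("boom")

        errors = run_in_thread(failing_work)
        # These assertions run in the main thread, so unittest sees them.
        self.assertEqual(len(errors), 1)
        self.assertIsInstance(errors[0], ValueError)

    def test_worker_success_reports_nothing(self):
        self.assertEqual(run_in_thread(lambda: None), [])
```

Saved to a file, the test case can be run with python -m unittest. The point of the design is that the worker only records what went wrong; every assertion is made by the main thread, which is the only thread unittest is watching.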