• Python: an interpreted high-level programming language that has a lot of support for "systems programming" and integrates well with existing software written in other languages.
• Concurrency: doing more than one thing at a time. Of particular interest to programmers writing code to run on big iron, but also of interest to users of multicore PCs. Usually a bad idea, except when it's not.
We're going to explore the state of the concurrent programming idioms used in Python:
• A look at tradeoffs and limitations
• Hopefully, some clarity
• A tour of various parts of the standard library
• The goal is to go beyond the user manual and tie everything together into a "bigger picture."
The primary focus is on Python.
• This is not a tutorial on how to write concurrent programs or parallel algorithms
• No mathematical proofs involving "dining philosophers" or anything like that
• I will assume that you have had some prior exposure to topics such as threads, message passing, network programming, etc.
Programmers are often able to get complex systems to "work" in much less time using a high-level language like Python than if they spend all of their time hacking C code.

"The best performance improvement is the transition from the nonworking to the working state." - John Ousterhout
"You can always optimize it later." - Unknown
"Premature optimization is the root of all evil." - Donald Knuth
Many concurrent programs are "I/O bound":
• They spend virtually all of their time sitting around waiting
• Python can "wait" just as fast as C (maybe even faster, although I haven't measured it)
• If there's not much processing, who cares if it's being done in an interpreter? (One exception: if you need an extremely rapid response time, as in real-time systems)
Concurrency is usually a really bad option if you're merely trying to make an inefficient Python script run faster.
• Because it's interpreted, you can often make huge gains by focusing on better algorithms or offloading work into C extensions
• For example, a C extension might make a script run 20x faster, vs. the marginal improvement of parallelizing a slow script to run on a couple of CPU cores
Threads are defined by a class:

    import time
    import threading

    class CountdownThread(threading.Thread):
        def __init__(self,count):
            threading.Thread.__init__(self)
            self.count = count
        def run(self):
            # This code executes in the thread
            while self.count > 0:
                print "Counting down", self.count
                self.count -= 1
                time.sleep(5)
            return

• You inherit from Thread and redefine run()
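For comparison, here is a minimal Python 3 sketch of the same class (the slides use Python 2 syntax). The `delay` parameter and the `seen` list are illustrative additions, not part of the original: the thread records each value instead of printing it and sleeps briefly instead of 5 seconds, so the run finishes quickly.

```python
import threading
import time

class CountdownThread(threading.Thread):
    def __init__(self, count, delay=0.01):
        super().__init__()
        self.count = count
        self.delay = delay      # illustrative: short sleep instead of 5 seconds
        self.seen = []          # illustrative: values this thread counted down through

    def run(self):
        # run() is the code that executes in the new thread
        while self.count > 0:
            self.seen.append(self.count)
            self.count -= 1
            time.sleep(self.delay)

t = CountdownThread(3)
t.start()       # launch a new thread that calls run()
t.join()        # wait for the thread to finish
print(t.seen)   # [3, 2, 1]
```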
Is this actually a real concern?

    import threading

    x = 0                  # A shared value

    def foo():
        global x
        for i in xrange(100000000):
            x += 1

    def bar():
        global x
        for i in xrange(100000000):
            x -= 1

    t1 = threading.Thread(target=foo)
    t2 = threading.Thread(target=bar)
    t1.start(); t2.start()
    t1.join(); t2.join()   # Wait for completion
    print x                # Expected result is 0

• Yes, the print produces a random nonsensical value each time (e.g., -83412 or 1627732)
• Commonly used to enclose critical sections

    x = 0
    x_lock = threading.Lock()

    Thread-1
    --------
    ...
    x_lock.acquire()
    x = x + 1           # Critical section
    x_lock.release()
    ...

    Thread-2
    --------
    ...
    x_lock.acquire()
    x = x - 1           # Critical section
    x_lock.release()
    ...

• Only one thread can execute in the critical section at a time (the lock gives exclusive access)
• It is your responsibility to identify and lock all "critical sections"

    x = 0
    x_lock = threading.Lock()

    Thread-1
    --------
    ...
    x_lock.acquire()
    x = x + 1
    x_lock.release()
    ...

    Thread-2
    --------
    ...
    x = x - 1           # Not protected by the lock!
    ...

If you use a lock in one place but not another, you're missing the whole point. All modifications to shared state must be enclosed by lock acquire()/release().
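A minimal Python 3 sketch of the earlier race with every modification of `x` protected. It relies on the lock's `with` statement support, which calls `acquire()` and `release()` for you. The loop count and the `run()` helper are illustrative choices.

```python
import threading

x = 0                        # shared value
x_lock = threading.Lock()    # protects every access to x

def foo(n):
    global x
    for _ in range(n):
        with x_lock:         # acquire() on entry, release() on exit
            x += 1

def bar(n):
    global x
    for _ in range(n):
        with x_lock:
            x -= 1

def run(n=100000):
    t1 = threading.Thread(target=foo, args=(n,))
    t2 = threading.Thread(target=bar, args=(n,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return x

print(run())   # 0
```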
Don't write code that acquires more than one mutex lock at a time:

    x = 0
    y = 0
    x_lock = threading.Lock()
    y_lock = threading.Lock()

    with x_lock:
        statements using x
        ...
        with y_lock:
            statements using x and y
            ...

• This almost invariably ends up creating a program that mysteriously deadlocks (even more fun to debug than a race condition)
RLock (reentrant lock):

    m = threading.RLock()   # Create a lock
    m.acquire()             # Acquire the lock
    m.release()             # Release the lock

• Similar to a normal lock except that it can be reacquired multiple times by the same thread
• However, each acquire() must have a matching release()
• Common use: code-based locking (where you're locking function/method execution as opposed to data access)
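To see why code-based locking needs a reentrant lock, consider a hypothetical `Counter` class whose locked methods call each other. This Python 3 sketch works because the same thread may reacquire an `RLock`; with a plain `Lock`, the nested acquire inside `incr_twice()` would block forever.

```python
import threading

class Counter:
    """Hypothetical example: every method locks the object."""

    def __init__(self):
        self._lock = threading.RLock()   # a plain Lock would deadlock below
        self.value = 0

    def incr(self):
        with self._lock:
            self.value += 1

    def incr_twice(self):
        with self._lock:      # first acquire by this thread
            self.incr()       # reacquires the same RLock: allowed
            self.incr()

c = Counter()
c.incr_twice()
print(c.value)   # 2
```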
Semaphore, a counter-based synchronization primitive:

    m = threading.Semaphore(n)   # Create a semaphore
    m.acquire()                  # Acquire
    m.release()                  # Release

• acquire(): waits if the count is 0; otherwise decrements the count and continues
• release(): increments the count and signals waiting threads (if any)
• Unlike locks, acquire()/release() can be called in any order and by any thread
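A common use of a semaphore is to cap how many threads run a section of code at once. In this Python 3 sketch (names like `MAX_ACTIVE` and `peak` are illustrative), six threads compete for two slots, and a separate lock protects the bookkeeping counters.

```python
import threading
import time

MAX_ACTIVE = 2
slots = threading.Semaphore(MAX_ACTIVE)   # at most 2 threads inside at once
state_lock = threading.Lock()             # protects active/peak
active = 0
peak = 0

def worker():
    global active, peak
    with slots:                 # acquire() on entry, release() on exit
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)        # simulated work
        with state_lock:
            active -= 1

threads = [threading.Thread(target=worker) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)   # never more than MAX_ACTIVE
```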
    e = threading.Event()
    e.isSet()    # Return True if event set
    e.set()      # Set event
    e.clear()    # Clear event
    e.wait()     # Wait for event

• This can be used to have one or more threads wait for something to occur
• Setting an event will unblock all waiting threads simultaneously (if any)
• Common use: barriers, notification
Using an event to ensure proper initialization:

    import threading
    from threading import Thread

    init = threading.Event()

    def worker():
        init.wait()                 # Wait until initialized
        statements
        ...

    def initialize():
        statements                  # Setting up
        statements
        ...
        init.set()                  # Done initializing

    Thread(target=worker).start()   # Launch workers
    Thread(target=worker).start()
    Thread(target=worker).start()
    initialize()                    # Initialize
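A runnable Python 3 version of the initialization pattern, as a sketch: the shared `config` dictionary and `results` list are hypothetical stand-ins for whatever `initialize()` sets up and the workers consume.

```python
import threading

init = threading.Event()
config = {}                       # hypothetical state set up by initialize()
results = []
results_lock = threading.Lock()

def worker():
    init.wait()                   # block until initialize() calls set()
    with results_lock:
        results.append(config["ready"])

def initialize():
    config["ready"] = True        # setting up
    init.set()                    # done initializing: wakes every waiting worker

workers = [threading.Thread(target=worker) for _ in range(3)]
for t in workers:
    t.start()                     # workers start, then immediately wait
initialize()
for t in workers:
    t.join()
print(results)   # [True, True, True]
```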
Using an event to signal "completion":

    Master thread
    -------------
    def master():
        ...
        item = create_item()
        evt = Event()
        worker.send((item,evt))
        ...
        # Other processing
        ...
        # Wait for worker
        evt.wait()

    Worker thread
    -------------
    item, evt = get_work()
    processing
    processing
    ...
    # Done
    evt.set()

• Might use for asynchronous processing, etc.
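The slide leaves `worker.send()` and `get_work()` abstract. This Python 3 sketch assumes a `queue.Queue` as the transport and passes a list along with the event so the worker can hand back a result; the `None` shutdown sentinel is also an illustrative choice.

```python
import queue
import threading

work_q = queue.Queue()      # stands in for worker.send() / get_work()

def worker():
    while True:
        item, evt, out = work_q.get()
        if item is None:            # hypothetical shutdown sentinel
            break
        out.append(item * 2)        # "processing"
        evt.set()                   # done: wake the master

def master(item):
    evt = threading.Event()
    out = []                        # hypothetical slot for the worker's result
    work_q.put((item, evt, out))
    # ... other processing could happen here ...
    evt.wait()                      # wait for the worker
    return out[0]

t = threading.Thread(target=worker)
t.start()
result = master(21)
work_q.put((None, None, None))      # tell the worker to exit
t.join()
print(result)   # 42
```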
Condition objects:

    cv = threading.Condition([lock])
    cv.acquire()     # Acquire the underlying lock
    cv.release()     # Release the underlying lock
    cv.wait()        # Wait for condition
    cv.notify()      # Signal that a condition holds
    cv.notifyAll()   # Signal all threads waiting

• A combination of locking/signaling
• Lock is used to protect code that establishes some sort of "condition" (e.g., data available)
• Signal is used to notify other threads that a "condition" has changed state
Condition variable use: producer/consumer patterns

    items = []
    items_cv = threading.Condition()

    Producer thread
    ---------------
    item = produce_item()
    with items_cv:
        items.append(item)
        items_cv.notify()

    Consumer thread
    ---------------
    with items_cv:
        while not items:
            items_cv.wait()
        x = items.pop(0)
    # Do something with x
    ...

• Next you add signaling and waiting
• Here, the producer signals the consumer that it put data into the shared list
There are some tricky bits involving wait():

    Consumer thread
    ---------------
    with items_cv:
        while not items:
            items_cv.wait()
        x = items.pop(0)
    # Do something with x
    ...

• Before waiting, you have to acquire the lock
• wait() releases the lock while waiting and reacquires it when woken
• Conditions are often transient and may not hold by the time wait() returns, so you must always double-check (hence the while loop)
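Putting the producer and consumer together, here is a minimal Python 3 sketch with one producer thread and one consumer thread. The number of items is fixed in advance only so the consumer knows when to stop; real code would usually use a sentinel value or some other shutdown signal.

```python
import threading

items = []
items_cv = threading.Condition()
consumed = []

def producer(values):
    for item in values:
        with items_cv:
            items.append(item)
            items_cv.notify()        # wake one waiting consumer

def consumer(count):
    for _ in range(count):
        with items_cv:
            while not items:         # re-check: the condition may not hold after wait()
                items_cv.wait()      # releases the lock while waiting
            x = items.pop(0)
        consumed.append(x)           # "do something with x" outside the lock

values = list(range(10))
c = threading.Thread(target=consumer, args=(len(values),))
p = threading.Thread(target=producer, args=(values,))
c.start()
p.start()
p.join()
c.join()
print(consumed)
```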
Python has a thread-safe queuing module. Basic operations:

    from Queue import Queue   # Python 3: from queue import Queue
    q = Queue([maxsize])      # Create a queue
    q.put(item)               # Put an item on the queue
    q.get()                   # Get an item from the queue
    q.empty()                 # Check if empty
    q.full()                  # Check if full

• Usage: try to strictly adhere to get/put operations. If you do this, you don't need to use other synchronization primitives.
Queues also have a signaling mechanism:

    q.task_done()   # Signal that work is done
    q.join()        # Wait for all work to be done

• Many Python programmers don't know about this (since it's relatively new)
• Used to determine when processing is done

    Producer thread
    ---------------
    for item in produce_items():
        q.put(item)
    # Wait for consumer
    q.join()

    Consumer thread
    ---------------
    while True:
        item = q.get()
        consume_item(item)
        q.task_done()
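A minimal Python 3 sketch of the `task_done()`/`join()` protocol (the module is named `queue` in Python 3). The squaring `consume_item()` is a placeholder for real work, and the consumer runs as a daemon thread so it does not keep the program alive after `join()` returns.

```python
import queue
import threading

q = queue.Queue()
consumed = []

def consume_item(item):
    consumed.append(item * item)   # placeholder "work"

def consumer():
    while True:
        item = q.get()
        consume_item(item)
        q.task_done()              # exactly one task_done() per get()

threading.Thread(target=consumer, daemon=True).start()

for item in range(5):              # producer
    q.put(item)
q.join()                           # blocks until every item has been task_done()
print(consumed)   # [0, 1, 4, 9, 16]
```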
Consider this CPU-bound function:

    def count(n):
        while n > 0:
            n -= 1

• Sequential execution:

    count(100000000)
    count(100000000)

• Threaded execution:

    t1 = Thread(target=count,args=(100000000,))
    t1.start()
    t2 = Thread(target=count,args=(100000000,))
    t2.start()

• Now, you might expect two threads to run twice as fast on multiple CPU cores
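To try the comparison yourself, here is a small Python 3 timing harness. It uses a smaller `n` than the slide so it runs in seconds; the absolute numbers, and whether the threaded version is any faster, depend on your machine and interpreter build.

```python
import threading
import time

def count(n):
    while n > 0:
        n -= 1

def sequential(n):
    start = time.perf_counter()
    count(n)
    count(n)
    return time.perf_counter() - start

def threaded(n):
    start = time.perf_counter()
    t1 = threading.Thread(target=count, args=(n,))
    t2 = threading.Thread(target=count, args=(n,))
    t1.start()
    t2.start()
    t1.join()          # wait for both threads before stopping the clock
    t2.join()
    return time.perf_counter() - start

if __name__ == "__main__":
    n = 5_000_000
    print("sequential: %.2fs" % sequential(n))
    print("threaded:   %.2fs" % threaded(n))
```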
What happens during the periodic check?
• In the main thread only, signal handlers will execute if there are any pending signals
• Release and reacquisition of the GIL
• That last bullet describes how multiple CPU-bound threads get to run (by briefly releasing the GIL, other threads get a chance to run).
• Ticks loosely map to interpreter instructions

    def countdown(n):
        while n > 0:
            print n
            n -= 1

    >>> import dis
    >>> dis.dis(countdown)
      0 SETUP_LOOP          33 (to 36)
      3 LOAD_FAST            0 (n)
      6 LOAD_CONST           1 (0)
      9 COMPARE_OP           4 (>)
     12 JUMP_IF_FALSE       19 (to 34)
     15 POP_TOP
     16 LOAD_FAST            0 (n)
     19 PRINT_ITEM
     20 PRINT_NEWLINE
     21 LOAD_FAST            0 (n)
     24 LOAD_CONST           2 (1)
     27 INPLACE_SUBTRACT
     28 STORE_FAST           0 (n)
     31 JUMP_ABSOLUTE        3
    ...

• Instructions in the Python VM
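The listing above comes from Python 2. To look at the instructions your own interpreter generates, `dis.get_instructions()` (Python 3.4+) returns them as objects; expect different opcode names than the slide, since the bytecode changes between versions.

```python
import dis

def countdown(n):
    while n > 0:
        print(n)
        n -= 1

# Offsets and opcode names differ between Python versions,
# so compare against your own interpreter rather than the slide.
for instr in dis.get_instructions(countdown):
    print(instr.offset, instr.opname, instr.argrepr)
```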
Ticks are not time-based
• Ticks don't have consistent execution times
• Long operations can block everything:

    >>> nums = xrange(100000000)
    >>> -1 in nums             # 1 tick (~6.6 seconds)
    False
    >>>

• Try hitting Ctrl-C (ticks are uninterruptible):

    >>> nums = xrange(100000000)
    >>> -1 in nums
    ^C^C^C                     (nothing happens, long pause)
    ...
    KeyboardInterrupt
    >>>
Thread switching is far more subtle than most programmers realize (it's tied up in the OS).

[Figure: timeline of Thread 1 and Thread 2 running in 100-tick intervals, with periodic checks, signals to the operating system, a thread context switch, and each thread SUSPENDED while the other runs]

• The lag between signaling and scheduling may be significant (depends on the OS)
With multiple cores, CPU-bound threads get scheduled simultaneously (on different processors) and then have a GIL battle.

[Figure: Thread 1 (CPU 1) runs, releases the GIL, signals, and immediately reacquires it; Thread 2 (CPU 2) wakes and its attempt to acquire the GIL fails; the cycle repeats]

• The waiting thread (T2) may make hundreds of failed GIL acquisitions before any success
Testing threads with C extensions. Here's some Python test code:

    def churner(n):
        count = 1000000
        while count > 0:
            churn(n)          # C extension function
            count -= 1

    # Sequential execution
    churner(n)
    churner(n)

    # Threaded execution
    t1 = threading.Thread(target=churner, args=(n,))
    t2 = threading.Thread(target=churner, args=(n,))
    t1.start()
    t2.start()
Why is the GIL there?
• It simplifies the implementation of the Python interpreter (okay, sort of a lame excuse)
• It is better suited for reference counting (Python's memory management scheme)
• It simplifies the use of C/C++ extensions: extension functions do not need to worry about thread synchronization
• And for now, it's here to stay (although people continue to try to eliminate it)
Threads are still useful for I/O-bound apps
• For example: a network server that needs to maintain several thousand long-lived TCP connections but is not doing tons of heavy CPU processing
• Here, you're really only limited by the host operating system's ability to manage and schedule a lot of threads
• Most systems don't have much of a problem, even with thousands of threads
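A quick way to see this is to simulate I/O with `time.sleep()`, which releases the GIL while blocked. In this Python 3 sketch, `fake_io()` is a hypothetical stand-in for a blocking socket or file call; twenty threads that each wait 0.2 seconds should finish in roughly the time of one wait, not twenty.

```python
import threading
import time

def fake_io(delay):
    # Hypothetical stand-in for a blocking socket/file call.
    # The GIL is released while the thread sleeps.
    time.sleep(delay)

def run_threads(n_threads=20, delay=0.2):
    threads = [threading.Thread(target=fake_io, args=(delay,))
               for _ in range(n_threads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

# Typically far less than the 20 * 0.2 = 4 seconds a sequential loop would take
print("%.2fs" % run_threads())
```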
There is an alternative implementation of pickle called cPickle (written in C)
• Use it whenever possible; it is much faster

    import cPickle as pickle
    ...
    pickle.dump(someobj,f)

• There is some history involved. There are a few things that cPickle can't do, but they are somewhat obscure (so don't worry about it)
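A common import idiom, shown here as a sketch, uses cPickle when it exists and falls back to `pickle` otherwise. In Python 3 the `cPickle` module is gone, and the standard `pickle` module uses its C accelerator automatically when it is available.

```python
try:
    import cPickle as pickle   # Python 2: explicit C implementation
except ImportError:
    import pickle              # Python 3: C accelerator used automatically if available

data = {"host": "python.org", "port": 80, "items": [1, 2, 3]}
blob = pickle.dumps(data)      # serialize to a byte string
copy = pickle.loads(blob)      # and back to an equal object
print(copy == data)   # True
```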
Example: launching a subprocess and hooking up the child process via a pipe
• Use the subprocess module

    import subprocess
    p = subprocess.Popen(['python','child.py'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE)
    p.stdin.write(data)      # Send data to subprocess
    p.stdout.read(size)      # Read data from subprocess

[Figure: parent Python process (p.stdin, p.stdout) connected through a pipe to the child Python process (sys.stdin, sys.stdout)]
A class that wraps a pair of files:

    # channel.py
    import pickle

    class Channel(object):
        def __init__(self,out_f,in_f):
            self.out_f = out_f
            self.in_f = in_f
        def send(self,item):
            pickle.dump(item,self.out_f)
            self.out_f.flush()
        def recv(self):
            return pickle.load(self.in_f)

• Send/Receive implemented using pickle
A sample child process:

    # child.py
    import channel
    import sys

    ch = channel.Channel(sys.stdout,sys.stdin)
    while True:
        item = ch.recv()
        ch.send(("child",item))

• Parent process setup:

    # parent.py
    import channel
    import subprocess

    p = subprocess.Popen(['python','child.py'],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE)
    ch = channel.Channel(p.stdin,p.stdout)
Using the child worker:

    >>> ch.send("Hello World")
    Hello World
    >>> ch.send(42)
    42
    >>> ch.send([1,2,3,4])
    [1, 2, 3, 4]
    >>> ch.send({'host':'python.org','port':80})
    {'host': 'python.org', 'port': 80}
    >>>

• This output is being produced by the child
• You can send almost any Python object (numbers, lists, dictionaries, instances, etc.)
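The `Channel` example can be tried as a single Python 3 file. Python 3 pickles are bytes, so the child has to use the binary streams `sys.stdin.buffer` and `sys.stdout.buffer`. To keep the sketch self-contained, the child program is passed with `-c` instead of living in `child.py`, and the hypothetical `talk()` helper closes the pipe afterwards so the child sees `EOFError` and exits.

```python
import pickle
import subprocess
import sys

class Channel(object):
    """Send/receive pickled objects over a pair of binary file objects."""

    def __init__(self, out_f, in_f):
        self.out_f = out_f
        self.in_f = in_f

    def send(self, item):
        pickle.dump(item, self.out_f)
        self.out_f.flush()              # push the bytes through the pipe now

    def recv(self):
        return pickle.load(self.in_f)   # raises EOFError once the other side closes

# Child program, inlined so the sketch is one file (the slides use child.py).
CHILD = """
import pickle, sys
out_f, in_f = sys.stdout.buffer, sys.stdin.buffer
while True:
    try:
        item = pickle.load(in_f)
    except EOFError:
        break
    pickle.dump(("child", item), out_f)
    out_f.flush()
"""

def talk(objects):
    p = subprocess.Popen([sys.executable, "-c", CHILD],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    ch = Channel(p.stdin, p.stdout)
    replies = []
    for obj in objects:
        ch.send(obj)
        replies.append(ch.recv())
    p.stdin.close()                     # child sees EOFError and exits
    p.wait()
    p.stdout.close()
    return replies

if __name__ == "__main__":
    print(talk(["Hello World", 42, [1, 2, 3, 4]]))
```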
Defining tasks using a Process class:

    import time
    import multiprocessing

    class CountdownProcess(multiprocessing.Process):
        def __init__(self,count):
            multiprocessing.Process.__init__(self)
            self.count = count
        def run(self):
            while self.count > 0:
                print "Counting down", self.count
                self.count -= 1
                time.sleep(5)
            return

• You inherit from Process and redefine run()
To launch, it's the same idea as with threads:

    if __name__ == '__main__':
        p1 = CountdownProcess(10)   # Create the process object
        p1.start()                  # Launch the process
        p2 = CountdownProcess(20)   # Create another process
        p2.start()                  # Launch

• Processes execute until run() stops
• A critical detail: always launch in main as shown (required for Windows)
Consider this CPU-bound function:

    def count(n):
        while n > 0:
            n -= 1

• Sequential execution (24.6s):

    count(100000000)
    count(100000000)

• Multiprocessing execution (12.5s):

    p1 = Process(target=count,args=(100000000,))
    p1.start()
    p2 = Process(target=count,args=(100000000,))
    p2.start()

• Yes, it seems to work
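A Python 3 sketch of the same experiment, using a smaller `n` so it completes quickly. Timings vary by machine; the `exitcode` values are returned so you can check that both workers finished cleanly.

```python
import multiprocessing
import time

def count(n):
    while n > 0:
        n -= 1

def run_sequential(n):
    start = time.perf_counter()
    count(n)
    count(n)
    return time.perf_counter() - start

def run_processes(n):
    start = time.perf_counter()
    procs = [multiprocessing.Process(target=count, args=(n,)) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()                     # wait for both to finish
    return time.perf_counter() - start, [p.exitcode for p in procs]

if __name__ == "__main__":           # required where processes are spawned (e.g. Windows)
    n = 5_000_000
    print("sequential:      %.2fs" % run_sequential(n))
    elapsed, codes = run_processes(n)
    print("multiprocessing: %.2fs (exit codes %s)" % (elapsed, codes))
```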
Pipes: a channel for sending/receiving objects

    p = Process(target=somefunc)
    (c1, c2) = multiprocessing.Pipe()

• Returns a pair of connection objects (one for each end-point of the pipe)
• Here are methods for communication:

    c.send(obj)            # Send an object
    c.recv()               # Receive an object
    c.send_bytes(buffer)   # Send a buffer of bytes
    c.recv_bytes([max])    # Receive a buffer of bytes
    c.poll([timeout])      # Check for data
A simple data consumer:

    def consumer(p1, p2):
        p1.close()           # Close producer's end (not used)
        while True:
            try:
                item = p2.recv()
            except EOFError:
                break
            print item       # Do other useful work here

• A simple data producer:

    def producer(sequence, output_p):
        for item in sequence:
            output_p.send(item)
    if __name__ == '__main__':
        p1, p2 = multiprocessing.Pipe()

        cons = multiprocessing.Process(
                   target=consumer,
                   args=(p1,p2))
        cons.start()

        # Close the input end in the producer
        p2.close()

        # Go produce some data
        sequence = xrange(100)     # Replace with useful data
        producer(sequence, p1)

        # Close the pipe
        p1.close()
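A runnable Python 3 sketch combining the consumer, producer, and launch code. One change from the slides: instead of printing, the consumer sums what it receives and reports the total through a `multiprocessing.Queue` (an illustrative addition) so the parent can check the result.

```python
import multiprocessing

def consumer(p1, p2, result_q):
    p1.close()                  # close the producer's end in this process
    total = 0
    while True:
        try:
            item = p2.recv()
        except EOFError:        # raised once every copy of the other end is closed
            break
        total += item
    result_q.put(total)         # illustrative: report back instead of printing

def producer(sequence, output_p):
    for item in sequence:
        output_p.send(item)

def main():
    p1, p2 = multiprocessing.Pipe()
    result_q = multiprocessing.Queue()
    cons = multiprocessing.Process(target=consumer, args=(p1, p2, result_q))
    cons.start()
    p2.close()                  # close the consumer's end in the producer
    producer(range(100), p1)
    p1.close()                  # lets the consumer see EOFError
    total = result_q.get()
    cons.join()
    return total

if __name__ == "__main__":
    print(main())   # 4950
```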
multiprocessing also provides a queue
• The programming interface is the same:

    from multiprocessing import Queue
    q = Queue()
    q.put(item)         # Put an item on the queue
    item = q.get()      # Get an item from the queue

• There is also a joinable Queue:

    from multiprocessing import JoinableQueue
    q = JoinableQueue()
    q.task_done()       # Signal task completion
    q.join()            # Wait for completion
A consumer process:

    def consumer(input_q):
        while True:
            # Get an item from the queue
            item = input_q.get()
            # Process item
            print item
            # Signal completion
            input_q.task_done()

• A producer process:

    def producer(sequence,output_q):
        for item in sequence:
            # Put the item on the queue
            output_q.put(item)
Running the two processes:

    if __name__ == '__main__':
        from multiprocessing import Process, JoinableQueue
        q = JoinableQueue()

        # Launch the consumer process
        cons_p = Process(target=consumer,args=(q,))
        cons_p.daemon = True
        cons_p.start()

        # Run the producer function on some data
        sequence = range(100)     # Replace with useful data
        producer(sequence,q)

        # Wait for the consumer to finish
        q.join()
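A self-contained Python 3 sketch of the same pattern. The squaring step and the second `results` queue are illustrative additions so the parent can collect the consumer's output; `q.join()` returns only after every item has been marked with `task_done()`.

```python
import multiprocessing

def consumer(input_q, output_q):
    while True:
        item = input_q.get()
        output_q.put(item * item)     # illustrative processing
        input_q.task_done()           # signal completion

def producer(sequence, output_q):
    for item in sequence:
        output_q.put(item)

def main():
    q = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    cons_p = multiprocessing.Process(target=consumer, args=(q, results))
    cons_p.daemon = True              # terminated automatically when the parent exits
    cons_p.start()
    sequence = range(10)
    producer(sequence, q)
    q.join()                          # wait until every item has been task_done()
    return sorted(results.get() for _ in sequence)

if __name__ == "__main__":
    print(main())
```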
If you have written threaded programs that strictly stick to the queuing model, they can probably be ported to multiprocessing.
• The following restrictions apply:
• Only objects compatible with pickle can be queued
• Tasks cannot rely on any shared data other than a reference to the queue
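A rough way to check the first restriction is to try pickling an object before queuing it. The `is_picklable()` helper below is hypothetical, a Python 3 sketch; locks and lambdas are typical objects that fail.

```python
import pickle
import threading

def is_picklable(obj):
    """Hypothetical helper: True if obj survives pickle.dumps()."""
    try:
        pickle.dumps(obj)
    except Exception:   # PicklingError, TypeError, or AttributeError depending on the object
        return False
    return True

print(is_picklable({"a": [1, 2, 3]}))   # True
print(is_picklable(threading.Lock()))   # False
print(is_picklable(lambda x: x))        # False
```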
Start with a function that does some work
• Example: compute a SHA-512 digest of a file

    import hashlib

    def compute_digest(filename):
        digest = hashlib.sha512()
        f = open(filename,'rb')
        while True:
            chunk = f.read(8192)
            if not chunk: break
            digest.update(chunk)
        f.close()
        return digest.digest()

• This is just a normal function (no magic)
Here is some code that uses our function
• Make a dict mapping filenames to digests

    import os
    TOPDIR = "/Users/beazley/Software/Python-3.0"

    digest_map = {}
    for path, dirs, files in os.walk(TOPDIR):
        for name in files:
            fullname = os.path.join(path,name)
            digest_map[fullname] = compute_digest(fullname)

• Running this takes about 10s on my machine
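A Python 3 sketch of the same function and loop. The `with` statement closes the file even if reading fails, and `iter()` with an empty-bytes sentinel replaces the explicit `while`/`break`. The `digest_tree()` wrapper is a hypothetical name for the slide's top-level loop.

```python
import hashlib
import os

def compute_digest(filename, chunk_size=8192):
    digest = hashlib.sha512()
    with open(filename, "rb") as f:          # closed even on error
        # iter() calls f.read() until it returns b"" (end of file)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.digest()

def digest_tree(topdir):
    """Hypothetical wrapper: map every file under topdir to its digest."""
    digest_map = {}
    for path, dirs, files in os.walk(topdir):
        for name in files:
            fullname = os.path.join(path, name)
            digest_map[fullname] = compute_digest(fullname)
    return digest_map
```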
With a pool, you can farm out work
• Here's a small sample:

    p = multiprocessing.Pool(2)     # 2 processes

    result = p.apply_async(compute_digest,('README.txt',))
    ...
    ... various other processing
    ...
    digest = result.get()           # Get the result

• This executes a function in a worker process and retrieves the result at a later time
• The worker churns in the background, allowing the main program to do other things
Make a dictionary mapping names to digests:

    import multiprocessing
    import os
    TOPDIR = "/Users/beazley/Software/Python-3.0"

    p = multiprocessing.Pool(2)     # Make a process pool
    digest_map = {}
    for path, dirs, files in os.walk(TOPDIR):
        for name in files:
            fullname = os.path.join(path,name)
            digest_map[fullname] = p.apply_async(
                compute_digest, (fullname,)
            )

    # Go through the final dictionary and collect results
    for filename, result in digest_map.items():
        digest_map[filename] = result.get()

• This runs in about 5.6 seconds
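A Python 3 sketch of the pool version, wrapped in a hypothetical `digest_tree_parallel()` function. The pool is used as a context manager, which terminates the workers on exit, so all results are collected inside the `with` block; the `__main__` guard matters on platforms that spawn worker processes.

```python
import hashlib
import multiprocessing
import os

def compute_digest(filename):
    digest = hashlib.sha512()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.digest()

def digest_tree_parallel(topdir, processes=2):
    """Hypothetical wrapper: hash every file under topdir using a process pool."""
    with multiprocessing.Pool(processes) as pool:
        pending = {}
        for path, dirs, files in os.walk(topdir):
            for name in files:
                fullname = os.path.join(path, name)
                pending[fullname] = pool.apply_async(compute_digest, (fullname,))
        # result.get() blocks until that file's digest is ready
        return {name: result.get() for name, result in pending.items()}

if __name__ == "__main__":
    import sys
    topdir = sys.argv[1] if len(sys.argv) > 1 else "."
    for name, digest in sorted(digest_tree_parallel(topdir).items()):
        print(name, digest.hex()[:16])
```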
For certain kinds of applications, programmers have turned to alternative approaches that don't rely on threads or processes.
• Primarily, this centers around asynchronous I/O and I/O multiplexing
• You try to make a single Python process run as fast as possible without any thread/process overhead (e.g., context switching, stack space, and so forth)
The asyncore library module
• Implements a wrapper around sockets that turns all blocking I/O operations into events

    s = socket(...)
    s.accept()
    s.connect(addr)
    s.recv(maxbytes)
    s.send(msg)
    ...

    from asyncore import dispatcher
    class MyApp(dispatcher):
        def handle_accept(self):
            ...
        def handle_connect(self):
            ...
        def handle_read(self):
            ...
        def handle_write(self):
            ...

    # Create a socket and wrap it
    s = MyApp(socket())
asyncore is one of the ugliest, most annoying, mind-boggling modules in the entire Python library.
• Combines all of the "fun" of network programming with the "elegance" of GUI programming (sic)
• However, if you use this module, you can technically create programs that have "concurrency" without any threads/processes
When a generator function hits the yield statement, it suspends execution:

    def countdown(n):
        while n > 0:
            yield n
            n -= 1

• Here's the idea: instead of yielding a value, a generator can yield control
• You can write a little scheduler that cycles between generators, running each one until it explicitly yields
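A minimal sketch of such a scheduler in Python 3. Each hypothetical `countdown` task yields control instead of a value, and `run()` cycles through a `deque` of generators, calling `next()` on each until it raises `StopIteration`.

```python
from collections import deque

def countdown(name, n, log):
    while n > 0:
        log.append((name, n))   # the "work" for this step
        yield                   # give control back to the scheduler
        n -= 1

def run(tasks):
    ready = deque(tasks)        # round-robin queue of generators
    while ready:
        task = ready.popleft()
        try:
            next(task)          # run until the task's next yield
        except StopIteration:
            continue            # task finished; drop it
        ready.append(task)

log = []
run([countdown("A", 3, log), countdown("B", 2, log)])
print(log)   # [('A', 3), ('B', 2), ('A', 2), ('B', 1), ('A', 1)]
```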
It is also possible to tie coroutines to I/O.
• You take an event loop (like asyncore), but instead of firing callback functions, you schedule coroutines in response to I/O activity

[Figure: a scheduler loop calls select()/poll() on a set of sockets, then next() on the coroutine waiting for each ready socket]

• Unfortunately, this requires its own tutorial...
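A compact Python 3 sketch of the idea, using the standard `selectors` module for the `select()`/`poll()` step. Each generator yields a `("read", sock)` or `("write", sock)` request; the scheduler registers the socket and resumes that generator once the selector reports it ready. The `reader`/`writer` tasks and the in-process `socketpair()` are illustrative; a real server would need error handling and timeouts.

```python
import selectors
import socket
from collections import deque

def reader(sock, out):
    while True:
        yield ("read", sock)            # suspend until sock is readable
        data = sock.recv(1024)
        if not data:                    # peer closed the connection
            return
        out.append(data)

def writer(sock, chunks):
    for chunk in chunks:
        yield ("write", sock)           # suspend until sock is writable
        sock.sendall(chunk)
    sock.close()

def run(tasks):
    sel = selectors.DefaultSelector()
    ready = deque(tasks)
    while ready or sel.get_map():
        while ready:
            task = ready.popleft()
            try:
                kind, sock = next(task)     # run the coroutine up to its next yield
            except StopIteration:
                continue
            events = selectors.EVENT_READ if kind == "read" else selectors.EVENT_WRITE
            sel.register(sock, events, task)
        if sel.get_map():                   # only wait if something is registered
            for key, _ in sel.select():
                sel.unregister(key.fileobj)
                ready.append(key.data)      # I/O is ready: reschedule the coroutine
    sel.close()

a, b = socket.socketpair()
received = []
run([reader(a, received), writer(b, [b"hello ", b"world"])])
a.close()
print(b"".join(received))   # b'hello world'
```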