An Introduction to Python Concurrency

Copyright (C) 2009, David Beazley, http://www.dabeaz.com

  • This Tutorial

    • Python : An interpreted, high-level programming language that has a lot of support for
      "systems programming" and which integrates well with existing software in other languages.
    • Concurrency : Doing more than one thing at a time. Of particular interest to programmers
      writing code for running on big iron, but also of interest to users of multicore PCs.
      Usually a bad idea--except when it's not.

  • An Overview

    • We're going to explore the state of concurrent programming idioms being used in Python
    • A look at tradeoffs and limitations
    • Hopefully provide some clarity
    • A tour of various parts of the standard library
    • The goal is to go beyond the user manual and tie everything together into a "bigger picture"

  • Disclaimers

    • The primary focus is on Python
    • This is not a tutorial on how to write concurrent programs or parallel algorithms
    • No mathematical proofs involving "dining philosophers" or anything like that
    • I will assume that you have had some prior exposure to topics such as threads,
      message passing, network programming, etc.

  • Programmer Performance

    • Programmers are often able to get complex systems to "work" in much less time using a
      high-level language like Python than if they spent all of their time hacking C code.

      "The best performance improvement is the transition from the nonworking to the
       working state." - John Ousterhout

      "You can always optimize it later." - Unknown

      "Premature optimization is the root of all evil." - Donald Knuth

  • Performance is Irrelevant

    • Many concurrent programs are "I/O bound"
    • They spend virtually all of their time sitting around waiting
    • Python can "wait" just as fast as C (maybe even faster--although I haven't measured it)
    • If there's not much processing, who cares if it's being done in an interpreter?
      (One exception : if you need an extremely rapid response time, as in real-time systems)

  • Commentary

    • Concurrency is usually a really bad option if you're merely trying to make an inefficient
      Python script run faster
    • Because it's interpreted, you can often make huge gains by focusing on better algorithms
      or offloading work into C extensions
    • For example, a C extension might make a script run 20x faster vs. the marginal improvement
      of parallelizing a slow script to run on a couple of CPU cores

  • threading module

    • Python threads are defined by a class

        import time
        import threading

        class CountdownThread(threading.Thread):
            def __init__(self, count):
                threading.Thread.__init__(self)
                self.count = count
            def run(self):                 # This code executes in the thread
                while self.count > 0:
                    print "Counting down", self.count
                    self.count -= 1
                    time.sleep(5)
                return

    • You inherit from Thread and redefine run()

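    • A minimal launch sketch (assumed usage, not shown on this slide): you create the thread
      object and call start(); join() waits for it to finish

        t = CountdownThread(10)    # Create the thread object
        t.start()                  # Launch it (run() executes in the new thread)
        t.join()                   # Wait for it to terminate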

  • Accessing Shared Data

    • Is this actually a real concern?

        import threading

        x = 0    # A shared value

        def foo():
            global x
            for i in xrange(100000000):
                x += 1

        def bar():
            global x
            for i in xrange(100000000):
                x -= 1

        t1 = threading.Thread(target=foo)
        t2 = threading.Thread(target=bar)
        t1.start(); t2.start()
        t1.join(); t2.join()    # Wait for completion
        print x                 # Expected result is 0

    • Yes. The print produces a random nonsensical value each time (e.g., -83412 or 1627732)

  • Use of Mutex Locks

    • Commonly used to enclose critical sections

        x = 0
        x_lock = threading.Lock()

        # Thread-1
        x_lock.acquire()
        x = x + 1           # Critical section
        x_lock.release()

        # Thread-2
        x_lock.acquire()
        x = x - 1           # Critical section
        x_lock.release()

    • Only one thread can execute in the critical section at a time (the lock gives exclusive access)

  • Using a Mutex Lock

    • It is your responsibility to identify and lock all "critical sections"

        x = 0
        x_lock = threading.Lock()

        # Thread-1
        x_lock.acquire()
        x = x + 1
        x_lock.release()

        # Thread-2 (no lock!)
        x = x - 1

    • If you use a lock in one place, but not another, then you're missing the whole point.
      All modifications to shared state must be enclosed by lock acquire()/release().
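    • As a sketch, here is the earlier shared-counter example with every modification of x
      enclosed by the lock (iteration counts reduced by me so the locked version finishes
      quickly); with both threads locking, the program reliably prints 0

        import threading

        x = 0
        x_lock = threading.Lock()

        def foo():
            global x
            for i in xrange(1000000):
                x_lock.acquire()
                x += 1
                x_lock.release()

        def bar():
            global x
            for i in xrange(1000000):
                x_lock.acquire()
                x -= 1
                x_lock.release()

        t1 = threading.Thread(target=foo)
        t2 = threading.Thread(target=bar)
        t1.start(); t2.start()
        t1.join(); t2.join()
        print x    # Now prints 0 every time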

  • Locks and Deadlock

    • Don't write code that acquires more than one mutex lock at a time

        x = 0
        y = 0
        x_lock = threading.Lock()
        y_lock = threading.Lock()

        with x_lock:
            statements using x
            ...
            with y_lock:
                statements using x and y
                ...

    • This almost invariably ends up creating a program that mysteriously deadlocks
      (even more fun to debug than a race condition), as the sketch below illustrates
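    • A minimal sketch of the failure mode (my illustration, not from the slides): two threads
      grab the same two locks in opposite order, so each ends up holding one lock while waiting
      forever for the other

        import threading, time

        x_lock = threading.Lock()
        y_lock = threading.Lock()

        def thread_1():
            with x_lock:           # Grabs x_lock first
                time.sleep(0.1)    # Give thread_2 time to grab y_lock
                with y_lock:       # Blocks forever: thread_2 holds y_lock
                    pass

        def thread_2():
            with y_lock:           # Grabs y_lock first
                time.sleep(0.1)
                with x_lock:       # Blocks forever: thread_1 holds x_lock
                    pass

        threading.Thread(target=thread_1).start()
        threading.Thread(target=thread_2).start()
        # This program hangs (deadlock)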

  • RLock

    • Reentrant Mutex Lock

        m = threading.RLock()    # Create a lock
        m.acquire()              # Acquire the lock
        m.release()              # Release the lock

    • Similar to a normal lock except that it can be reacquired multiple times by the same thread
    • However, each acquire() must have a matching release()
    • Common use : code-based locking (where you're locking function/method execution as opposed
      to data access), as in the sketch below
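    • A sketch of code-based locking (the class and names are my own): one RLock guards every
      method, and methods can call each other without deadlocking on the lock they already hold

        import threading

        class Counter(object):
            _lock = threading.RLock()    # One lock guards all methods
            def __init__(self):
                self.value = 0
            def incr(self):
                with Counter._lock:
                    self.value += 1
            def incr_by(self, n):
                with Counter._lock:      # Same thread reacquires: OK with RLock
                    for i in xrange(n):
                        self.incr()      # Calls another locked method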

  • Semaphores

    • A counter-based synchronization primitive

        m = threading.Semaphore(n)    # Create a semaphore
        m.acquire()                   # Acquire
        m.release()                   # Release

    • acquire() - Waits if the count is 0, otherwise decrements the count and continues
    • release() - Increments the count and signals waiting threads (if any)
    • Unlike locks, acquire()/release() can be called in any order and by any thread
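    • A sketch of a common use (download and urls are hypothetical stand-ins): limiting how
      many threads may execute a section at once--here, at most 3 concurrent fetches

        import threading

        sema = threading.Semaphore(3)    # At most 3 threads past acquire()

        def fetch(url):
            sema.acquire()               # Waits if 3 fetches are already running
            try:
                download(url)            # Hypothetical work function
            finally:
                sema.release()           # Always give the slot back

        for url in urls:                 # 'urls' assumed defined elsewhere
            threading.Thread(target=fetch, args=(url,)).start()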

  • Events

    • Event Objects

        e = threading.Event()
        e.isSet()    # Return True if event set
        e.set()      # Set event
        e.clear()    # Clear event
        e.wait()     # Wait for event

    • This can be used to have one or more threads wait for something to occur
    • Setting an event will unblock all waiting threads simultaneously (if any)
    • Common use : barriers, notification

  • Event Example

    • Using an event to ensure proper initialization

        init = threading.Event()

        def worker():
            init.wait()      # Wait until initialized
            statements
            ...

        def initialize():
            statements       # Setting up
            statements       # ...
            ...
            init.set()       # Done initializing

        Thread(target=worker).start()    # Launch workers
        Thread(target=worker).start()
        Thread(target=worker).start()
        initialize()                     # Initialize

  • Event Example

    • Using an event to signal "completion"

        # Master thread
        def master():
            ...
            item = create_item()
            evt = Event()
            worker.send((item, evt))
            ...
            # Other processing
            ...
            # Wait for worker
            evt.wait()

        # Worker thread
        item, evt = get_work()
        # ... processing ...
        # Done
        evt.set()

    • Might use for asynchronous processing, etc.

  • Condition Variables

    • Condition Objects

        cv = threading.Condition([lock])
        cv.acquire()      # Acquire the underlying lock
        cv.release()      # Release the underlying lock
        cv.wait()         # Wait for condition
        cv.notify()       # Signal that a condition holds
        cv.notifyAll()    # Signal all threads waiting

    • A combination of locking/signaling
    • The lock is used to protect code that establishes some sort of "condition"
      (e.g., data available)
    • The signal is used to notify other threads that a "condition" has changed state

  • Condition Variables

    • Common use : producer/consumer patterns

        items = []
        items_cv = threading.Condition()

        # Producer thread
        item = produce_item()
        with items_cv:
            items.append(item)
            items_cv.notify()

        # Consumer thread
        with items_cv:
            while not items:
                items_cv.wait()
            x = items.pop(0)
        # Do something with x
        ...

    • Next you add signaling and waiting
    • Here, the producer signals the consumer that it put data into the shared list

  • Condition Variables

    • Some tricky bits involving wait()

        # Consumer thread
        with items_cv:
            while not items:
                items_cv.wait()
            x = items.pop(0)
        # Do something with x
        ...

    • Before waiting, you have to acquire the lock
    • wait() releases the lock while waiting and reacquires it when woken
    • Conditions are often transient and may not hold by the time wait() returns.
      So, you must always double-check (hence, the while loop)
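    • Putting the last two slides together, a minimal runnable sketch (the poison-pill
      shutdown via None is my addition, not from the slides)

        import threading

        items = []
        items_cv = threading.Condition()

        def producer():
            for i in xrange(5):
                with items_cv:
                    items.append(i)
                    items_cv.notify()      # Wake one waiting consumer
            with items_cv:
                items.append(None)         # Poison pill: tell the consumer to stop
                items_cv.notify()

        def consumer():
            while True:
                with items_cv:
                    while not items:       # Recheck: the condition may be transient
                        items_cv.wait()
                    x = items.pop(0)
                if x is None:
                    break
                print "consumed", x

        c = threading.Thread(target=consumer)
        p = threading.Thread(target=producer)
        c.start(); p.start()
        p.join(); c.join()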

  • Queue Library Module

    • Python has a thread-safe queuing module
    • Basic operations

        from Queue import Queue
        q = Queue([maxsize])    # Create a queue
        q.put(item)             # Put an item on the queue
        q.get()                 # Get an item from the queue
        q.empty()               # Check if empty
        q.full()                # Check if full

    • Usage : you try to strictly adhere to get/put operations. If you do this, you don't
      need to use other synchronization primitives.

  • Queue Signaling

    • Queues also have a signaling mechanism

        q.task_done()    # Signal that work is done
        q.join()         # Wait for all work to be done

    • Many Python programmers don't know about this (since it's relatively new)
    • Used to determine when processing is done

        # Producer thread
        for item in produce_items():
            q.put(item)
        # Wait for consumer
        q.join()

        # Consumer thread
        while True:
            item = q.get()
            consume_item(item)
            q.task_done()
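    • A minimal runnable sketch combining the two fragments (the daemon flag is my addition,
      so the blocked consumer doesn't keep the program alive after q.join() returns)

        import threading
        from Queue import Queue

        q = Queue()

        def consumer():
            while True:
                item = q.get()     # Blocks until an item is available
                print "consumed", item
                q.task_done()      # Signal that this item is fully processed

        t = threading.Thread(target=consumer)
        t.daemon = True            # Dies with the main thread
        t.start()

        for item in range(10):     # Stand-in for produce_items()
            q.put(item)
        q.join()                   # Returns when every item is task_done()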

  • A Performance Test

    • Consider this CPU-bound function

        def count(n):
            while n > 0:
                n -= 1

    • Sequential execution:

        count(100000000)
        count(100000000)

    • Threaded execution:

        t1 = Thread(target=count, args=(100000000,))
        t1.start()
        t2 = Thread(target=count, args=(100000000,))
        t2.start()

    • Now, you might expect two threads to run twice as fast on multiple CPU cores
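    • A hedged sketch of how you might measure it yourself (absolute numbers will vary by
      machine and Python version)

        import time
        from threading import Thread

        def count(n):
            while n > 0:
                n -= 1

        start = time.time()
        count(100000000); count(100000000)    # Sequential
        print "sequential:", time.time() - start

        start = time.time()
        t1 = Thread(target=count, args=(100000000,))
        t2 = Thread(target=count, args=(100000000,))
        t1.start(); t2.start()
        t1.join(); t2.join()                  # Wait for both to finish
        print "threaded:", time.time() - start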

  • The Periodic Check

    • What happens during the periodic check?
    • In the main thread only, signal handlers will execute if there are any pending signals
    • Release and reacquisition of the GIL
    • That last bullet describes how multiple CPU-bound threads get to run (by briefly
      releasing the GIL, other threads get a chance to run)
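    • The period of that check is configurable in Python 2 via sys.setcheckinterval (a small
      sketch; this API was later removed in Python 3)

        import sys
        print sys.getcheckinterval()    # Default: 100 ticks between checks
        sys.setcheckinterval(1000)      # Check (and possibly release the GIL) less often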

  • What is a "Tick?"

    • Ticks loosely map to interpreter instructions

        def countdown(n):
            while n > 0:
                print n
                n -= 1

        >>> import dis
        >>> dis.dis(countdown)
          0 SETUP_LOOP              33 (to 36)
          3 LOAD_FAST                0 (n)
          6 LOAD_CONST               1 (0)
          9 COMPARE_OP               4 (>)
         12 JUMP_IF_FALSE           19 (to 34)
         15 POP_TOP
         16 LOAD_FAST                0 (n)
         19 PRINT_ITEM
         20 PRINT_NEWLINE
         21 LOAD_FAST                0 (n)
         24 LOAD_CONST               2 (1)
         27 INPLACE_SUBTRACT
         28 STORE_FAST               0 (n)
         31 JUMP_ABSOLUTE            3
         ...

    • Instructions in the Python VM; the slide groups the loop body into four ticks
      (Tick 1 through Tick 4)

  • Tick Execution

    • Interpreter ticks are not time-based
    • Ticks don't have consistent execution times
    • Long operations can block everything

        >>> nums = xrange(100000000)
        >>> -1 in nums
        False
        >>>                # 1 tick (~ 6.6 seconds)

    • Try hitting Ctrl-C (ticks are uninterruptible)

        >>> nums = xrange(100000000)
        >>> -1 in nums
        ^C^C^C (nothing happens, long pause)
        ...
        KeyboardInterrupt
        >>>

  • Thread Scheduling

    • Thread switching is far more subtle than most programmers realize (it's tied up in the OS)

      [Diagram : Thread 1 and Thread 2 each run in 100-tick bursts. At each periodic check the
      running thread signals the operating system, which may suspend it and context-switch to
      the other thread; the other thread stays SUSPENDED until then.]

    • The lag between signaling and scheduling may be significant (depends on the OS)

  • Multicore GIL Contention

    • With multiple cores, CPU-bound threads get scheduled simultaneously (on different
      processors) and then have a GIL battle

      [Diagram : Thread 1 (CPU 1) runs, releases the GIL, and signals; Thread 2 (CPU 2) wakes
      and tries to acquire the GIL, but fails because Thread 1 has already reacquired it--over
      and over.]

    • The waiting thread (T2) may make 100s of failed GIL acquisitions before any success

  • The GIL and C Extensions

    • Here's some Python test code

        def churner(n):
            count = 1000000
            while count > 0:
                churn(n)       # C extension function
                count -= 1

        # Sequential execution
        churner(n)
        churner(n)

        # Threaded execution
        t1 = threading.Thread(target=churner, args=(n,))
        t2 = threading.Thread(target=churner, args=(n,))
        t1.start()
        t2.start()

  • Why is the GIL there?

    • It simplifies the implementation of the Python interpreter (okay, sort of a lame excuse)
    • It's better suited for reference counting (Python's memory management scheme)
    • It simplifies the use of C/C++ extensions. Extension functions do not need to worry about
      thread synchronization
    • And for now, it's here to stay... (although people continue to try to eliminate it)

  • I/O Bound Processing

    • Threads are still useful for I/O-bound apps
    • For example : a network server that needs to maintain several thousand long-lived TCP
      connections, but is not doing tons of heavy CPU processing
    • Here, you're really only limited by the host operating system's ability to manage and
      schedule a lot of threads
    • Most systems don't have much of a problem--even with thousands of threads

  • cPickle vs pickle

    • There is an alternative implementation of pickle called cPickle (written in C)
    • Use it whenever possible--it is much faster

        import cPickle as pickle
        ...
        pickle.dump(someobj, f)

    • There is some history involved. There are a few things that cPickle can't do, but
      they are somewhat obscure (so don't worry about it)
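    • A rough micro-benchmark sketch (my own; the numbers will vary)

        import time
        import pickle
        import cPickle

        data = {"values": range(100000)}
        for mod in (pickle, cPickle):
            start = time.time()
            for i in xrange(10):
                mod.dumps(data, 2)    # Protocol 2 (binary)
            print mod.__name__, ":", time.time() - start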

  • An Example

    • Launching a subprocess and hooking up the child process via a pipe
    • Use the subprocess module

        import subprocess
        p = subprocess.Popen(['python','child.py'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        p.stdin.write(data)     # Send data to subprocess
        p.stdout.read(size)     # Read data from subprocess

    • The parent's p.stdin/p.stdout are connected through the pipe to the child's
      sys.stdin/sys.stdout

  • A Message Channel

    • A class that wraps a pair of files

        # channel.py
        import pickle

        class Channel(object):
            def __init__(self, out_f, in_f):
                self.out_f = out_f
                self.in_f = in_f
            def send(self, item):
                pickle.dump(item, self.out_f)
                self.out_f.flush()
            def recv(self):
                return pickle.load(self.in_f)

    • Send/receive implemented using pickle

  • Some Sample Code

    • A sample child process

        # child.py
        import channel
        import sys

        ch = channel.Channel(sys.stdout, sys.stdin)
        while True:
            item = ch.recv()
            ch.send(("child", item))

    • Parent process setup

        # parent.py
        import channel
        import subprocess

        p = subprocess.Popen(['python','child.py'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE)
        ch = channel.Channel(p.stdin, p.stdout)

  • Some Sample Code

    • Using the child worker

        >>> ch.send("Hello World")
        Hello World
        >>> ch.send(42)
        42
        >>> ch.send([1,2,3,4])
        [1, 2, 3, 4]
        >>> ch.send({'host':'python.org','port':80})
        {'host': 'python.org', 'port': 80}
        >>>

      (This output is being produced by the child)

    • You can send almost any Python object (numbers, lists, dictionaries, instances, etc.)

  • multiprocessing Example

    • Define tasks using a Process class

        import time
        import multiprocessing

        class CountdownProcess(multiprocessing.Process):
            def __init__(self, count):
                multiprocessing.Process.__init__(self)
                self.count = count
            def run(self):
                while self.count > 0:
                    print "Counting down", self.count
                    self.count -= 1
                    time.sleep(5)
                return

    • You inherit from Process and redefine run()

  • Launching Processes

    • To launch, same idea as with threads

        if __name__ == '__main__':
            p1 = CountdownProcess(10)    # Create the process object
            p1.start()                   # Launch the process
            p2 = CountdownProcess(20)    # Create another process
            p2.start()                   # Launch

    • Processes execute until run() stops
    • A critical detail : always launch in main as shown (required for Windows)

  • Does it Work?

    • Consider this CPU-bound function

        def count(n):
            while n > 0:
                n -= 1

    • Sequential execution (24.6s):

        count(100000000)
        count(100000000)

    • Multiprocessing execution (12.5s):

        p1 = Process(target=count, args=(100000000,))
        p1.start()
        p2 = Process(target=count, args=(100000000,))
        p2.start()

    • Yes, it seems to work

  • Pipes

    • A channel for sending/receiving objects

        (c1, c2) = multiprocessing.Pipe()

    • Returns a pair of connection objects (one for each end-point of the pipe)
    • Here are methods for communication

        c.send(obj)             # Send an object
        c.recv()                # Receive an object
        c.send_bytes(buffer)    # Send a buffer of bytes
        c.recv_bytes([max])     # Receive a buffer of bytes
        c.poll([timeout])       # Check for data

  • Pipe Example

    • A simple data consumer

        def consumer(p1, p2):
            p1.close()          # Close producer's end (not used)
            while True:
                try:
                    item = p2.recv()
                except EOFError:
                    break
                print item      # Do other useful work here

    • A simple data producer

        def producer(sequence, output_p):
            for item in sequence:
                output_p.send(item)

  • Pipe Example

        if __name__ == '__main__':
            p1, p2 = multiprocessing.Pipe()
            cons = multiprocessing.Process(
                       target=consumer,
                       args=(p1, p2))
            cons.start()
            # Close the input end in the producer
            p2.close()
            # Go produce some data
            sequence = xrange(100)    # Replace with useful data
            producer(sequence, p1)
            # Close the pipe
            p1.close()

  • Message Queues

    • multiprocessing also provides a queue
    • The programming interface is the same

        from multiprocessing import Queue
        q = Queue()
        q.put(item)       # Put an item on the queue
        item = q.get()    # Get an item from the queue

    • There is also a joinable Queue

        from multiprocessing import JoinableQueue
        q = JoinableQueue()
        q.task_done()     # Signal task completion
        q.join()          # Wait for completion

  • Queue Example

    • A consumer process

        def consumer(input_q):
            while True:
                # Get an item from the queue
                item = input_q.get()
                # Process item
                print item
                # Signal completion
                input_q.task_done()

    • A producer process

        def producer(sequence, output_q):
            for item in sequence:
                # Put the item on the queue
                output_q.put(item)

  • Queue Example

    • Running the two processes

        if __name__ == '__main__':
            from multiprocessing import Process, JoinableQueue
            q = JoinableQueue()
            # Launch the consumer process
            cons_p = Process(target=consumer, args=(q,))
            cons_p.daemon = True
            cons_p.start()
            # Run the producer function on some data
            sequence = range(100)    # Replace with useful data
            producer(sequence, q)
            # Wait for the consumer to finish
            q.join()

  • Commentary

    • If you have written threaded programs that strictly stick to the queuing model, they can
      probably be ported to multiprocessing
    • The following restrictions apply
    • Only objects compatible with pickle can be queued
    • Tasks cannot rely on any shared data other than a reference to the queue

  • Pool Example

    • Define a function that does some work
    • Example : compute a SHA-512 digest of a file

        import hashlib

        def compute_digest(filename):
            digest = hashlib.sha512()
            f = open(filename, 'rb')
            while True:
                chunk = f.read(8192)
                if not chunk:
                    break
                digest.update(chunk)
            f.close()
            return digest.digest()

    • This is just a normal function (no magic)

  • Pool Example

    • Here is some code that uses our function
    • Make a dict mapping filenames to digests

        import os
        TOPDIR = "/Users/beazley/Software/Python-3.0"

        digest_map = {}
        for path, dirs, files in os.walk(TOPDIR):
            for name in files:
                fullname = os.path.join(path, name)
                digest_map[fullname] = compute_digest(fullname)

    • Running this takes about 10s on my machine

  • Pool Example

    • With a pool, you can farm out work
    • Here's a small sample

        p = multiprocessing.Pool(2)    # 2 processes
        result = p.apply_async(compute_digest, ('README.txt',))
        ...
        ... various other processing
        ...
        digest = result.get()          # Get the result

    • This executes a function in a worker process and retrieves the result at a later time
    • The worker churns in the background, allowing the main program to do other things

  • Pool Example

    • Make a dictionary mapping names to digests

        import multiprocessing
        import os
        TOPDIR = "/Users/beazley/Software/Python-3.0"

        p = multiprocessing.Pool(2)    # Make a process pool

        digest_map = {}
        for path, dirs, files in os.walk(TOPDIR):
            for name in files:
                fullname = os.path.join(path, name)
                digest_map[fullname] = p.apply_async(
                    compute_digest, (fullname,)
                )

        # Go through the final dictionary and collect results
        for filename, result in digest_map.items():
            digest_map[filename] = result.get()

    • This runs in about 5.6 seconds
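    • Since all of the work items are known up front, Pool.map is an alternative sketch for
      the same job (it blocks until every result is in, preserving order)

        import multiprocessing
        import os

        if __name__ == '__main__':
            TOPDIR = "/Users/beazley/Software/Python-3.0"
            filenames = []
            for path, dirs, files in os.walk(TOPDIR):
                for name in files:
                    filenames.append(os.path.join(path, name))

            p = multiprocessing.Pool(2)
            # Farm the calls out to the pool; results come back in order
            digests = p.map(compute_digest, filenames)
            digest_map = dict(zip(filenames, digests))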

  • Alternatives

    • In certain kinds of applications, programmers have turned to alternative approaches
      that don't rely on threads or processes
    • Primarily this centers around asynchronous I/O and I/O multiplexing
    • You try to make a single Python process run as fast as possible without any
      thread/process overhead (e.g., context switching, stack space, and so forth)

  • Events and Asyncore

    • asyncore library module
    • Implements a wrapper around sockets that turns all blocking I/O operations into events

        # Blocking socket operations...
        s = socket(...)
        s.accept()
        s.connect(addr)
        s.recv(maxbytes)
        s.send(msg)
        ...

        # ...become event handlers
        from asyncore import dispatcher

        class MyApp(dispatcher):
            def handle_accept(self):
                ...
            def handle_connect(self):
                ...
            def handle_read(self):
                ...
            def handle_write(self):
                ...

        # Create a socket and wrap it
        s = MyApp(socket())
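    • A minimal runnable sketch (my own example, not from the slides): a single-process echo
      server written against asyncore

        import asyncore
        import socket

        class EchoHandler(asyncore.dispatcher):
            def handle_read(self):
                data = self.recv(8192)    # Called when the socket is readable
                if data:
                    self.send(data)       # Echo it back

        class EchoServer(asyncore.dispatcher):
            def __init__(self, port):
                asyncore.dispatcher.__init__(self)
                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
                self.set_reuse_addr()
                self.bind(("", port))
                self.listen(5)
            def handle_accept(self):
                conn, addr = self.accept()    # Called when a client connects
                EchoHandler(conn)             # Wrap the new connection

        EchoServer(16000)
        asyncore.loop()    # One loop multiplexes every wrapped socket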

  • Asyncore Commentary

    • Frankly, asyncore is one of the ugliest, most annoying, mind-boggling modules in the
      entire Python library
    • Combines all of the "fun" of network programming with the "elegance" of GUI
      programming (sic)
    • However, if you use this module, you can technically create programs that have
      "concurrency" without any threads/processes

  • An Insight

    • Whenever a generator function hits the yield statement, it suspends execution

        def countdown(n):
            while n > 0:
                yield n
                n -= 1

    • Here's the idea : instead of yielding a value, a generator can yield control
    • You can write a little scheduler that cycles between generators, running each one
      until it explicitly yields, as in the sketch below
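    • A tiny round-robin scheduler sketch built on that idea (my own illustration; here the
      generators yield control, not values)

        def countdown(n):
            while n > 0:
                print "T-minus", n
                yield                    # Yield control back to the scheduler
                n -= 1

        def scheduler(tasks):
            tasks = list(tasks)
            while tasks:
                task = tasks.pop(0)      # Take the next ready "task"
                try:
                    task.next()          # Run it until its next yield
                    tasks.append(task)   # Still alive: requeue it
                except StopIteration:
                    pass                 # Task finished: drop it

        scheduler([countdown(3), countdown(2)])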

  • Coroutines and I/O

    • It is also possible to tie coroutines to I/O
    • You take an event loop (like asyncore), but instead of firing callback functions, you
      schedule coroutines in response to I/O activity

      [Diagram : a scheduler loop sits between select()/poll() on a set of sockets and a set
      of coroutines; I/O readiness on a socket causes the scheduler to call next() on the
      corresponding coroutine.]

    • Unfortunately, this requires its own tutorial...

