This post introduces a proposal for a new keyword argument in the __init__() method of Pool, named expect_initret. This keyword defaults to False, and when it is set to True, the return value of the initializer function is passed as a kwarg to the function we are mapping over. I’ve provided two patterns in the reading ahead which illustrate this feature.
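To make the proposal concrete, here is a minimal, hypothetical sketch of the intended semantics. This is not actual multiprocessing code; simulate_worker and its parameters are illustrative stand-ins of mine for what each worker process would do internally.

def simulate_worker(func, tasks, initializer=None, initargs=(),
                    expect_initret=False):
    # The initializer runs once per worker; keep its return value.
    initret = initializer(*initargs) if initializer is not None else None
    for task in tasks:
        if expect_initret:
            # Proposed: thread the initializer's result into every call.
            yield func(task, initret=initret)
        else:
            # Current behavior: func never sees the initializer's result.
            yield func(task)

# The mapped function receives the initializer's result, with no globals.
results = list(simulate_worker(lambda x, initret=None: x * initret,
                               [1, 2, 3],
                               initializer=lambda: 10,
                               expect_initret=True))
# results == [10, 20, 30]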
Note: There was a similar issue opened years ago that got some attention, but was ultimately closed due to backwards-compatibility concerns. I’ve designed this implementation based on the feedback from that issue.
Pattern 1: Initialize Object in Child Process without Global Scope

This pattern is used to initialize an object after each worker (i.e. subprocess) has been created. Oftentimes the need for this arises when the func we are applying satisfies one of two cases:

1. func is an instance method, and the instance bound to it contains an item that is not pickle-able (more reading here).
2. func relies on sockets, like database connections, that should usually not be serialized or passed to child processes.

We will use a sqlalchemy.engine.Engine object as our example. Our goal: give each worker process its own engine object.
The current implementation of Pool allows for this behavior; however, it forces the user to define a global variable in the initializer() function, as follows:
from multiprocessing.pool import Pool
from sqlalchemy import create_engine

def initializer(db_url):
    global sqla_engine
    sqla_engine = create_engine(db_url)

def insert_record(record):
    # `table` is assumed to be a sqlalchemy.Table defined elsewhere.
    sqla_engine.execute(table.insert(record))

records = [...]  # Dictionaries of DB records
with Pool(initializer=initializer,
          initargs=("mysql://foo:bar@localhost",)) as pool:
    pool.map(insert_record, records)
Note: There are plenty of arguments for and against global variables. There are also arguments for and against variables being made available outside their lexical scope. I don’t intend to get into those arguments here; the goal is to provide an alternative to the current globals-only solution for initializing Pool workers.
Using expect_initret, the parallelized insertion of records looks as follows:
import sqlalchemy
from multiprocessing.pool import Pool
from sqlalchemy import create_engine

def initializer(db_url):
    return create_engine(db_url)

def insert_record(record, initret: sqlalchemy.engine.Engine = None):
    sqla_engine = initret  # For readability's sake
    # `table` is assumed to be a sqlalchemy.Table defined elsewhere.
    sqla_engine.execute(table.insert(record))

records = [...]  # Dictionaries of DB records
with Pool(initializer=initializer,
          initargs=("mysql://foo:bar@localhost",),
          expect_initret=True) as pool:
    pool.map(insert_record, records)
So, we preserve lexical scoping of the sqlalchemy.engine.Engine object, at the expense of adding a somewhat ambiguous kwarg, initret, to our mapped function insert_record(). This becomes a bit more readable with type-hinting.
Pattern 2: Pass a Large Object to Each Worker without Global Scope

The idea here is to create a large object ONCE, like a big map or dictionary, in the parent process, and pass that object to each Pool worker. Specifically, the object will be made available in each worker’s local scope as a parameter to our mapped function.
Let’s consider the dummy problem of counting every “on” bit in all integers smaller than 2**16 (e.g. “10101” => 3 “on” bits).
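As a point of reference, here is a minimal serial solution to the same problem (serial_bit_counter is an illustrative name of mine, not part of the proposal); the parallel versions below add a precomputed cache shared with each worker:

def serial_bit_counter(int_ls):
    # bin(5) == "0b101", and the "0b" prefix contains no "1"s,
    # so counting "1" characters counts the "on" bits.
    return sum(bin(i).count("1") for i in int_ls)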
from multiprocessing.pool import Pool
from typing import Dict, List

def initializer(cache: Dict[int, str]) -> None:
    # A parameter cannot also be declared global, so copy it
    # into a module-level name for count_bits() to read.
    global int_to_binary_cache
    int_to_binary_cache = cache

def count_bits(i: int) -> int:
    return int_to_binary_cache[i].count("1")

def parallel_bit_counter(int_ls: List[int]) -> int:
    big_int_to_binary_cache = {
        i: bin(i) for i in range(2**16)
    }
    with Pool(initializer=initializer,
              initargs=(big_int_to_binary_cache,)) as p:
        return sum(
            p.imap_unordered(count_bits, int_ls))
Note: You can also send data to Pool workers with class attributes, which buys a bit more encapsulation.
With expect_initret, the implementation looks as follows:
from multiprocessing.pool import Pool
from typing import Dict, List

def initializer(int_to_binary_cache: Dict[int, str]
                ) -> Dict[int, str]:
    # The identity function
    return int_to_binary_cache

def count_bits(i: int, initret: Dict[int, str] = None) -> int:
    return initret[i].count("1")

def parallel_bit_counter(int_ls: List[int]) -> int:
    big_int_to_binary_cache = {
        i: bin(i) for i in range(2**16)
    }
    with Pool(initializer=initializer,
              initargs=(big_int_to_binary_cache,),
              expect_initret=True) as p:
        return sum(
            p.imap_unordered(count_bits, int_ls))
Yet again, I am the first to admit that the initret kwarg is somewhat ambiguous. However, the goal is to let Python users choose between the following:

1. Creating a global variable inside initializer, as the current implementation requires, or
2. Receiving the initializer’s return value in the mapped function via a kwarg named initret.

For those interested, the path to getting stuck deep, deep in the cavernous rabbit hole of Python’s multiprocessing.Pool is as follows. If you were to take every library written in Python, and…

1. counted every function that creates a variable in the global scope from within a function via the global keyword, and
2. counted every function that simply returns its result,

…the count of (2) would be overwhelmingly higher than (1).
Given that Python users (like me) are more familiar with functions that do not create global variables as a side effect, it is my hope that this API extension, and the examples above, will enable more Python users to use the Pool interface, while preserving every bit of the beautifully abstracted multiprocessing.Pool module.