15. Threading
15.1. Process
The Process object creates a new process to invoke the method associated with it. The two important arguments to Process are target, which is typically a function, and args, which is a tuple of the arguments we want to pass to the function. The start() method of a Process object commences the computation, and the join() method waits until the processing is finished. Below, we generate a list of 1,000 random numbers in the range [0, 100]; what we want to do with each number is square it. Note that the function squared() does not return anything; it simply computes the square of the integer passed to it. Also, inside squared(), we get the parent and process IDs associated with running the method to show that these are indeed different processes being created.
Note
If you look at the code below, you will see that we use the main guard, if __name__ == '__main__':. We perform multiprocessing inside the main guard to prevent problems with the way child processes are created (e.g. endless recursive spawning of processes when the module is re-imported). The Python documentation recommends that multiprocessing code be placed inside the main guard.
```python
from multiprocessing import Process
import random
import os


def squared(x):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')


if __name__ == '__main__':
    # generate 1,000 random numbers in the range [0, 100]
    numbers = [random.randint(0, 100) for _ in range(1000)]

    # create a list of Process
    processes = [Process(target=squared, args=(num,)) for num in numbers]

    # start each Process
    for process in processes:
        process.start()

    # join each Process
    for process in processes:
        process.join()
```
So, squared() did not return anything, but what if we wanted to get the results? How do we get any output from squared()? The idea here is to use a Queue. The pattern of multiprocessing with Process now becomes: create, start, get, and join.
```python
from multiprocessing import Process, Queue
import random
import os


def squared(x, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')

    queue.put(result)


if __name__ == '__main__':
    # generate 1,000 random numbers in the range [0, 100]
    numbers = [random.randint(0, 100) for _ in range(1000)]

    # set up a queue to store results
    queue = Queue()

    # create a list of Process
    processes = [Process(target=squared, args=(num, queue)) for num in numbers]

    # start each Process
    for process in processes:
        process.start()

    # get items from the queue to prevent the queue from filling up
    # if the queue fills up, it will block
    results = []
    for process in processes:
        result = queue.get()
        results.append(result)

    # join each Process
    for process in processes:
        process.join()

    # print results
    print(results)
```
15.1.1. Exercise
Write a multiprocessing program to simulate rolling dice. There should be a function that simulates rolling 2 dice and stores the result (the sum of the two numbers). At the very end, collect the results from the simulation and count how many times each unique value came up.
Solution.
```python
from multiprocessing import Process, Queue
from random import randint
from itertools import groupby


def roll(queue):
    die1 = randint(1, 6)
    die2 = randint(1, 6)
    total = die1 + die2

    queue.put(total)


if __name__ == '__main__':
    # set up a queue to store results
    queue = Queue()

    # create a list of Process
    processes = [Process(target=roll, args=(queue,)) for _ in range(100)]

    # start each Process
    for process in processes:
        process.start()

    # get items from the queue to prevent the queue from filling up
    # if the queue fills up, it will block
    results = []
    for process in processes:
        result = queue.get()
        results.append(result)

    # join each Process
    for process in processes:
        process.join()

    # count how many times each total came up
    get_key = lambda x: x
    counts = {k: len(list(v)) for k, v in groupby(sorted(results, key=get_key), key=get_key)}
    print(counts)
```
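As an aside, collections.Counter from the standard library can replace the sort-and-groupby idiom for counting; a minimal sketch, using a hard-coded list as a stand-in for the simulated rolls:

```python
from collections import Counter

# stand-in for the list of simulated rolls
results = [7, 7, 2, 11, 7, 2]

# Counter tallies occurrences in one pass, no sorting required
counts = Counter(results)
print(dict(counts))  # {7: 3, 2: 2, 11: 1}
```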
15.2. Pool
Pool enables a much simpler idiom for multiprocessing. If the function you are invoking in the parallelized operation returns something, you do not need to pass in another structure like a Queue to store the results.
```python
from multiprocessing import Pool
import random


def squared(x):
    return x * x


if __name__ == '__main__':
    numbers = [random.randint(0, 100) for _ in range(1000)]
    with Pool(5) as pool:
        results = pool.map(squared, numbers)
        print(results)
```
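Pool also offers imap(), which yields results lazily instead of building the whole list at once; both map() and imap() accept a chunksize argument that batches the work sent to each worker. A minimal sketch:

```python
from multiprocessing import Pool


def squared(x):
    return x * x


if __name__ == '__main__':
    with Pool(5) as pool:
        # results are yielded lazily, in input order, as workers finish
        for result in pool.imap(squared, range(10), chunksize=2):
            print(result)
```

Larger chunksizes reduce inter-process communication overhead when the per-item work is cheap.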
What if the function you are calling requires multiple parameters? Use starmap() instead of map().
```python
from multiprocessing import Pool
from random import randint


def times(x, y, z):
    return x * y * z


if __name__ == '__main__':
    rand = lambda: (randint(0, 10), randint(0, 10), randint(0, 10))
    triplets = [rand() for _ in range(1000)]
    with Pool(5) as pool:
        results = pool.starmap(times, triplets)
        print(results)
```
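If only one argument varies and the others are fixed, functools.partial lets you keep using map(); a minimal sketch, where times_six is a hypothetical name for the partially applied function:

```python
from functools import partial
from multiprocessing import Pool


def times(x, y, z):
    return x * y * z


if __name__ == '__main__':
    # fix y and z; only x varies across the workers
    times_six = partial(times, y=2, z=3)
    with Pool(5) as pool:
        results = pool.map(times_six, range(5))
    print(results)  # [0, 6, 12, 18, 24]
```

Partial objects are picklable as long as the underlying function is, so they can be shipped to worker processes just like a plain function.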
15.3. Multiprocessing
Here’s another way of using the multiprocessing module to perform parallel processing: get_context() returns a context object whose Process and Queue work like the top-level ones, but let you choose the start method (here, 'spawn') explicitly.
```python
import multiprocessing as mp
import os
import random


def squared(x, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')
    queue.put(result)


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    queue = ctx.Queue()

    processes = [ctx.Process(target=squared, args=(random.randint(0, 100), queue)) for _ in range(10)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    # drain the queue; empty() is more portable than qsize(),
    # which raises NotImplementedError on macOS
    while not queue.empty():
        print(queue.get())
```
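The start methods available depend on the platform ('spawn' exists everywhere; 'fork' and 'forkserver' are POSIX-only, and 'spawn' is the default on Windows and macOS). A minimal sketch to inspect what the current platform supports:

```python
import multiprocessing as mp

if __name__ == '__main__':
    # 'spawn' is available everywhere; 'fork' and 'forkserver' are POSIX-only
    print(mp.get_all_start_methods())
    print(mp.get_start_method())
```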
15.4. Synchronization
We can use a Lock to synchronize between different processes. The acquire() method blocks until the lock is available and then takes ownership of it, and release() gives ownership back so that other processes may proceed. Below, the lock ensures that only one process prints at a time.
```python
from multiprocessing import Process, Lock, Queue
import os


def squared(x, lock, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x
    queue.put(result)

    lock.acquire()
    try:
        print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    queue = Queue(maxsize=-1)

    processes = [Process(target=squared, args=(num, lock, queue)) for num in range(10)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    lock.acquire()
    try:
        # empty() is more portable than qsize(), which raises
        # NotImplementedError on macOS
        while not queue.empty():
            print(queue.get())
    finally:
        lock.release()
```
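Lock also supports the context-manager protocol, so the acquire/try/finally/release pattern above can be written more compactly with a with statement; a minimal sketch, where report() is a hypothetical worker function:

```python
from multiprocessing import Lock, Process
import os


def report(x, lock):
    result = x * x
    # the with statement acquires the lock on entry and releases it
    # on exit, even if the body raises an exception
    with lock:
        print(f'process={os.getpid()} : result={result}')


if __name__ == '__main__':
    lock = Lock()
    processes = [Process(target=report, args=(num, lock)) for num in range(5)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()
```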