15. Threading

15.1. Process

The Process object creates a new process to invoke the method associated with it. The two important arguments to Process are target, which is typically a function, and args, a tuple of the arguments we want to pass to that function. The start() method of a Process object commences the computation, and join() waits until the processing is finished. Below, we generate a list of 1,000 random numbers in the range [0, 100], and we want to square each one. Note that the function squared() does not return anything; it simply computes the square of the integer passed to it. Also, inside squared(), we retrieve the parent and process IDs associated with running the method to show that these are indeed different processes being created.

Note

If you look at the code below, you will see that we use the main guard, if __name__ == '__main__':. We perform multiprocessing inside the main guard to prevent problems associated with the way processes are created (e.g., endless loops of process creation). The Python documentation recommends that multiprocessing code be placed inside the main guard.

from multiprocessing import Process
import random
import os


def squared(x):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')


if __name__ == '__main__':
    # generate 1,000 random numbers in the range [0, 100]
    numbers = [random.randint(0, 100) for _ in range(1000)]

    # create a list of Process
    processes = [Process(target=squared, args=(num,)) for num in numbers]

    # start each Process
    for process in processes:
        process.start()

    # join each Process
    for process in processes:
        process.join()

So, squared() did not return anything, but what if we want the results? How do we get any output from squared()? The idea here is to use a Queue. The pattern of multiprocessing with Process now becomes: create, start, get, and join.

from multiprocessing import Process, Queue
import random
import os


def squared(x, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')

    queue.put(result)


if __name__ == '__main__':
    # generate 1,000 random numbers in the range [0, 100]
    numbers = [random.randint(0, 100) for _ in range(1000)]

    # set up a queue to store results
    queue = Queue()

    # create a list of Process
    processes = [Process(target=squared, args=(num, queue)) for num in numbers]

    # start each Process
    for process in processes:
        process.start()

    # get items from the queue to prevent the queue from filling up
    # if the queue fills up, it will block
    results = []
    for process in processes:
        result = queue.get()
        results.append(result)

    # join each Process
    for process in processes:
        process.join()

    # print results
    print(results)

15.1.1. Exercise

Write a multiprocessing program to simulate the rolling of dice. There should be a function that simulates rolling two dice and stores the result (the sum of the two numbers). At the very end, collect the results from the simulation and count how many times each value came up.

Solution.

from multiprocessing import Process, Queue
from random import randint
from itertools import groupby


def roll(queue):
    die1 = randint(1, 6)
    die2 = randint(1, 6)
    total = die1 + die2

    queue.put(total)


if __name__ == '__main__':
    # set up a queue to store results
    queue = Queue()

    # create a list of Process
    processes = [Process(target=roll, args=(queue,)) for _ in range(100)]

    # start each Process
    for process in processes:
        process.start()

    # get items from the queue to prevent the queue from filling up
    # if the queue fills up, it will block
    results = []
    for process in processes:
        result = queue.get()
        results.append(result)

    # join each Process
    for process in processes:
        process.join()

    # count the occurrences of each total and print
    get_key = lambda x: x
    counts = {k: len(list(v)) for k, v in groupby(sorted(results, key=get_key), key=get_key)}
    print(counts)
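The sort-and-group counting step above can also be written with collections.Counter, which does both in one call. A quick sketch, using a small hypothetical results list in place of the queue output:

```python
from collections import Counter

# hypothetical results list standing in for the simulation output
results = [7, 7, 2, 12, 7, 2]

# Counter tallies occurrences of each value directly
counts = Counter(results)
print(dict(counts))  # {7: 3, 2: 2, 12: 1}
```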

15.2. Pool

Pool enables you to use a much simpler idiom for multiprocessing. If the function you are invoking in the parallelized operation returns something, you do not need an extra structure like a Queue to be passed in to store the results.

from multiprocessing import Pool
import random


def squared(x):
    return x * x


if __name__ == '__main__':
    numbers = [random.randint(0, 100) for _ in range(1000)]
    with Pool(5) as pool:
        results = pool.map(squared, numbers)
        print(results)

What if the function you are calling requires multiple parameters? Use starmap() instead of map().

from multiprocessing import Pool
from random import randint


def times(x, y, z):
    return x * y * z


if __name__ == '__main__':
    rand = lambda: (randint(0, 10), randint(0, 10), randint(0, 10))
    triplets = [rand() for _ in range(1000)]
    with Pool(5) as pool:
        results = pool.starmap(times, triplets)
        print(results)

15.3. Multiprocessing

Here’s another way of using the multiprocessing module to perform parallel processing.

import multiprocessing as mp
import os
import random


def squared(x, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x

    print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')
    queue.put(result)


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    queue = ctx.Queue()

    processes = [ctx.Process(target=squared, args=(random.randint(0, 100), queue)) for _ in range(10)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    # qsize() is not implemented on some platforms (e.g. macOS),
    # so use empty() to check whether results remain
    while not queue.empty():
        print(queue.get())

15.4. Synchronization

We can use a Lock to synchronize between different processes. A process calls acquire() to take exclusive hold of the lock, blocking until it is available, and release() to give it back so that other processes can continue. Below, the lock ensures that only one process prints at a time.

from multiprocessing import Process, Lock, Queue
import os


def squared(x, lock, queue):
    module_name = __name__
    parent_id = os.getppid()
    process_id = os.getpid()

    result = x * x
    queue.put(result)

    lock.acquire()
    try:
        print(f'module={module_name} : parent={parent_id} : process={process_id} : result={result}')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = Lock()
    queue = Queue(maxsize=-1)

    processes = [Process(target=squared, args=(num, lock, queue)) for num in range(10)]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    lock.acquire()
    try:
        # qsize() is not implemented on some platforms (e.g. macOS),
        # so use empty() to check whether results remain
        while not queue.empty():
            print(queue.get())
    finally:
        lock.release()
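The acquire()/try/finally/release() pattern above is common enough that Lock supports it directly as a context manager: with lock: acquires on entry and releases on exit, even if the body raises. A minimal single-process sketch of the behavior:

```python
from multiprocessing import Lock

lock = Lock()

# "with lock:" acquires on entry and releases on exit -- the same
# effect as the acquire()/try/finally/release() pattern above
with lock:
    # while held, a non-blocking acquire fails (Lock is not recursive)
    held = lock.acquire(block=False)
print(held)   # False

# after the with-block exits, the lock is free again
free = lock.acquire(block=False)
lock.release()
print(free)   # True
```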