Introduction to multitasking with Python #002 multiprocessing (2020 tutorial)

 

1. What are processes?

When you write a program and save it on your computer, it is just a file that cannot use resources such as the network, camera, keyboard, etc. But when you launch the program, a process is created, and that process can use computer resources such as memory, the camera, and the keyboard.

When you start your operating system, a lot of processes are created by the OS automatically. On Windows you can open the Task Manager to see all the running processes.

On Windows, under the Processes tab, you can see all the processes, and you can also kill a running process if you have permission to do so.

On Ubuntu you can use the command ps aux to list all the processes.
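As a quick aside, a running Python program can print its own process ID with os.getpid(), and you can then look that ID up in Task Manager or in the ps output (this tiny snippet is only for illustration):

# show_pid.py -- print this process's ID, then wait so you can look it up
import os, time

print('my process id:', os.getpid())
time.sleep(30)  # keep the process alive long enough to find it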

2. Introduction to multiprocessing module

In the last tutorial, we covered multithreading. Multithreading is one way to achieve multitasking in a program, and multiprocessing is another way to achieve the same end. Its usage is very similar to that of the threading module.

Let's see an example:

# multiprocessing_example01.py
import multiprocessing, time

def process_1():
    for i in range(5):
        print('----Process1----')
        time.sleep(1)

def process_2():
    for i in range(5):
        print('----Process2----')
        time.sleep(1)

def main():
    # create two Process objects
    p1 = multiprocessing.Process(target=process_1)
    p2 = multiprocessing.Process(target=process_2)

    # start the processes
    p1.start()
    p2.start()

    print('----main process----')

if __name__ == '__main__':
    main()

import multiprocessing, time

First, we import the multiprocessing and time modules.

# create two Process objects
p1 = multiprocessing.Process(target=process_1)
p2 = multiprocessing.Process(target=process_2)

Next, we create two Process objects, passing target arguments just as we would with the threading.Thread() constructor.

p1.start()
p2.start()

Then we start the two processes and run the program.
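The output looks roughly like this (the exact interleaving of lines varies from run to run, since the main process and the two child processes execute concurrently):

----main process----
----Process1----
----Process2----
----Process1----
----Process2----
----Process1----
----Process2----
----Process1----
----Process2----
----Process1----
----Process2----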


It does the same thing that threads do. So what is the difference between multithreading and multiprocessing?

The main difference between the two is how they use resources. Let's look at a few pictures.

The red arrow represents the main process; it goes from statement 1 to statement 7. When it reaches statement 5, the main process starts a new process.

When a new process is started, the resources used by the main process are copied into the new process. The green arrow represents the new process, which gets a copy of all the resources of the main process.

Likewise, when process 2 is created, another copy of the main process's resources is attached to process 2.

Multiprocessing therefore uses a lot of resources, while multiple threads share the resources of their one process.

By the way, a single process can contain multiple threads. For instance, if you have launched two music players, they represent two processes, and in each process you could launch two child threads: one for music playback and another for downloading.
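As a rough sketch of that idea (the function names here are invented for illustration), a single process can run two threads side by side using the threading module from the previous tutorial:

# one process, two threads: playback and downloading run concurrently
import threading, time

def play_music():
    for i in range(3):
        print('playing music...')
        time.sleep(1)

def download_song():
    for i in range(3):
        print('downloading song...')
        time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=play_music)
    t2 = threading.Thread(target=download_song)
    t1.start()
    t2.start()

Both threads live inside one process and share that process's resources, which is exactly the contrast with the multiprocessing examples above.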

3. Communication between two processes

As we know, each process has its own copy of resources, and these resources are not shared among processes. So how can processes communicate, so that while one process is downloading data, another process can work on the same data?

In the multiprocessing module, we have a Queue object that can help us do that.

3.1 What is a Queue?

A queue is just like the line you stand in when you want to get a hamburger at lunch time. The first person gets a burger first, then the second, then the third... first in, first out. (Another data structure, the stack, works the other way: last in, first out.)

Let's see a simple example:

>>> from multiprocessing import Queue
>>> q = Queue(3)
>>> q.put(11)
>>> q.put(12)
>>> q.put(13)
>>> q.get()
11
>>> q.get()
12
>>> q.get()
13
>>> q.get() # program will block here: the queue is empty

We create a Queue object that can hold only 3 elements. Then we use the put() method to put elements in the queue and the get() method to get them back out, following 'first in, first out' order. Notice that the fourth get() blocks, because the queue is already empty.

Instead of using put() and get(), we can use put_nowait() and get_nowait(). In the example above, the queue can hold only 3 elements; if we put a fourth element in the queue, put() blocks until there is enough space in the queue to hold it. Likewise, if there are no more elements to get from the queue, get() blocks until there is an element available.

>>> q = Queue(3)
>>> q.put(11)
>>> q.put(22)
>>> q.put(33)
>>> q.put(44) # program will block here

>>> q.get()
>>> q.get()
>>> q.get()
>>> q.get() # program will block here

But if you use put_nowait() and get_nowait(), they raise exceptions to indicate that the queue is full or empty.

>>> q = Queue(3)
>>> q.put_nowait(11)
>>> q.put_nowait(22)
>>> q.put_nowait(33)
>>> q.put_nowait(44)
Traceback (most recent call last):
  File "<pyshell#5>", line 1, in <module>
    q.put_nowait(44)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\queues.py", line 132, in put_nowait
    return self.put(obj, False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\queues.py", line 84, in put
    raise Full
queue.Full


>>> from multiprocessing import Queue
>>> q = Queue(3)
>>> q.put_nowait(11)
>>> q.put_nowait(22)
>>> q.put_nowait(33)
>>> q.get_nowait()
11
>>> q.get_nowait()
22
>>> q.get_nowait()
33
>>> q.get_nowait()
Traceback (most recent call last):
  File "<pyshell#8>", line 1, in <module>
    q.get_nowait()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\queues.py", line 129, in get_nowait
    return self.get(False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38-32\lib\multiprocessing\queues.py", line 110, in get
    raise Empty
_queue.Empty

We can also use q.empty() and q.full() to check whether the queue is empty or full.
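For example (keep in mind that in a program with several processes these checks are only advisory, because another process may put or get an element between your check and your next operation):

>>> from multiprocessing import Queue
>>> q = Queue(3)
>>> q.empty()
True
>>> q.put(11)
>>> q.put(22)
>>> q.put(33)
>>> q.full()
True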

3.2 Use Queue to communicate among processes

Let's see another simple example:

# multiprocessing_example02.py
import multiprocessing

def download_data(q):
    data = [11, 22, 33, 44, None]  # None marks the end of the data
    for e in data:
        print('downloading: ', e)
        q.put(e)

    print('Data downloaded!')

def process_data(q):
    processed_data = list()
    # keep getting data from the queue and processing it
    while True:
        data = q.get()
        if data is None:
            break
        print('processing data: ', data)
        processed_data.append(data)
    print('Data processing done!')
    print(processed_data)

def main():
    # create a Queue object
    q = multiprocessing.Queue()

    # create two Process objects
    p1 = multiprocessing.Process(target=download_data, args=(q,))
    p2 = multiprocessing.Process(target=process_data, args=(q,))

    # start the processes
    p1.start()
    p2.start()

if __name__ == '__main__':
    main()

We create two processes: one downloads data (here a list stands in for the downloaded data) and the other processes the data.

data = [11, 22, 33, 44, None]

We put a None at the end of the list to mark the end of the downloaded data.
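After running this program, the output looks roughly like this (the interleaving between the two processes can vary a little from run to run):

downloading:  11
downloading:  22
downloading:  33
downloading:  44
downloading:  None
Data downloaded!
processing data:  11
processing data:  22
processing data:  33
processing data:  44
Data processing done!
[11, 22, 33, 44]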


Now we have achieved our goal of communicating among processes.

Besides using a Queue to achieve this end, we could also use sockets, which allow communication between processes on the same device as well as across the network.

4. Process Pool

Next, we are going to talk about the process pool. It is an important concept, and it is easy to use.

Imagine we must write a program that copies every file in a folder to another place. We don't know how many files are inside the folder, maybe 100 or 10000. The traditional way is to copy the files one by one without any multitasking technique, which is time consuming. So we can use the multiprocessing techniques learned in this tutorial.

Here comes another issue: for 100 files, we could create 100 processes to copy the 100 files at the same time. But what if there are 10000 files? In reality we cannot create 10000 processes on a modern system. As you may already know, processes use a lot of resources, and creating and killing processes is itself a burden on the system. This is where the pool comes in: it helps us balance the number of processes created against efficiency.



Say we build a pool that can contain only 10 processes, but there are 100 tasks to be processed. The pool processes only 10 tasks at a time; as soon as a process in the pool becomes available, it moves on to the 11th task, and so on until all 100 tasks are done.

Task 1 is done, so process 1 is ready to take on task 11.


Task 2 is done, so process 2 is ready to take on task 12.

Let's see a python example:

# multiprocessing_example03.py
from multiprocessing import Pool, freeze_support
import time, random

def copy_file_demo(n):
    print(f'file{n} is copying...')
    time.sleep(random.random() * 2)  # simulate the time a real copy takes
    print(f'file{n} copied')

def main():
    # create a Pool object that manages at most 5 worker processes
    pool = Pool(5)

    for i in range(100):
        pool.apply_async(copy_file_demo, (i,))

    # close the pool
    # after the pool is closed, it no longer accepts new tasks
    pool.close()
    # wait for the worker processes to finish
    # without join(), the main process would exit right away and
    # all the pool's worker processes would be terminated with it
    pool.join()

if __name__ == '__main__':
    freeze_support()  # only needed when the program is frozen into an executable
    main()

pool = Pool(5)

We create a Pool object that manages at most 5 worker processes. Then we need to copy 100 files with only these 5 processes in the pool.

pool.apply_async(copy_file_demo, (i,))

We use apply_async() to add tasks to the pool; the rest of the job is left to the Pool, which allocates processes to tasks.

pool.close()
pool.join()

When we close the pool, it no longer accepts new tasks. The join() method must be called after close(); it blocks the main process until all the worker processes finish.
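The output will look something like this (the exact order depends on the random sleep times, but at most 5 files are being copied at any given moment):

file0 is copying...
file1 is copying...
file2 is copying...
file3 is copying...
file4 is copying...
file2 copied
file5 is copying...
file0 copied
file6 is copying...
...
file99 copied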


5. A file copier with multiprocessing

# file_copier_multiprocessing.py
import os
from multiprocessing import Pool

# copy file function
def copy_file(file_name, old_dir, new_dir):
    print(f'copying {os.path.join(old_dir, file_name)} to {os.path.join(new_dir, file_name)}')
    # open the file to read its content
    with open(os.path.join(old_dir, file_name), 'rb') as f:
        content = f.read()

    # write the content to the new file
    with open(os.path.join(new_dir, file_name), 'wb') as ff:
        ff.write(content)

    print(f'{os.path.join(new_dir, file_name)} finished copying')

def main():
    # create a Pool object
    pool = Pool(5)

    # ask the user for the directory name
    dir_name = input('Enter directory name: ')

    # create a new directory
    new_dir = 'new_' + dir_name
    try:
        os.mkdir(new_dir)
    except FileExistsError:
        pass

    # get all the file names from the directory
    file_names = os.listdir(dir_name)

    # copy all files using the pool
    for file_name in file_names:
        pool.apply_async(copy_file, (file_name, dir_name, new_dir))

    # close the pool
    pool.close()
    # wait for all copying tasks to finish
    pool.join()

if __name__ == "__main__":
    main()

pool = Pool(5)

We create a Pool object that can run at most 5 worker processes in the pool.

# ask the user for the directory name
dir_name = input('Enter directory name: ')

# create a new directory
new_dir = 'new_' + dir_name

The user enters the directory name, and we derive the new directory's name from it by prefixing 'new_'.

try:
    os.mkdir(new_dir)
except FileExistsError:
    pass

Then we use os.mkdir() to make the new directory. If the directory already exists, os.mkdir() raises FileExistsError, and we simply pass.

file_names = os.listdir(dir_name)

Next, os.listdir() returns all the file names in the directory, and we iterate through them.

for file_name in file_names:
    pool.apply_async(copy_file, (file_name, dir_name, new_dir))

We add each copying task to the pool and let the pool handle the processing.

# close the pool
pool.close()
# wait for all copying tasks to finish
pool.join()

As always, we close the pool so that it no longer accepts new tasks, and we invoke join() so that the main process waits for the worker processes to finish.

def copy_file(file_name, old_dir, new_dir):
    print(f'copying {os.path.join(old_dir, file_name)} to {os.path.join(new_dir, file_name)}')
    # open the file to read its content
    with open(os.path.join(old_dir, file_name), 'rb') as f:
        content = f.read()

    # write the content to the new file
    with open(os.path.join(new_dir, file_name), 'wb') as ff:
        ff.write(content)

    print(f'{os.path.join(new_dir, file_name)} finished copying')

The actual copying is done by the copy_file() function. It just opens the source file, reads its content, and writes that content into a new file in the new directory. (In real code, shutil.copy() would do the same job in one call, but doing it by hand shows the mechanics.)

Now just run the application to test it. For simplicity, I created a folder named 'test' in the same directory as the program and copied some .py files from the Python lib directory into it. For each file, the program prints a 'copying ... to ...' line when a worker picks the file up and a '... finished copying' line when it is done.