Hey guys,

Today I’d like to show you how to use multiple threads in Python to speed up heavy workloads.

For example, we at Immowelt needed multithreading to speed up our static code analysis. The old bash-based process took up to 30 seconds every time the pre-commit Git hook was triggered. With multithreading, the same analysis finishes within 5 seconds.

First, we need to figure out which modules to import. For the simplest kind of multithreading we only need the ThreadPool class from the multiprocessing.pool module. I also import a few other modules for demonstration purposes.

Import Modules; Start execution timer
from multiprocessing.pool import ThreadPool
import random, time, timeit

start_time = timeit.default_timer()

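Then we define the "heavy computing" ( 😉 ) function. It just sleeps for the given number of seconds and returns a status message. That is a fair stand-in for the kind of work threads are good at: in CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads mostly pay off while they are waiting (for the network, the disk, external processes or, as here, time.sleep). For CPU-bound pure-Python code, multiprocessing.Pool is usually the better choice.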

Heavy Computing Function
def asyncProcess(threadNumber, threadRuntime):
    time.sleep(threadRuntime)
    return("%i: this Process ran %i seconds" % (threadNumber, threadRuntime))
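To see the effect, here is a small comparison of calling a sleep-based task one after another versus submitting it to a ThreadPool. It is a sketch assuming Python 3.3+ (for time.perf_counter); wait_and_report, run_sequentially and run_in_pool are hypothetical helpers that mirror asyncProcess with much shorter runtimes so the demo finishes quickly.

```python
import time
from multiprocessing.pool import ThreadPool


def wait_and_report(task_number, runtime):
    # Hypothetical stand-in for asyncProcess: while this thread sleeps,
    # other threads are free to run.
    time.sleep(runtime)
    return "%i: this task ran %.1f seconds" % (task_number, runtime)


def run_sequentially(runtimes):
    # Each call blocks until it returns, so the runtimes add up.
    start = time.perf_counter()
    results = [wait_and_report(i, r) for i, r in enumerate(runtimes)]
    return results, time.perf_counter() - start


def run_in_pool(runtimes, thread_count=5):
    # All tasks are submitted up front; up to thread_count of them sleep at
    # the same time, so with enough threads the total is close to the
    # longest single runtime.
    start = time.perf_counter()
    pool = ThreadPool(processes=thread_count)
    handles = [pool.apply_async(wait_and_report, args=(i, r))
               for i, r in enumerate(runtimes)]
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait until all worker threads have finished
    results = [handle.get() for handle in handles]
    return results, time.perf_counter() - start


if __name__ == '__main__':
    runtimes = [0.2] * 5
    seq_results, seq_time = run_sequentially(runtimes)
    pool_results, pool_time = run_in_pool(runtimes)
    print("sequential: %.2f s, thread pool: %.2f s" % (seq_time, pool_time))
```

On a typical machine the sequential run takes about 1 second and the pool run about 0.2 seconds. Calling run_in_pool with thread_count=1 makes the pool behave like the sequential loop again, which shows what the thread limit controls.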

Everything is ready, so let’s write the main function.

Main Function
if __name__ == '__main__':
    thread_count = 10 # Define the limit of concurrent running threads
    thread_pool = ThreadPool(processes=thread_count) # Create the thread pool that manages the worker threads
    known_threads = {}

    # Submit 10 tasks; each one sleeps for a random 0-10 seconds
    for i in range(0,10):
        known_threads[i] = thread_pool.apply_async(asyncProcess, args=(i,random.randint(0,10),))

    thread_pool.close() # After all threads started we close the pool
    thread_pool.join() # And wait until all threads are done

    # Getting the results of all threads
    for thread in known_threads:
        print(known_threads[thread].get())

    print("Parent: this Process ran %s seconds" % str(timeit.default_timer() - start_time))

The main function manages the thread pool: it starts all threads, waits until every child is done and gathers the results of the child threads.

  • Thread management (lines 2-4 of the main function)
    • The limit of concurrent threads must be set. Be careful if your tasks generate a lot of network traffic or disk I/O (IOPS).
    • The thread pool must be created before any child threads can be started.
    • Unless your tasks are fire-and-forget, you have to keep track of every child so you can collect its result later.
  • Start all threads (lines 7-8)
    • Here we submit 10 tasks. Each one sleeps for a random 0 to 10 seconds (random.randint includes both ends) and then returns a status message.
    • apply_async returns immediately with an AsyncResult object; we store these in a dict so we can fetch each child’s return value once the pool is done.
    • Note on line 8: args= must be a tuple. A trailing comma is only required for a one-element tuple such as args=(i,); with two elements, the final comma is optional.
  • Wait until every child is done (lines 10-11)
    • <pool>.close() tells the pool to stop accepting new tasks; the tasks already submitted keep running.
    • <pool>.join() blocks the parent until all children are done. It may only be called after close() (or terminate()).
  • Gather the results of the child threads (lines 14-15)
    • We iterate over the dict and call <child_thread>.get() on each entry to print the value the child returned. If a child raised an exception, get() re-raises it in the parent.
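A quick way to check the tuple rule and see how errors come back from the pool: args= is unpacked into the function’s positional arguments, so it must be a tuple (or another iterable). Parentheses alone do not make a tuple: (5) is just the integer 5, while (5,) is a one-element tuple. A minimal sketch, assuming Python 3, with a hypothetical one-argument function square and helper submit_both: apply_async accepts the bad argument without complaint, and the TypeError raised in the worker thread only surfaces when get() is called.

```python
from multiprocessing.pool import ThreadPool


def square(number):
    # Hypothetical single-argument task.
    return number * number


def submit_both():
    pool = ThreadPool(processes=2)
    good = pool.apply_async(square, args=(5,))  # (5,) is a one-element tuple
    bad = pool.apply_async(square, args=(5))    # (5) is just the int 5
    pool.close()
    pool.join()
    return good, bad


if __name__ == '__main__':
    good, bad = submit_both()
    print(good.get())  # prints 25
    try:
        bad.get()  # the worker's TypeError is re-raised here, in the caller
    except TypeError as error:
        print("get() raised TypeError:", error)
```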


Putting it all together:

Whole Code
# Importing all needed modules
from multiprocessing.pool import ThreadPool
import random, time, timeit

# Starting timer for Parent measurement
start_time = timeit.default_timer()

# Define the function which will be executed within the ThreadPool
def asyncProcess(threadNumber, threadRuntime):
    time.sleep(threadRuntime)
    return("%i: this Process ran %i seconds" % (threadNumber, threadRuntime))

if __name__ == '__main__':
    thread_count = 10 # Define the limit of concurrent running threads
    thread_pool = ThreadPool(processes=thread_count) # Create the thread pool that manages the worker threads
    known_threads = {}

    # Submit 10 tasks; each one sleeps for a random 0-10 seconds
    for i in range(0,10):
        known_threads[i] = thread_pool.apply_async(asyncProcess, args=(i,random.randint(0,10),))

    thread_pool.close() # After all threads started we close the pool
    thread_pool.join() # And wait until all threads are done

    # Getting the results of all threads
    for thread in known_threads:
        print(known_threads[thread].get())

    print("Parent: this Process ran %s seconds" % str(timeit.default_timer() - start_time))

Example Output

#: python multithreading.py
0: this Process ran 9 seconds
1: this Process ran 3 seconds
2: this Process ran 2 seconds
3: this Process ran 1 seconds
4: this Process ran 5 seconds
5: this Process ran 5 seconds
6: this Process ran 1 seconds
7: this Process ran 10 seconds
8: this Process ran 2 seconds
9: this Process ran 1 seconds
Parent: this Process ran 10.0648648739 seconds
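The introduction mentioned speeding up a pre-commit hook. The sketch below shows how the same ThreadPool pattern could run several analysis tools at once; it is not the Immowelt hook. The commands in CHECKS and the helpers run_check, run_all_checks and hook_exit_code are hypothetical: the placeholder commands just call the current Python interpreter, so replace them with your own linters. A thread waiting for a child process does not hold the GIL, so the checks really overlap. It also uses ThreadPool.map, which blocks until every call is done and returns the results in input order, as a compact replacement for the apply_async loop and the dict. Assumes Python 3.7+ (subprocess.run with capture_output and text).

```python
import subprocess
import sys
from multiprocessing.pool import ThreadPool

# Hypothetical checks: each entry is the command line of one analysis tool.
# These placeholders only call the current Python interpreter so the sketch
# runs anywhere; the third one simulates a failing check.
CHECKS = [
    [sys.executable, "-c", "import time; time.sleep(0.3); print('check A ok')"],
    [sys.executable, "-c", "import time; time.sleep(0.3); print('check B ok')"],
    [sys.executable, "-c", "import sys; print('check C failed'); sys.exit(1)"],
]


def run_check(command):
    # Blocks this worker thread until the child process exits.
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode, completed.stdout.strip()


def run_all_checks(checks, thread_count=4):
    # map keeps the order of checks and returns once all of them are done;
    # leaving the with block afterwards shuts the pool down.
    with ThreadPool(processes=thread_count) as pool:
        return pool.map(run_check, checks)


def hook_exit_code(results):
    # Git aborts the commit when a pre-commit hook exits with a non-zero status.
    return 0 if all(code == 0 for code, _ in results) else 1


if __name__ == '__main__':
    results = run_all_checks(CHECKS)
    for returncode, output in results:
        print(returncode, output)
    # A real hook would finish with sys.exit(hook_exit_code(results)).
    print("hook exit code:", hook_exit_code(results))
```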


Marvyn Zalewski

Marvyn is a nerdy guy who is into Linux and everything connected to it. He also loves automating his home and building up a home lab, which includes e.g. a custom Steam machine and backup automation. He loves listening to EDM music and is trying to become a gin enthusiast.
