Julien Herbin
Personal page

The Python ThreadPool

Abstract

Here is a small module I wrote for a program called DISWEN (DIStributed Web Exploration Network), which is not available yet because I need more time to work on it.

Thanks to my good friend Sylvestre Ledru for writing the priority as well as the verbosity parts.
Many thanks to Chris Cowell-Shah for allowing me to use and publish on this website the benchmark function longArithmetic(longMin, longMax) found on his personal website.


Python source code:
The source code of the program is released under the GPL licence. If you don't know this licence, you should read the following page: http://www.gnu.org/licenses/gpl.html.
In a nutshell, you can use this code freely, but if you add features, I would be grateful if you let me know.

So, what is it all about?

The ThreadPool object is designed to run several tasks at the same time. It manages a queue of tasks and executes a given number of them concurrently. Each time a task (a ThreadPoolTask) ends, i.e. a thread is freed, a new task is taken from the queue and processed, provided the queue is not empty.

The diagram on the right shows a ThreadPool with a capacity of 3 concurrent tasks that is currently managing 5 tasks. Three tasks are being processed at the same time, while 2 are stored in the queue, waiting to be run. The tasks in the queue are sorted by priority, so that a higher priority task is processed first.
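The priority ordering of the waiting tasks can be illustrated with a short sketch. This is not the module's code: PriorityTaskQueue and drain_order are hypothetical names, the sketch is written for Python 3, and it assumes that a larger number means a higher priority, as suggested by the sample comments that describe priority 1 as the lowest.

```python
import heapq
import itertools


class PriorityTaskQueue:
    """Hypothetical queue that releases the highest-priority task first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker: insertion order

    def push(self, task, priority=10):
        # heapq is a min-heap, so the priority is negated to make the
        # largest priority value come out first (an assumption, see above).
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def pop(self):
        _, _, task = heapq.heappop(self._heap)
        return task

    def __len__(self):
        return len(self._heap)


def drain_order(entries):
    """Return the tasks in the order the queue would release them."""
    queue = PriorityTaskQueue()
    for task, priority in entries:
        queue.push(task, priority)
    return [queue.pop() for _ in range(len(queue))]
```

Tasks pushed with equal priorities come out in insertion order, thanks to the counter used as a tie-breaker.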

Usage

Long calculation function

To show the ThreadPool at work, I added to the sample code a benchmark function, longArithmetic(longMin, longMax), which I found on Chris Cowell-Shah's personal website. My purpose is not to explain this function here, but to use it as a time-consuming function.

Write your own ThreadPoolTask

To add a task to the ThreadPool, you have to inherit from the ThreadPoolTask class and implement the _action() method, as I did in the following example:

class MyLongTask(ThreadPoolTask):
	def __init__(self, intMax):
		ThreadPoolTask.__init__(self)
		self.intMax = intMax

	def _action(self):
		print "Starting MyLongTask : intMax = ", self.intMax
		self.intArithmetic()
		print "MyLongTask Ended : intMax = ", self.intMax
(...)

Here I decided to invoke the self.intArithmetic() method, but what you do here is your business :)
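For readers who want to see the subclassing pattern in isolation, here is a minimal sketch under stated assumptions. SketchTask is a hypothetical stand-in for ThreadPoolTask (the real class may work differently), its run() method and done attribute are invented for the illustration, and the code targets Python 3.

```python
class SketchTask:
    """Hypothetical stand-in for ThreadPoolTask, not the module's class."""

    def __init__(self):
        self.done = False

    def run(self):
        # Template method: a pool would call run(), which delegates the
        # actual work to the hook implemented by the subclass.
        self._action()
        self.done = True

    def _action(self):
        raise NotImplementedError("subclasses must implement _action()")


class SumTask(SketchTask):
    """Example task: sums the integers from 0 up to int_max - 1."""

    def __init__(self, int_max):
        SketchTask.__init__(self)
        self.int_max = int_max
        self.result = None

    def _action(self):
        total = 0
        for i in range(self.int_max):
            total += i
        self.result = total
```

The base class owns the bookkeeping, so a subclass only has to describe its work in _action().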

Configure the Pool

The size of the pool (i.e. the number of concurrent threads) is set with a parameter passed to the constructor:

tp = ThreadPool(2) # creates a ThreadPool with 2 concurrent tasks

Add your task to the Pool

Once created, a task may be inserted into the pool through the method addTask(self, task, priority = 10).

mytask1 = MyLongTask(intMax/2) # create a Task object inheriting from ThreadPoolTask
(...)
tp.addTask(mytask1, 1) # adds task 1 with priority 1 (lowest)
tp.addTask(mytask2) # adds task 2 with no priority (default 10)

Note that it is not necessary to give the task a priority, because the default priority is 10.

Run the ThreadPool

The ThreadPool may be started with or without tasks in the queue. It won't stop until the terminate() method has been called and all tasks have ended. In any case, to start the computation you need to invoke the start() method.

tp.start() # starts the processing
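The lifecycle described above (start the workers, keep accepting tasks, stop only once terminate has been signalled and the queue is empty) can be sketched as follows. This is an illustration in Python 3, not the module's implementation: SketchPool, add_task and join are hypothetical names, tasks are assumed to be any objects exposing a run() method, and a larger number is assumed to mean a higher priority.

```python
import heapq
import itertools
import threading


class SketchPool:
    """Hypothetical pool; tasks are any objects exposing a run() method."""

    def __init__(self, size):
        self._size = size                  # number of worker threads
        self._heap = []                    # pending (-priority, seq, task)
        self._seq = itertools.count()      # tie-breaker: insertion order
        self._cond = threading.Condition()
        self._terminating = False
        self._workers = []

    def add_task(self, task, priority=10):
        with self._cond:
            # Negate the priority: heapq pops the smallest entry first.
            heapq.heappush(self._heap, (-priority, next(self._seq), task))
            self._cond.notify()

    def start(self):
        for _ in range(self._size):
            worker = threading.Thread(target=self._work)
            worker.start()
            self._workers.append(worker)

    def terminate(self):
        # Only signals; workers exit once the queue is also empty.
        with self._cond:
            self._terminating = True
            self._cond.notify_all()

    def join(self):
        # Hypothetical helper: block until every worker has exited.
        for worker in self._workers:
            worker.join()

    def _work(self):
        while True:
            with self._cond:
                while not self._heap and not self._terminating:
                    self._cond.wait()
                if not self._heap:
                    return                 # terminating and nothing left
                _, _, task = heapq.heappop(self._heap)
            task.run()                     # run the task outside the lock
```

In this sketch terminate() returns immediately and the workers finish the remaining tasks on their own; join() is an extra convenience for callers that want to wait for them.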

Sample usage code

The sample code below creates 5 tasks with different priorities and adds them to the pool. The ThreadPool is then started and begins processing. The main program waits 2 seconds, adds 2 more tasks, and then signals the ThreadPool to terminate once there are no more tasks to perform:
if __name__ == "__main__":
	intMax = 500000

	# create Task objects inheriting from ThreadPoolTask
	mytask1 = MyLongTask(intMax/2)
	mytask2 = MyLongTask(intMax)
	mytask3 = MyLongTask(intMax*1.5)
	mytask4 = MyLongTask(intMax/2+43)
	mytask5 = MyLongTask(intMax-1)

	# creates a ThreadPool with 2 concurrent tasks
	tp = ThreadPool(2)
	tp.addTask(mytask2, 2) # adds task 2 with priority 2
	tp.addTask(mytask3, 7)
	tp.addTask(mytask1, 1)
	tp.addTask(mytask4, 9)
	tp.addTask(mytask5, 6)
	tp.start() # starts the processing
	time.sleep(2) # 2 seconds pause
	mytask6 = MyLongTask(300000)
	mytask7 = MyLongTask(intMax-1)
	tp.addTask(mytask6, 4)
	tp.addTask(mytask7, 20)

	# signals the ThreadPool to terminate when each task has been processed
	tp.terminate()
	print "Main program ended"

Deeper into the ThreadPool

UML class diagram


Tested platforms

As Python is a portable scripting language, the script should work on any computer running a Python virtual machine. It doesn't require any other module.
Platform        OS                  Python version
i386            Debian Sarge        2.3
PowerBook G4    Mac OS X Panther    2.2