Here is a small module I wrote for a program called DISWEN (DIStributed Web Exploration Network), which is not available yet because I still need time to work on it.
Thanks to my good friend Sylvestre Ledru for writing the priority as well as the verbosity parts.
Many thanks to Chris Cowell-Shah for allowing me to use and publish on this website the benchmark function longArithmetic(longMin, longMax), which comes from his personal website.
Source code:
The ThreadPool object is designed to run several tasks at the same time. It manages a queue of tasks and executes a given number of them concurrently. Each time a task (a ThreadPoolTask) ends, its thread is freed. If at least one task is waiting in the queue, the next one is then processed.
The diagram on the right shows a ThreadPool with a capacity of 3 concurrent tasks that is currently managing 5 tasks. 3 tasks are being processed at the same time, while the other 2 wait in the queue to be run. Queued tasks are sorted by priority, so a higher-priority task is processed first.
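The priority ordering described above can be sketched with Python's `heapq` module. This is a minimal illustration written for Python 3, not the module's actual code. The class name `PriorityTaskQueue` and its `push()`/`pop()` methods are hypothetical. A larger number is assumed to mean a higher priority, which is consistent with the `addTask()` examples where priority 1 is the lowest. Tasks with equal priority are assumed to leave the queue in insertion order.

```python
import heapq
import itertools


class PriorityTaskQueue:
    """Hypothetical sketch: a queue that hands out the highest-priority task first."""

    def __init__(self):
        self._heap = []
        # Monotonic counter so tasks with equal priority come out in FIFO order.
        self._counter = itertools.count()

    def push(self, task, priority=10):
        # heapq is a min-heap, so the priority is negated to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def pop(self):
        _, _, task = heapq.heappop(self._heap)
        return task

    def __len__(self):
        return len(self._heap)


if __name__ == "__main__":
    q = PriorityTaskQueue()
    for name, prio in [("task2", 2), ("task3", 7), ("task1", 1), ("task4", 9), ("task5", 6)]:
        q.push(name, prio)
    print([q.pop() for _ in range(len(q))])
    # ['task4', 'task3', 'task5', 'task2', 'task1']
```

The counter in the tuple also keeps `heapq` from ever comparing two task objects directly when their priorities are equal.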
To show the ThreadPool at work, I added to the sample code a benchmark function, longArithmetic(longMin, longMax), which I found on Chris Cowell-Shah's personal website. I won't explain this function here; I only use it as a time-consuming workload.
To add a task to the ThreadPool, you have to subclass ThreadPoolTask and implement its _action() method, as in the following example:
```python
class MyLongTask(ThreadPoolTask):
    def __init__(self, intMax):
        ThreadPoolTask.__init__(self)
        self.intMax = intMax

    def _action(self):
        print "Starting MyLongTask : intMax = ", self.intMax
        self.intArithmetic()
        print "MyLongTask Ended : intMax = ", self.intMax

    # (...)
```
Here I chose to call the self.intArithmetic() method, but what _action() does is up to you :)
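Since the body of intArithmetic() is elided above, here is a self-contained Python 3 sketch of the same subclassing pattern. `ThreadPoolTask` below is a hypothetical stand-in for the module's base class. It assumes the pool ends up calling `_action()` through a `run()` method. `busyLoop()` is a placeholder workload, not the benchmark function. The example runs one task in its own thread without any pool.

```python
import threading


class ThreadPoolTask:
    """Hypothetical stand-in for the module's base class, for illustration only."""

    def run(self):
        # Assumed entry point: the pool would call run(), subclasses only supply _action().
        self._action()

    def _action(self):
        raise NotImplementedError("subclasses must implement _action()")


class MyLongTask(ThreadPoolTask):
    def __init__(self, intMax):
        ThreadPoolTask.__init__(self)
        self.intMax = intMax
        self.result = None

    def _action(self):
        print("Starting MyLongTask : intMax =", self.intMax)
        self.result = self.busyLoop()
        print("MyLongTask Ended : intMax =", self.intMax)

    def busyLoop(self):
        # Hypothetical placeholder workload, NOT the intArithmetic() benchmark:
        # it simply sums integers to keep the thread busy for a while.
        total = 0
        for i in range(int(self.intMax)):
            total += i
        return total


if __name__ == "__main__":
    task = MyLongTask(1000)
    worker = threading.Thread(target=task.run)
    worker.start()
    worker.join()
    print(task.result)  # 499500
```

Putting the threading logic in the base class and the work in `_action()` means a subclass never has to deal with threads itself.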
The size of the pool (i.e. the number of concurrent threads) is set by the parameter given to the constructor:
```python
tp = ThreadPool(2)  # creates a ThreadPool with 2 concurrent tasks
```
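For comparison, the pool size plays the same role as the `max_workers` argument of the standard library's `concurrent.futures.ThreadPoolExecutor`. This Python 3 sketch uses that executor, not the module described here. It runs 5 jobs with a size of 2 and records the peak number of jobs running at the same time. `run_demo()` is a hypothetical helper. Unlike ThreadPool, `ThreadPoolExecutor` has no priority argument: queued jobs are picked up in submission order.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_demo(size, jobs):
    """Run `jobs` short jobs on `size` threads; return (results, peak concurrency)."""
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def job(n):
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.05)  # simulate a time-consuming task
        with lock:
            state["running"] -= 1
        return n * n

    # max_workers caps how many jobs execute at once, like ThreadPool(2).
    with ThreadPoolExecutor(max_workers=size) as pool:
        results = list(pool.map(job, range(jobs)))
    return results, state["peak"]


if __name__ == "__main__":
    results, peak = run_demo(2, 5)
    print(results, "peak concurrency:", peak)
```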
Once the pool is created, tasks are inserted through the method addTask(self, task, priority=10):
```python
mytask1 = MyLongTask(intMax/2)  # create a Task object inheriting from ThreadPoolTask
# (...)
tp.addTask(mytask1, 1)  # adds task 1 with priority 1 (lowest)
tp.addTask(mytask2)     # adds task 2 with no priority (default 10)
```
Note that the priority argument is optional: when it is omitted, the task gets the default priority of 10.
The ThreadPool can be started whether or not its queue already holds tasks. It keeps running until terminate() has been called and all tasks have finished. Either way, processing only begins once start() is invoked:
```python
tp.start()  # starts the processing
```
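The start()/terminate() behaviour described above can be sketched as follows. `MiniPool` and its `join()` method are hypothetical; this is Python 3, not the module's source. The sketch assumes that terminate() only raises a flag. Each worker keeps taking the highest-priority queued task until that flag is set and the queue is empty, so tasks added after start() still run.

```python
import heapq
import itertools
import threading


class MiniPool:
    """Hypothetical sketch of start()/terminate() semantics; not the module's code."""

    def __init__(self, size):
        self.size = size
        self._heap = []
        self._counter = itertools.count()  # FIFO tie-break for equal priorities
        self._cond = threading.Condition()
        self._terminating = False
        self._workers = []

    def addTask(self, func, priority=10):
        with self._cond:
            heapq.heappush(self._heap, (-priority, next(self._counter), func))
            self._cond.notify()  # wake one idle worker

    def start(self):
        for _ in range(self.size):
            worker = threading.Thread(target=self._worker)
            worker.start()
            self._workers.append(worker)

    def terminate(self):
        # Only a request: workers exit once the queue has been drained.
        with self._cond:
            self._terminating = True
            self._cond.notify_all()

    def join(self):
        for worker in self._workers:
            worker.join()

    def _worker(self):
        while True:
            with self._cond:
                while not self._heap and not self._terminating:
                    self._cond.wait()
                if not self._heap:  # terminating and nothing left to run
                    return
                _, _, func = heapq.heappop(self._heap)
            func()  # run outside the lock so other workers can take tasks


if __name__ == "__main__":
    done = []
    lock = threading.Lock()

    def make_task(name):
        def task():
            with lock:
                done.append(name)
        return task

    pool = MiniPool(2)
    pool.start()  # the queue may be empty at this point
    for i, prio in enumerate([2, 7, 1, 9, 6]):
        pool.addTask(make_task("task%d" % (i + 1)), prio)
    pool.terminate()  # queued tasks still run before the workers exit
    pool.join()
    print(sorted(done))  # ['task1', 'task2', 'task3', 'task4', 'task5']
```

Calling `join()` is only needed here to collect the results deterministically. Since the worker threads are not daemons, the process stays alive until they finish anyway.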
The main program of the sample code puts all of this together:

```python
import time

if __name__ == "__main__":
    intMax = 500000

    # create Task objects inheriting from ThreadPoolTask
    mytask1 = MyLongTask(intMax/2)
    mytask2 = MyLongTask(intMax)
    mytask3 = MyLongTask(intMax*1.5)
    mytask4 = MyLongTask(intMax/2+43)
    mytask5 = MyLongTask(intMax-1)

    # creates a ThreadPool with 2 concurrent tasks
    tp = ThreadPool(2)
    tp.addTask(mytask2, 2)  # adds task 2 with priority 2
    tp.addTask(mytask3, 7)
    tp.addTask(mytask1, 1)
    tp.addTask(mytask4, 9)
    tp.addTask(mytask5, 6)
    tp.start()  # starts the processing

    time.sleep(2)  # 2 seconds pause

    # tasks can still be added while the pool is running
    mytask6 = MyLongTask(300000)
    mytask7 = MyLongTask(intMax-1)
    tp.addTask(mytask6, 4)
    tp.addTask(mytask7, 20)

    # signals the ThreadPool to terminate when each task has been processed
    tp.terminate()
    print "Main program ended"
```

| Platform | OS | Python Version |
|---|---|---|
| i386 | Debian Sarge | 2.3 |
| PowerBook G4 | Mac OS X Panther | 2.2 |