Java FixedThreadPool

January 20th, 2010 | by bryan |

When writing the code that scans content for potential thumbnail images, I needed to work with the FixedThreadPool to get the level of performance that I wanted. A large amount of code within ReadPath extends a class called Scanner. This class creates a Thread and makes it simple to do chunks of work with checkpoints and wait times. For each block of time a process method is called. If the work takes less than the timeout to complete, the thread waits until the timeout has expired before process is called again; otherwise it starts on the next batch immediately. This allows you to rate limit the work being done so that it doesn’t swamp other systems.
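The pacing described above can be sketched roughly as follows. This is only an illustration of the idea, not ReadPath's actual Scanner: the class name PacedLoop, the method names, and the maxBatches parameter are all made up for the example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a rate-limited work loop; not the real Scanner class.
final class PacedLoop {
    /** How long to wait after a batch that took elapsedMillis, given the interval. */
    static long remainingWait(long intervalMillis, long elapsedMillis) {
        return Math.max(0L, intervalMillis - elapsedMillis);
    }

    /** Runs batch at most once per interval; returns how many batches completed. */
    static int run(Runnable batch, long intervalMillis, int maxBatches) {
        int completed = 0;
        while (completed < maxBatches && !Thread.currentThread().isInterrupted()) {
            long start = System.nanoTime();
            batch.run();
            completed++;
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            long wait = remainingWait(intervalMillis, elapsed);
            try {
                // A fast batch sleeps out the rest of the interval;
                // a slow batch (wait == 0) moves straight on to the next one.
                if (wait > 0) Thread.sleep(wait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the flag set for the caller
                break;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        int done = run(() -> System.out.println("processing a batch"), 200L, 3);
        System.out.println("batches completed: " + done);
    }
}
```

A batch that overruns the interval gets no extra delay, which matches the "otherwise it starts on the next batch immediately" behavior.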

The checkpoint allows the thread to be stopped and then know where to start up again. Since code is being pushed out on a regular basis, the work needs to be restartable. The other requirement is that the work be idempotent, so that the checkpoints don’t need to be overly fine-grained.

The initial version of this code worked great: it scanned content items and saved the metadata for thumbnail images. One issue, though, is that if the HTML didn’t include the height and width of the image, which I need, then I had to go fetch the image to determine its size. Fetching external images introduces a significant wait, which slowed down the overall throughput. The answer, of course, was to make this system multithreaded. A great way to do this, especially because I still wanted control over the throughput, was to use the FixedThreadPool from the java.util.concurrent package.
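For reference, here is one way the size lookup could be done once the image bytes are in hand. It is a sketch under assumptions rather than the code ReadPath uses: ImageSizeProbe and readSize are hypothetical names, and it relies on the standard javax.imageio readers, which can usually report dimensions from the image header without decoding every pixel.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.imageio.ImageIO;
import javax.imageio.ImageReader;
import javax.imageio.stream.ImageInputStream;

// Hypothetical helper; the name and return shape are assumptions for this example.
final class ImageSizeProbe {
    /** Returns {width, height}, or null if no installed reader recognizes the data. */
    static int[] readSize(InputStream in) throws IOException {
        try (ImageInputStream iis = ImageIO.createImageInputStream(in)) {
            if (iis == null) return null;
            Iterator<ImageReader> readers = ImageIO.getImageReaders(iis);
            if (!readers.hasNext()) return null;
            ImageReader reader = readers.next();
            try {
                // seekForwardOnly = true, ignoreMetadata = true: only dimensions are needed
                reader.setInput(iis, true, true);
                return new int[] { reader.getWidth(0), reader.getHeight(0) };
            } finally {
                reader.dispose();
            }
        }
    }
}
```

The stream would typically come from an HTTP fetch of the image URL, and that fetch is the slow part that motivates the thread pool.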

My initial pass used some code like:

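// needs java.util.concurrent.ExecutorService and java.util.concurrent.Executors
int threadCount = 10;
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
for (Content content : contentList) {
  ImageRunnable ir = new ImageRunnable();
  ir.setContent(content);
  ir.setParent(this);
  // execute() only hands the task to the pool; it does not wait for a free thread
  pool.execute(ir);
}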
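A small experiment shows how far a submitting loop like this one can run ahead of the workers. The sketch below is not part of ReadPath: UnboundedQueueDemo is a made-up name, the tasks simply block on a latch to stand in for slow image fetches, and the pool is built directly with the same settings newFixedThreadPool uses (fixed size, unbounded LinkedBlockingQueue) so that the queue can be inspected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the blocked tasks stand in for slow image fetches.
final class UnboundedQueueDemo {
    /** Submits `tasks` blocked jobs to `threads` workers; returns the queue size right after the loop. */
    static int backlogAfterSubmit(int threads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        gate.await(); // pretend the work is slow
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every worker is still blocked, so everything else is waiting in the queue.
            return pool.getQueue().size();
        } finally {
            gate.countDown(); // let the workers drain the backlog
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + backlogAfterSubmit(10, 10000));
    }
}
```

With 10 threads and 10,000 tasks, the first 10 tasks go straight to the workers and the other 9,990 sit in the queue, even though the loop itself has already finished.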

ImageRunnable had the process method. Say contentList is a List<Content> with 10,000 content items. This loop would complete almost immediately, with 10,000 ImageRunnables created and handed to the pool, nearly all of them sitting in its work queue, which is unbounded for a pool created by newFixedThreadPool. This would actually work, and the 10 threads would process the work to be done. The problem is that the master Scanner thread has lost track of when the submitted work has been completed. It would be very easy for the Scanner to get ahead of itself and keep adding items to the queue until errors are thrown for lack of memory, so all of the benefits of using the Scanner are lost. What I wanted was for the pool.execute(ir) call to block while the 10 threads are all busy. The way I found to get this done was to use a Semaphore object. The code now looks like:

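// needs ExecutorService, Executors, RejectedExecutionException and Semaphore from java.util.concurrent
int threadCount = 10;
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
// One permit per worker thread, so at most threadCount tasks are outstanding
Semaphore permits = new Semaphore(threadCount);
for (Content content : contentList) {
  ImageRunnable ir = new ImageRunnable();
  ir.setContent(content);
  ir.setParent(this);
  try {
    // Blocks until a finished ImageRunnable has called release()
    permits.acquire();
  } catch (InterruptedException e) {
    // Restore the interrupt flag and stop submitting instead of skipping items
    Thread.currentThread().interrupt();
    break;
  }
  try {
    pool.execute(ir);
  } catch (RejectedExecutionException e) {
    // The task will never run, so return its permit
    permits.release();
    throw e;
  }
}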

The trick is that the process method then has to call release() on the semaphore when it has completed its work, ideally in a finally block so that a failed fetch still returns its permit. Now the Scanner thread blocks at permits.acquire() until a slot is open. This does exactly what I want: Runnables are not submitted to the pool until there is a thread ready to take them. The checkpointing and rate limiting now work exactly as with a single-threaded Scanner, but the work can use multiple threads.
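To check that the semaphore really caps the outstanding work, here is a self-contained sketch of the same pattern. It is not the ReadPath code: BoundedSubmitDemo and peakInFlight are invented names, and a 1 ms sleep stands in for the image fetch. The worker releases its permit in a finally block, and the submitting thread records the highest number of tasks that were submitted but not yet finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the sleeping task stands in for ImageRunnable's process method.
final class BoundedSubmitDemo {
    /** Returns the largest number of tasks that were submitted but not yet finished. */
    static int peakInFlight(int threadCount, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        Semaphore permits = new Semaphore(threadCount);
        AtomicInteger inFlight = new AtomicInteger();
        int peak = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                permits.acquire(); // blocks while every permit is held by a task
                peak = Math.max(peak, inFlight.incrementAndGet());
                try {
                    pool.execute(() -> {
                        try {
                            Thread.sleep(1); // stand-in for fetching an image
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            inFlight.decrementAndGet();
                            permits.release(); // always hand the permit back
                        }
                    });
                } catch (RejectedExecutionException e) {
                    inFlight.decrementAndGet();
                    permits.release(); // task never ran; return its permit
                    throw e;
                }
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            pool.shutdown();
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("peak in flight: " + peakInFlight(10, 200));
    }
}
```

Because each task decrements the counter before releasing its permit, the peak can never exceed threadCount, no matter how many tasks are pushed through.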

 
 
 
 
