Monday, November 17, 2008

Combining Callable or Runnable with Future and FutureTask

In this part of the Java concurrency tutorial I will present the differences between the Runnable and Callable interfaces, as well as when and how to use the Future interface and the FutureTask class. As a side effect of this "lesson", I will also show how to use ExecutorService.

I will present all of this using a real example in which I "parallelize" a sequential algorithm in order to utilize more resources and get better performance.

To keep things as simple as possible, I'm using very basic requirements. I'm implementing a Java class that goes through a predefined list of web pages (URLs) and counts the number of bytes in each resource: a primitive crawler that does not analyse the content. (Strictly speaking, the worker sums the lengths of the lines it reads, so it counts characters without line terminators rather than raw bytes.) The class has to sum up all the results and display the total number of bytes read. It also has to display the total time spent on the task. Here is the code for the test class, which uses a Callable as the worker that reads each web page:

package futures;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {

        // Pages to "crawl"
        List<String> urls = Arrays.asList(
                "http://www.java2jee.blogspot.com",
                "http://www.yahoo.com",
                "http://www.msdn.com",
                "http://c2.com/xp/ExtremeProgrammingRoadmap.html",
                "http://apache.org/",
                "http://sourceforge.net/");
        List<Future<Integer>> futures =
                new ArrayList<Future<Integer>>(urls.size());

        // Pool of size 1: the pages are fetched one after another
        final ExecutorService service =
                Executors.newFixedThreadPool(1);

        try {
            long start = System.nanoTime();
            // Submit every task first; submit() returns immediately
            for (String url : urls) {
                futures.add(service.submit(new CallableExample(url)));
            }

            // Then collect the results; get() blocks until each task is done
            long result = 0;
            for (Future<Integer> future : futures) {
                result += future.get();
            }
            System.out.println("\nTotal bytes: " + result);
            System.out.println("Total time: "
                    + (System.nanoTime() - start) / 1000000
                    + "ms");
        } finally {
            service.shutdown();
        }
    }
}

And the "worker" class follows:

package futures;

import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.URL;
import java.util.concurrent.Callable;

class CallableExample implements Callable<Integer> {
    // final: safely visible to the pool thread that later runs call()
    private final String currentUrl;

    CallableExample(String currentUrl) {
        this.currentUrl = currentUrl;
    }

    // Any exception thrown here is rethrown by Future.get(),
    // wrapped in an ExecutionException
    public Integer call() throws Exception {
        int result = 0;
        URL url = new URL(currentUrl);
        LineNumberReader in =
                new LineNumberReader(
                        new InputStreamReader(url.openStream()));

        try {
            String line = null;
            // Sums the line lengths (characters, without line terminators)
            while ((line = in.readLine()) != null) {
                result += line.length();
            }
        } finally {
            in.close();
        }

        System.out.println(currentUrl + " has " + result + " bytes");
        // The returned value becomes the result of Future.get()
        return result;
    }
}

And now a bit of explanation. The line service.submit(new CallableExample(url)) submits a new task to the ExecutorService (created earlier). The task runs as soon as a thread in the pool is available. The submit(...) method returns a Future immediately, and the program adds it to the futures list. After submitting all the tasks, the program iterates over the previously submitted Futures, gets each result and adds it to the overall result variable, one by one. Future's get() method blocks the calling thread until the result is available and then returns the value (an Integer in this example). If call() threw an exception, get() throws an ExecutionException whose cause is the original exception. The ExecutorService also guarantees that everything the task did happens-before get() returns, so the result is safely visible to the main thread. Finally, the program prints the summary. Simple :)
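The self-contained sketch below shows both sides of get(): it blocks until a value is ready, and it reports a failure through ExecutionException. It uses no real URLs. The class name FutureGetDemo, the 100 ms sleep and the simulated IOException are all hypothetical stand-ins for real downloads.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetDemo {

    // Submits one task that succeeds and one that fails, then reports what
    // Future.get() delivered for each of them.
    static String run() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> ok = service.submit(new Callable<Integer>() {
                public Integer call() throws Exception {
                    Thread.sleep(100); // simulate a slow download
                    return 42;
                }
            });
            Future<Integer> failing = service.submit(new Callable<Integer>() {
                public Integer call() throws Exception {
                    throw new IOException("simulated network error");
                }
            });

            // get() blocks until the task is done, then returns its value
            StringBuilder report = new StringBuilder("ok=" + ok.get());
            try {
                failing.get();
                report.append(", failing returned normally");
            } catch (ExecutionException e) {
                // the exception thrown by call() is available as the cause
                report.append(", failing threw ")
                      .append(e.getCause().getClass().getSimpleName());
            }
            return report.toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: ok=42, failing threw IOException
    }
}
```

The same pattern applies in the Test class: if one of the URLs cannot be read, the corresponding future.get() throws an ExecutionException, and that exception ends main().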

As you can see, the Test class contains the line final ExecutorService service = Executors.newFixedThreadPool(1); which creates a thread pool of size 1. This means that all the web pages will be visited sequentially, one by one. Try experimenting with this parameter to see which value gives you the best results (on my machine a thread pool of size 3 works pretty well, and increasing the number further does not help).
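To try the experiment without depending on real web sites, you can replace the downloads with tasks that only sleep. The sketch below assumes a hypothetical 100 ms "network latency" per page and six pages, and it times the same submit/get loop for several pool sizes. The names PoolSizeExperiment and fakeDownload are made up for the illustration, and the timings you see will depend on your machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolSizeExperiment {

    // Hypothetical stand-in for CallableExample: sleeps to mimic network
    // latency instead of downloading a page, then returns a fake size.
    static Callable<Integer> fakeDownload(final int size, final long latencyMs) {
        return new Callable<Integer>() {
            public Integer call() throws Exception {
                Thread.sleep(latencyMs);
                return size;
            }
        };
    }

    // Runs six fake downloads on a pool of the given size and
    // returns {total size, elapsed milliseconds}.
    static long[] run(int poolSize) {
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            long start = System.nanoTime();
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 1; i <= 6; i++) {
                futures.add(service.submit(fakeDownload(i * 100, 100)));
            }
            long total = 0;
            for (Future<Integer> future : futures) {
                total += future.get();
            }
            long elapsedMs = (System.nanoTime() - start) / 1000000;
            return new long[] { total, elapsedMs };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int poolSize : new int[] { 1, 3, 6 }) {
            long[] r = run(poolSize);
            System.out.println("pool=" + poolSize + " total=" + r[0]
                    + " time=" + r[1] + "ms");
        }
    }
}
```

With a single thread the six sleeps add up. With more threads they overlap, and the total time shrinks until there are enough threads for all the pending tasks.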

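With 3 threads, each thread takes one web page at a time, loads it and counts its bytes, then picks up the next URL waiting in the queue. Because each task spends most of its time waiting for the network, several downloads can overlap, so we gain a lot by "parallelizing" even such a simple application. This is the power of concurrent programming: with the simple mechanisms introduced in Java 5, your applications can make much better use of the available resources.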
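To see how a fixed pool spreads the work, the following sketch lets each task report the name of the pool thread that ran it. As before, the page loads are simulated with a short sleep, and the class name WhichThread is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WhichThread {

    // Submits six simulated page loads to a pool of three threads and
    // returns the names of the pool threads that executed them.
    static Set<String> workerNames() {
        ExecutorService service = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<Future<String>>();
            for (int i = 0; i < 6; i++) {
                final int page = i; // hypothetical page index
                futures.add(service.submit(new Callable<String>() {
                    public String call() throws Exception {
                        Thread.sleep(50); // pretend to load the page
                        String worker = Thread.currentThread().getName();
                        System.out.println("page " + page + " loaded by " + worker);
                        return worker;
                    }
                }));
            }
            Set<String> names = new TreeSet<String>();
            for (Future<String> future : futures) {
                names.add(future.get());
            }
            return names;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("distinct workers: " + workerNames().size());
    }
}
```

The order of the "page ... loaded by ..." lines varies from run to run. However, a fixed pool starts a new thread for each of its first three submissions, so exactly three distinct workers share the six pages.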

Below is the same "worker" functionality as above, but this time implementing the Runnable interface:

package futures;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.MalformedURLException;
import java.net.URL;

class RunnableExample implements Runnable {

    private final String currentUrl;
    // volatile so that threads other than the worker see the written value
    private volatile int result = -1;

    RunnableExample(String currentUrl) {
        this.currentUrl = currentUrl;
    }

    // run() returns void and cannot throw checked exceptions,
    // so I/O errors have to be wrapped in unchecked ones
    public void run() {
        int count = 0;
        LineNumberReader in = null;
        try {
            URL url = new URL(currentUrl);
            in = new LineNumberReader(
                    new InputStreamReader(url.openStream()));

            String line = null;
            while ((line = in.readLine()) != null) {
                count += line.length();
            }
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }

        System.out.println(currentUrl + " has " + count + " bytes");
        // The only way to "return" a value from a Runnable
        this.result = count;
    }

    int getResult() {
        return result;
    }
}

To switch the "worker" class, change this line: futures.add(service.submit(new CallableExample(url))) into this one: futures.add(service.submit(new RunnableExample(url), 0)). It will not behave like the previous example, though: get() will always return 0 (the value passed as the second argument to submit) instead of the number of bytes read by the task. I will leave the solution to this problem as an exercise for you, dear readers. You will see that using the Runnable interface for tasks that compute a result is somewhat obsolete and can be a pain in the a**.
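The "always 0" behaviour comes from the two Runnable overloads of ExecutorService.submit. The sketch below isolates them without solving the exercise. Its Runnable is a hypothetical worker that pretends to count 1234 bytes, and the class name RunnableResultDemo is made up.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RunnableResultDemo {

    // Hypothetical worker: it does its job, but run() returns void, so the
    // count it computes never reaches the Future.
    static final Runnable WORK = new Runnable() {
        public void run() {
            int bytes = 1234; // pretend this was counted from a web page
            System.out.println("worker counted " + bytes + " bytes");
        }
    };

    // Returns what get() yields for the two Runnable flavours of submit().
    static Object[] run() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // submit(Runnable, T): get() returns the given value on success
            Future<Integer> withResult = service.submit(WORK, 0);
            // submit(Runnable): get() returns null on success
            Future<?> withoutResult = service.submit(WORK);
            return new Object[] { withResult.get(), withoutResult.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] r = run();
        System.out.println("get() returned " + r[0] + " and " + r[1]);
    }
}
```

In both cases the Future tells you only that the task finished. Whatever the Runnable computed has to travel back by some other route.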

I postponed the subject of FutureTask to the end. I consider this class a helper that is useful in the many situations where you want to use Java 5 facilities, namely Future, but your legacy Java code (or a 3rd-party component) accepts only the Runnable interface. FutureTask implements both Runnable and Future, and it can be built from a Callable (or from a Runnable plus a result value). You can therefore treat it as a Runnable or as a Future, depending on the context or the library's capabilities. For more information, please refer to the JDK documentation.
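Here is a minimal sketch of that situation. A hypothetical legacyRun method accepts only a Runnable and starts it on a plain Thread. Wrapping a Callable in a FutureTask lets us hand the task to that method and still collect the result through get(). The legacy method, the sleep and the 4096 "byte count" are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    // Hypothetical legacy API that only knows how to run a Runnable.
    static void legacyRun(Runnable task) {
        new Thread(task, "legacy-worker").start();
    }

    static int run() {
        // The work itself is written as a Callable, so it can return a value
        FutureTask<Integer> task = new FutureTask<Integer>(new Callable<Integer>() {
            public Integer call() throws Exception {
                Thread.sleep(100); // pretend to download a page
                return 4096;       // pretend byte count
            }
        });

        legacyRun(task);           // the FutureTask is used as a Runnable...
        try {
            return task.get();     // ...and as a Future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + run()); // prints: result: 4096
    }
}
```

The legacy code never needs to know that a Future exists, and the caller never needs to know which thread did the work.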

I hope this article helps you understand the core Java concurrency components described above. If you find it interesting and useful, or if you have ideas on how it could be improved, your feedback is more than welcome.

6 comments:

Tim said...

Unfortunately, Java concurrency is not that easy. Or at least you don't synchronize your member access correctly. If you provide a default value for a member (private int result = -1), you can't access it from a thread and expect the member to be -1. According to the Java spec it may be either 0 or -1 when you read it from any other thread except the one you created it in.
Similarly, you can't set a member variable in a constructor and then read it from a thread, unless you declare the member volatile or synchronize all access to the member variable. Again, there is no guarantee that your other threads can see the value you wrote.

Przemysław Bielicki said...

If you are talking about RunnableExample class I totally agree with you. This is just one more argument not to use them (Runnables) - unless you really really have to.

With CallableExample you don't have such problem - and I suggested using this one.

Tim said...

The CallableExample is also broken: first you set the member Callable.currentUrl in the constructor. And then you read currentUrl in a different thread. There is no guarantee in Java that the constructor's write to currentUrl is already readable at the time that your other thread is reading it. You can't write something in one thread and read it in another, without using some way to synchronize the memory (in reality, on today's CPUs, it may work very often though!).

Imagine a dual-core CPU with a separate cache for each core. The constructor is running in the first core, and the call() method is later running in the second core. How do you make sure that the first core writes the content of the cache to the common RAM after you have set 'currentUrl'? And assuming the second core already had currentUrl's memory region in its cache before, how do you make the second core refresh its RAM in order to read the new value of currentUrl?

In your code you don't.

In order to correct it, you would have to make the currentUrl member either 'final', 'volatile', or you have to synchronize both the write in the constructor and the reads in call() to the same object.

Otherwise, according to the Java spec, in all threads except the one that created the object, the content of the member currentUrl may be either the value you set in the constructor or the default value (null).

Przemysław Bielicki said...

Thanks, Tim, for noticing that. My intent was to make this field final; I've fixed it.

Cheers!

Slawomir Ginter said...

I can't wait for an article about cancelling FutureTasks, cancelling running background tasks etc.

Generally I consider the title of your series harmful - please rename it to "Java concurrency is not prohibitively difficult" ;-)

One of the things related to concurrency I often say is "don't use ConcurrentHashMap (and family) until you are familiar with Java concurrency - in 99.99% of cases you don't get what you mean".

In case of Java concurrency a little knowledge is a really dangerous thing.