Thursday, January 15, 2009

The End Of Rings Around Plain Java - A Better Concurrency Test

In my previous post, I was wondering why a single thread was faster. D Andreou gave the correct explanation: since we send only 1 start message and each node sends only 1 message to the next one, there is never more than 1 message being processed at a time. So the test is optimal on 1 thread. It does not make much sense to run a multithreading benchmark on a problem that is fundamentally single-threaded.


His suggestion was to simply send N start messages, where N >= number of processors. In theory, performance should then become optimal with N threads. Unfortunately this is not what happened in real life: the single-threaded version is still faster even if you send 16 messages on a dual-processor machine.

    public static void main(String[] args) throws Exception {
        OptimizedRing ring = new OptimizedRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
        node.sendMessage(new TokenMessage(node.nodeId,1));
        node.sendMessage(new TokenMessage(node.nodeId,1));
        node.sendMessage(new TokenMessage(node.nodeId,1));
        ring.executor.awaitTermination(10, TimeUnit.MINUTES);
    }

My idea was that it was related to the overhead of switching from thread to thread, which is precisely what I think the original author of the test had in mind to measure. I am not 100% convinced that this is really what is happening. I wanted a test that would actually be faster using N threads, so I decided to add a bit of computation before processing each token. Unfortunately I had the bad idea of computing Pi by the Monte Carlo method to do that. Running my tests, I was surprised to see that it did not change the results, and that things got worse the more compute-intensive the computation was (increasing the number of Monte Carlo iterations). It scared me a bit, wondering what the hell could be wrong there. The following class performs much worse with 2 threads than with 1:

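import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BadParallelPi {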

    private static void startExecutors() throws Exception {        
        long startTime = System.currentTimeMillis();
        System.out.println(startTime);
        ExecutorService executor1 = Executors.newFixedThreadPool(1);
        executor1.execute(new Computation());
        executor1.execute(new Computation());
        executor1.shutdown();
        executor1.awaitTermination(60, TimeUnit.SECONDS);
        long delay = System.currentTimeMillis() - startTime;
        System.out.println("finished single thread in "+(delay/1000.0));
        startTime = System.currentTimeMillis();
        System.out.println(startTime);
        executor1 = Executors.newFixedThreadPool(2);
        executor1.execute(new Computation());
        executor1.execute(new Computation());
        executor1.shutdown();
        executor1.awaitTermination(60, TimeUnit.SECONDS);
        delay = System.currentTimeMillis() - startTime;
        System.out.println("finished 2 threads in "+(delay/1000.0));
    }
    
    public static class Computation implements Runnable {
        public volatile int count = 0;
         private double computePi() {
            double pi = 0;
            double x,y;
            int n = 10000000;
            for (int i=0;i<n;i++) {
                x = Math.random();
                x *= x;
                y = Math.random();
                y *= y;
                if (x+y < 1) {
                    pi +=1;
                }
            }
            pi = 4*pi/n;
            return pi;
        }
        
        public void run() {
            double pi = computePi();
            long time = System.currentTimeMillis();
            System.out.println(time+" thread "+Thread.currentThread().getId()+" pi="+pi);
            count++;
        }        
    }

    
    public static void main(String[] args) throws Exception {
        startExecutors();
    }
}

Did you figure out why?


It took me less time with this simple code than with the original ring test to find out why. It is simply because of the Math.random call. Math.random creates only one random number generator, which is shared among all threads, so the threads contend with each other on every call. Creating one random generator per thread finally showed 2 threads to be much faster than 1.
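
A minimal sketch of that fix, assuming one java.util.Random per task. The class name PerThreadRandomPi and the time helper are made up for illustration; this is not the exact code used for the measurements in this post.

```java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerThreadRandomPi {

    /** Monte Carlo estimate of Pi using a generator owned by the caller. */
    static double computePi(int n, Random random) {
        int inside = 0;
        for (int i = 0; i < n; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y < 1) {
                inside++;
            }
        }
        return 4.0 * inside / n;
    }

    /** Runs `tasks` computations on a pool of `threads` threads and returns the elapsed ms. */
    static long time(int threads, int tasks, final int n) throws InterruptedException {
        long start = System.currentTimeMillis();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < tasks; t++) {
            executor.execute(new Runnable() {
                public void run() {
                    // One generator per task: no generator state is shared between threads.
                    computePi(n, new Random());
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(60, TimeUnit.SECONDS);
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) throws Exception {
        int n = 10000000;
        System.out.println("1 thread:  " + time(1, 2, n) + "ms");
        System.out.println("2 threads: " + time(2, 2, n) + "ms");
    }
}
```

The only change from BadParallelPi that matters is that computePi receives its own Random instead of calling Math.random, so nothing is shared inside the hot loop.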


Back to the original ring test. With Pi computed by Monte Carlo the correct way, I now had decent test results as long as the number of iterations is not too small. 10 iterations are enough to show a real difference between N threads and 1. Adding a small computation helps to figure out what happens behind the scenes. You can also verify D Andreou's claim: using only 1 start message, the single-threaded version is faster. If the computation is too light (for example, 0 Monte Carlo iterations), one only measures method calls between threads (context switching), which is obviously optimal for 1 thread. Measuring Actor libraries this way is dangerous: if I write a single-threaded Actor library, it will be the fastest in this test, but it certainly is not what you want to use as an Actor library.


Let's now see how Scala fares compared to the plain Java solution, with computation added:

Machine     Algorithm                  Time for 100000 ring count,   Time for 10000 ring count,
                                       10 mc, 4 messages             100 mc, 4 messages
Core2Duo    OptimizedRing 2 Threads    57s                           37s
Core2Duo    OptimizedRing 4 Threads    78s                           39s
Core2Duo    Scala Actors               82s                           47s
Core2Duo    SimpleRing (100 Threads)   137s                          58s
Core2Duo    OptimizedRing 1 Thread     89s                           71s
Core2Quad   OptimizedRing 4 Threads    81s                           25s
Core2Quad   Scala Actors               71s                           30s
Core2Quad   OptimizedRing 2 Threads    61s                           43s
Core2Quad   OptimizedRing 1 Thread     100s                          80s

The Core2Duo is Intel(R) Core(TM)2 Duo CPU T7250 @ 2.00GHz
The Core2Quad is Intel(R) Core(TM)2 Quad CPU Q6600 @ 2.40GHz

It is interesting to compare the results of 4 threads on a dual-processor machine with Monte Carlo counts of 10 and 100. We see a much higher thread overhead with less computation. With too little computation in the Monte Carlo step, the overhead of running more than 2 concurrent threads is too high. This explains why the very simple threading architecture fares much better in the last column than in the previous one.
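
To put numbers on that overhead, here is a small sketch that divides the Core2Duo 4-thread time by the 2-thread time for each Monte Carlo count. The timings are copied from the table above; the class name ThreadOverhead and the ratio helper are made up for illustration.

```java
import java.util.Locale;

public class ThreadOverhead {

    /** How many times longer `slower` takes compared to `faster`. */
    static double ratio(double slower, double faster) {
        return slower / faster;
    }

    public static void main(String[] args) {
        // Core2Duo, OptimizedRing, timings in seconds from the table above.
        double twoThreads10mc = 57, fourThreads10mc = 78;
        double twoThreads100mc = 37, fourThreads100mc = 39;

        // prints "10 mc:  4 threads take 1.37x the time of 2 threads"
        System.out.println(String.format(Locale.ROOT,
                "10 mc:  4 threads take %.2fx the time of 2 threads",
                ratio(fourThreads10mc, twoThreads10mc)));

        // prints "100 mc: 4 threads take 1.05x the time of 2 threads"
        System.out.println(String.format(Locale.ROOT,
                "100 mc: 4 threads take %.2fx the time of 2 threads",
                ratio(fourThreads100mc, twoThreads100mc)));
    }
}
```

With 10 iterations per message, the two extra threads cost about 37% more time; with 100 iterations, the penalty drops to about 5%.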

Scala Actors fare much better when they are not hindered by the creation of too many threads. They actually seem very good at abstracting multithreading intricacies, while still providing near-Java performance in the real world, where each actor does enough computation and multithreading is important.

Thursday, January 08, 2009

Object Oriented Analysis And Design with Applications Book Review

A while ago, I received a comment from someone implying I knew nothing about OO programming because I had not mentioned (and therefore not read) Object Oriented Analysis And Design with Applications by G. Booch. I was intrigued by such a silly comment and decided to look at this book, which was considered the bible of OOP.

Well, I don't find it that good! But I don't find the Bible particularly good either. I like B. Meyer's Object Oriented Software Construction much more, because it is more practical and more in touch with reality, while pointing at the really important problems, like: "Real systems have no top".

In contrast, G. Booch's book has too many self-evident concepts that don't really teach you anything or make you think about things in a different way. It is a good book for someone who is learning OO for the first time. It covers the subject in detail, but I did not find anything in it that made me say "wow!". It is like most other books you can find on OO. Furthermore, only the first parts are about OO; the rest is more of a UML tutorial.

Running Rings Around Plain Java - The Killer Code

I wrote my previous post too fast. I found a very simple change that makes the test about 6 times faster!

The idea is to process messages in a ThreadPoolExecutor. As my nodes are Runnable, I just needed to initialize a common ThreadPoolExecutor and, in sendMessage, execute the Runnable each time.

Here is the full code:
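import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class OptimizedRing {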

    private ExecutorService executor;

    public static void main(String[] args) throws Exception {
        OptimizedRing ring = new OptimizedRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
    }

    public RingNode startRing(int n) {
        RingNode[] nodes = spawnNodes(n, startTimer());
        connectNodes(n, nodes);
        return nodes[0];
    }

    private Timer startTimer() {
        Timer timer = new Timer();
        new Thread(timer).start();
        return timer;
    }

    private RingNode[] spawnNodes(int n, final Timer timer) {
        System.out.println("constructing nodes");
        long start = System.currentTimeMillis();
        executor = Executors.newFixedThreadPool(4);
        RingNode[] nodes = new RingNode[n+1];
        for (int i = 0; i < n ; i++) {
            nodes[i] = new RingNode(i, timer, null);
        }
        long end = System.currentTimeMillis();
        System.out.println("Took "+(end-start)+"ms to construct "+n+" nodes");
        return nodes;
    }

    private void connectNodes(int n, RingNode[] nodes) {
        System.out.println("connecting nodes");
        nodes[n] = nodes[0];
        for (int i=0; i<n; i++) {
            nodes[i].connect(nodes[i+1]);
        }
    }

    interface Message {
        String getType();
    }

    private static class StartMessage implements Message {
        public String getType() {
            return "START";
        }
    }

    private static class StopMessage implements Message {
        public String getType() {
            return "STOP";            
        }
    }

    private static class CancelMessage implements Message {
        public String getType() {
            return "CANCEL";
        }
    }

    private static class TokenMessage implements Message {
        private int nodeId;
        private int value;

        public TokenMessage(int nodeId, int value) {
            this.nodeId = nodeId;
            this.value = value;
        }

        public String getType() {
            return "TOKEN";
        }
    }

    private class RingNode implements Runnable {
        private int nodeId;
        private Timer timer;
        private RingNode nextNode;
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean isActive = false;
        
        public RingNode(int id, Timer timer, RingNode nextNode) {
            nodeId = id;
            this.timer = timer;
            this.nextNode = nextNode;                        
        }

        public void connect(RingNode node) {
            nextNode = node;
            isActive = true;
        }

        public void sendMessage(Message m) {
            queue.add(m);
            executor.execute(this);
        }

        public void run() {
            if (isActive) {
                try {
                    Message m = queue.take();
                    if (m instanceof StartMessage) {
                        log("Starting messages");
                        timer.sendMessage(m);
                        nextNode.sendMessage(new TokenMessage(nodeId, 0));
                    } else if (m instanceof StopMessage) {
                        log("Stopping");
                        nextNode.sendMessage(m);
                        isActive = false;
                        //
                    } else if (m instanceof TokenMessage) {
                        if (((TokenMessage)m).nodeId == nodeId) {
                            int nextValue = ((TokenMessage)m).value + 1;
                            if (nextValue % 10000 == 0) {
                                log("Around ring "+nextValue+" times");
                            }
                            if (nextValue == 1000000) {
                                timer.sendMessage(new StopMessage());
                                timer.sendMessage(new CancelMessage());
                                nextNode.sendMessage(new StopMessage());
                                isActive = false;
                            } else {
                                nextNode.sendMessage(new TokenMessage(nodeId, nextValue));
                            }

                        } else {
                            nextNode.sendMessage(m);
                        }
                    }

                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }

        public void log(String s) {
            System.out.println(System.currentTimeMillis()+" "+nodeId+": "+s);
        }

    }

    private static class Timer implements Runnable {
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean timing = false;
        private long startTime;

        public void sendMessage(Message m) {
            //we don't need to change this implementation as timer is rarely called
            queue.add(m);
        }

        public void run() {
            while (true) {
                Message m;
                try {
                    m = queue.take();
                    if (m instanceof StartMessage) {
                        startTime = System.currentTimeMillis();
                        timing = true;
                    } else if (m instanceof StopMessage) {
                        long end = System.currentTimeMillis();
                        System.out.println("Start="+startTime+" Stop="+end+" Elapsed="+(end-startTime));
                        timing = false;                                        
                    } else if (m instanceof CancelMessage) {
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}




Code                         Spawn   Send 100M messages
Scala Actors                 15ms    270104ms
SimpleRing                   11ms    493073ms
OptimizedRing (4 threads)    6ms     84727ms
OptimizedRing (5+ threads)   5ms     62593ms
OptimizedRing (1 thread)     5ms     60660ms


I finally saw my 4 cores used! Maximum multithreaded throughput is reached at 5 threads. However, 1 thread is still faster. Is this related to a memory bandwidth limit?

Now I am left wondering if actors are really that important if one can achieve much higher throughput using plain Java and very simple concepts (BlockingQueue, ThreadPoolExecutor). Worse, this test is actually faster with only 1 thread...

Running rings around plain Java

Alex Miller has a very interesting test of Actors. He finds that Scala performance is relatively low compared to Erlang, and that Kilim is very close to Erlang. But the Kilim code is the most difficult to read of the lot.

I thought it would be simple to do the same test in plain Java. I wrote the code for it, duplicating the Scala logic using Threads instead of Actors.


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SimpleRing {
    public static void main(String[] args) throws Exception {
        SimpleRing ring = new SimpleRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
    }

    public RingNode startRing(int n) {
        RingNode[] nodes = spawnNodes(n, startTimer());
        connectNodes(n, nodes);
        return nodes[0];
    }

    private Timer startTimer() {
        Timer timer = new Timer();
        new Thread(timer).start();
        return timer;
    }


    private RingNode[] spawnNodes(int n, final Timer timer) {
        System.out.println("constructing nodes");
        long start = System.currentTimeMillis();
        RingNode[] nodes = new RingNode[n+1];

        for (int i = 0; i < n ; i++) {
            nodes[i] = new RingNode(i, timer, null);
            new Thread(nodes[i]).start(); // later use pool
        }

        long end = System.currentTimeMillis();
        System.out.println("Took "+(end-start)+"ms to construct "+n+" nodes");

        return nodes;
    }


    private void connectNodes(int n, RingNode[] nodes) {
        System.out.println("connecting nodes");
        nodes[n] = nodes[0];

        for (int i=0; i<n; i++) {
            nodes[i].connect(nodes[i+1]);
        }
    }


    interface Message {
        String getType();
    }


    private static class StartMessage implements Message {
        public String getType() {
            return "START";
        }
    }

    private static class StopMessage implements Message {
        public String getType() {
            return "STOP";            
        }
    }


    private static class CancelMessage implements Message {
        public String getType() {
            return "CANCEL";
        }
    }


    private static class TokenMessage implements Message {
        private int nodeId;
        private int value;

        public TokenMessage(int nodeId, int value) {
            this.nodeId = nodeId;
            this.value = value;
        }

        public String getType() {
            return "TOKEN";
        }
    }


    private static class RingNode implements Runnable {
        private int nodeId;
        private Timer timer;
        private RingNode nextNode;
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();

        public RingNode(int id, Timer timer, RingNode nextNode) {
            nodeId = id;
            this.timer = timer;
            this.nextNode = nextNode;                        
        }


        public void connect(RingNode node) {
            nextNode = node;
        }


        public void sendMessage(Message m) {
            queue.add(m);
        }


        public void run() {
            while (true) {
                try {
                    Message m = queue.take();
                    if (m instanceof StartMessage) {
                        log("Starting messages");
                        timer.sendMessage(m);
                        nextNode.sendMessage(new TokenMessage(nodeId, 0));
                    } else if (m instanceof StopMessage) {
                        log("Stopping");
                        nextNode.sendMessage(m);
                        break;
                    } else if (m instanceof TokenMessage) {
                        if (((TokenMessage)m).nodeId == nodeId) {
                            int nextValue = ((TokenMessage)m).value + 1;
                            if (nextValue % 10000 == 0) {
                                log("Around ring "+nextValue+" times");
                            }
                            if (nextValue == 1000000) {
                                timer.sendMessage(new StopMessage());
                                timer.sendMessage(new CancelMessage());
                                nextNode.sendMessage(new StopMessage());
                                break;
                            } else {
                                nextNode.sendMessage(new TokenMessage(nodeId, nextValue));
                            }
                        } else {
                            nextNode.sendMessage(m);
                        }
                    }

                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }


        public void log(String s) {
            System.out.println(System.currentTimeMillis()+" "+nodeId+": "+s);
        }
    }


    private static class Timer implements Runnable {
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean timing = false;
        private long startTime;


        public void sendMessage(Message m) {
            queue.add(m);
        }


        public void run() {
            while (true) {
                Message m;
                try {
                    m = queue.take();
                    if (m instanceof StartMessage) {
                        startTime = System.currentTimeMillis();
                        timing = true;
                    } else if (m instanceof StopMessage) {
                        long end = System.currentTimeMillis();
                        System.out.println("Start="+startTime+" Stop="+end+" Elapsed="+(end-startTime));
                        timing = false;
                    } else if (m instanceof CancelMessage) {
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

I was a bit surprised by the result. It was slow, and only 1 thread was really active at any one time. This is why the test is particularly good: it is not trivial to reproduce the functionality in plain Java in an efficient manner. It really shows how useful the concept of Actors can be.

SimpleRing
spawn 100: 11ms
send 100M messages: 493073ms

Scala Actors
spawn 100: 15 ms
send 100M messages: 270104ms

* UPDATE * with slight changes plain Java rocks!