Thursday, January 08, 2009

Running Rings Around Plain Java - The Killer Code

I wrote my previous post too fast. I found a very simple change that increases the speed x6!

The idea is too process messages in a ThreadPoolExecutor. As my Nodes are Runnable, I just needed to initialize a common ThreadPoolExecutor, and in a sendMessage, execute the runnable each time.

Here is the full code:
public class OptimizedRing {

    private ExecutorService executor;

    public static void main(String[] args) throws Exception {
        OptimizedRing ring = new OptimizedRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
    }

    public RingNode startRing(int n) {
        RingNode[] nodes = spawnNodes(n, startTimer());
        connectNodes(n, nodes);
        return nodes[0];
    }

    private Timer startTimer() {
        Timer timer = new Timer();
        new Thread(timer).start();
        return timer;
    }

    private RingNode[] spawnNodes(int n, final Timer timer) {
        System.out.println("constructing nodes");
        long start = System.currentTimeMillis();
        executor = Executors.newFixedThreadPool(4);
        RingNode[] nodes = new RingNode[n+1];
        for (int i = 0; i < n ; i++) {
            nodes[i] = new RingNode(i, timer, null);
        }
        long end = System.currentTimeMillis();
        System.out.println("Took "+(end-start)+"ms to construct "+n+" nodes");
        return nodes;
    }

    private void connectNodes(int n, RingNode[] nodes) {
        System.out.println("connecting nodes");
        nodes[n] = nodes[0];
        for (int i=0; i<n; i++) {
            nodes[i].connect(nodes[i+1]);
        }
    }

    interface Message {
        String getType();
    }

    private static class StartMessage implements Message {
        public String getType() {
            return "START";
        }
    }

    private static class StopMessage implements Message {
        public String getType() {
            return "STOP";            
        }
    }

    private static class CancelMessage implements Message {
        public String getType() {
            return "CANCEL";
        }
    }

    private static class TokenMessage implements Message {
        private int nodeId;
        private int value;

        public TokenMessage(int nodeId, int value) {
            this.nodeId = nodeId;
            this.value = value;
        }

        public String getType() {
            return "TOKEN";
        }
    }

    private class RingNode implements Runnable {
        private int nodeId;
        private Timer timer;
        private RingNode nextNode;
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean isActive = false;
        
        public RingNode(int id, Timer timer, RingNode nextNode) {
            nodeId = id;
            this.timer = timer;
            this.nextNode = nextNode;                        
        }

        public void connect(RingNode node) {
            nextNode = node;
            isActive = true;
        }

        public void sendMessage(Message m) {
            queue.add(m);
            executor.execute(this);
        }

        public void run() {
            if (isActive) {
                try {
                    Message m = queue.take();
                    if (m instanceof StartMessage) {
                        log("Starting messages");
                        timer.sendMessage(m);
                        nextNode.sendMessage(new TokenMessage(nodeId, 0));
                    } else if (m instanceof StopMessage) {
                        log("Stopping");
                        nextNode.sendMessage(m);
                        isActive = false;
                        //
                    } else if (m instanceof TokenMessage) {
                        if (((TokenMessage)m).nodeId == nodeId) {
                            int nextValue = ((TokenMessage)m).value + 1;
                            if (nextValue % 10000 == 0) {
                                log("Around ring "+nextValue+" times");
                            }
                            if (nextValue == 1000000) {
                                timer.sendMessage(new StopMessage());
                                timer.sendMessage(new CancelMessage());
                                nextNode.sendMessage(new StopMessage());
                                isActive = false;
                            } else {
                                nextNode.sendMessage(new TokenMessage(nodeId, nextValue));
                            }

                        } else {
                            nextNode.sendMessage(m);
                        }
                    }

                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }

        public void log(String s) {
            System.out.println(System.currentTimeMillis()+" "+nodeId+": "+s);
        }

    }

    private static class Timer implements Runnable {
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean timing = false;
        private long startTime;

        public void sendMessage(Message m) {
            //we don't need to change this implementation as timer is rarely called
            queue.add(m);
        }

        public void run() {
            while (true) {
                Message m;
                try {
                    m = queue.take();
                    if (m instanceof StartMessage) {
                        startTime = System.currentTimeMillis();
                        timing = true;
                    } else if (m instanceof StopMessage) {
                        long end = System.currentTimeMillis();
                        System.out.println("Start="+startTime+" Stop="+end+" Elapsed="+(end-startTime));
                        timing = false;                                        
                    } else if (m instanceof CancelMessage) {
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}




Code
Spawn
Send 100M messages
Scala Actors
15ms
270104ms
SimpleRing
11ms
493073ms
OptimizedRing (4 threads)
6ms

84727ms
OptimizedRing (5+ threads)
5ms

62593ms
OptimizedRing (1 thread)
5ms

60660ms


I finally saw my 4 cores used! Max multithreaded throughput is achieved at 5 threads. However 1 thread is faster. Is this related to memory bandwith limit?

Now I am left wondering if actors are really that important if one can achieve much higher throughput using plain Java and very simple concepts (BlockingQueue, ThreadPoolExecutor). Worse, this test is actually faster with only 1 thread...

22 comments :

  1. The point of the Actor model is to make concurrent programming simpler, because concurrency is hard and making mistakes is very easy.

    Actually your example demonstrates this very well, because it is broken :). The problem is that you read fields in run() that you wrote in the constructor and in connectNodes(), even though those methods do not run in the same thread as run().

    You can't do this without synchronization, volatile or final, because otherwise the Java Memory Model does not guarantee you that changes to fields made by one one thread are visible to other threads.

    ReplyDelete
  2. I recently wrote "Concurrency Rant: Different Types of Concurrency and Why Lots of People Already use ‘Erlang’ Concurrency"

    and why people use queues like BlockedQueue to communicate in threads in Java. And how this model is similar to Erlang message passing.
    (No OTP though)

    http://www.codemonkeyism.com/archives/2008/12/22/concurrency-rant-different-types-of-concurrency-and-why-lots-of-people-already-use-erlang-concurrency/

    @Tim: "connectNodes()" is part of the setup, not part of the concurrent execution.

    Peace
    -stephan

    ReplyDelete
  3. @Tim: Should the sendMessage() method be more of a problem than connectNodes()?

    Peace
    -stephan

    ReplyDelete
  4. @Tim: I did not bother to check if the code is perfectly thread safe - it is an example and it is anyway faster with 1 thread for god sake! But I think you misread the code. It is safe in constructor/connectNodes as they are called in init phase. Furthermore the sendMessage does not modify any member variable except isActive. So the only potential issue is with isActive and in this test has no chance of being remotely a problem.

    Making it thread safe is quite simple, I don't think you can expect people not to know about synchronization. Therefore I don't think Actors make anything simpler for this example.

    ReplyDelete
  5. This comment has been removed by the author.

    ReplyDelete
  6. @Stephan / @Fabien: it does not matter whether the constructor is invoked in the initialization phase. The problem is that it runs in a different thread.

    In terms of the Java Memory Model, a thread does not work on the shared main memory, but on a working copy of the main memory. The VM is not required to synchronize the working copy with the main memory, unless you use one of the synchronization mechanisms (synchronized / volatile / final) and in some special cases (threads starting and joining - not applicable here since you create a fixed thread pool before you initialize the objects).

    However, even though it is not required to, the VM is free to synchronize the thread's working copy with the main memory at any time. Today's real-life implementations do so much more often than they are required to. This is the reason why your implementation will work most of the time, possibly even every time on your machine.

    Still, if you write to a field in thread A, and then read that field in thread B, it is not guaranteed that B gets the value written by A, even if you can guarantee that A wrote it before B read it (there is a high probability though). That is because you did not make sure that A wrote the content of its working copy into the main memory, and that B refreshed its working copy. (The compiler or CPU may also re-order reads and writes, but that's another issue)

    To translate these terms into real hardware: your machine has 4 CPU cores, and even though they share a common main memory, each of them has a one or more caches of its own. If you are using more than one thread, and these threads utilize more than one core, the VM needs to tell the CPU when to flush and refresh the core's caches in order to synchronize them with the main memory. And Java's only hints for this are Java's synchronization primitives. Other systems, like C#/.Net, have more exlicit mechanisms and allow you to set read-barriers and write-barriers explicitlyusing the standard lib. In Java these mechanisms are hidden in synchronized/final/volatile.

    ReplyDelete
  7. @Tim: Thanks for the explanation Tim, learned something (and I thought I knew enough about volatile and the JMM during the double checked locking fix ;-)

    --
    Programming is hard - http://blog.codemonkeyism.com
    http://twitter.com/codemonkeyism

    ReplyDelete
  8. Yes thanks for the explanation Tim, however in this particular case, the init is done in the main thread, there is only one thread until the sendMessage is called. So constructor and connectNodes init is not an issue.

    ReplyDelete
  9. This comment has been removed by the author.

    ReplyDelete
  10. I think my initial comment was not posted. Anyway, in summary:

    It's obvious why a single thread is the fastest. There is no concurrency in this program. At most, only a single thread can work. If this test is about concurrency, it's a failed one, and perhaps you should modify your conclusions to reflect that (and not prompt others naively using a single thread where real concurrency exists).

    If you want to make the test more interesting, an easy way is to put more tokens there, say at least 4. The single thread won't be able to cope with that.

    ReplyDelete
  11. @Dimitris:

    "If you want to make the test more interesting, an easy way is to put more tokens there, say at least 4. The single thread won't be able to cope with that."

    Not sure what you mean.

    executor = Executors.newFixedThreadPool(4);

    Using a Thread Pool with 4 threads usually means that there are four threads, aren't there?

    Stephan

    ReplyDelete
  12. @Stephan:
    At every instant of time, only one task is available for a thread to work: the thread that has the token - all other threads (regardless how many) either spin or block. This by definition doesn't scale beyond one thread.

    Putting more tokens would have the effect of producing more parallelizable work for the threads. There are other ways of course too. But as it is now, the code is pointless.

    ReplyDelete
  13. Now I get why we definetly need the Fork/Join Framework. If such a trivial example is so easy to get wrong, wait till concurrency hits real business logic.

    ReplyDelete
  14. @Dimitris: you are right, this is the correct explanation why this test is better performed with 1 thread.

    But if I send 2 tokens instead of 1, the increase of messages will be exponential. While it will measure multithreading performance better it changes the test quite a lot and I still need to figure out how it can make sense.

    In simple benchmarks I noticed with 2 tokens the SimpleRing program is the most efficient, then Scala, then the Optimized version. But this might be because they don't process the tokens in the same order (the first that reaches the maximum count stops everything).

    ReplyDelete
  15. I was thinking adding some more tokens as in adding a few "node.sendMessage(new StartMessage());" lines, never grow their number thereafter.

    Anyway. I think you should modify the conclusions of the original blog, not to misguide others (as it is, it can easily be classified as FUD)

    ReplyDelete
  16. By the way. @Tim:

    To translate these terms into real hardware: your machine has 4 CPU cores, and even though they share a common main memory, each of them has a one or more caches of its own. If you are using more than one thread, and these threads utilize more than one core, the VM needs to tell the CPU when to flush and refresh the core's caches in order to synchronize them with the main memory.


    Speaking of real hardware, this view is entirely wrong. A complicated story put short: On cache-coherent systems (which is practically equivalent to "any computer 99,99% of people ever encountered in their lifetime", desktops, laptops etc), when any write occurs to any cache, all processors are guaranteed to see that value upon next read. Barriers are needed to prevent operation reorderings, which is the reason why a write might not be readily seen: it's because it didn't execute yet. NOT because it did execute, but then somehow the other processors failed to see it, "because the cache did not synchronize with main memory", etc. The last phrase is meaningless in such a system (it contradicts cache-coherence).

    ReplyDelete
  17. Kieron WilkinsonJanuary 12, 2009 2:00 PM

    Maybe I missing something here, but isn't this a little bit of an apples-oranges comparison?

    I thought that the point of Actors was to abstract away from the particular threading model...

    I suppose it's the age-old trade-off between scalability and efficiency.

    ReplyDelete
  18. @Dimitris: I tried with 2 or more tokens as you described, it does not change anything. 1 thread is faster by 5-10%. I even tried to add more computation in the processing of each message (calculating pi by monte carlo ;), without change on the fact that 1 thread is faster.

    This is a bit strange, I would not have expected this result.

    ReplyDelete
  19. This can't be right. Perhaps the single thread version stops when it finds a single StopMessage? (Anyhow you must first verify that the amount of work performed is equivalent. It could be something else, like accidental contention or so).

    ReplyDelete
  20. while it is true that a single StopMessage will stop everything, from the logs and from the total time it takes, improving StopMessage handling won't change the conclusions. There must definately be a bottleneck somewhere else. But strangely it is not specific not my multithreaded algorithm since the Scala version does not fare better than the single thread version either (actually much worse).

    What's stranger is the fact that 4 cores are really 100% used when I declare the use of many threads or with Scala. But somehow they work it out slower than a single thread. And the fact that putting intense computation for each message handling does not change any conclusion puzzles me even more.

    ReplyDelete
  21. I don't really see how you ensure that the amount of work done is constant, and only the number of threads change. If the first StopMessage stops everything, then the amount of work increases as you add threads.

    ReplyDelete
  22. I posted the results of improved tests that include some computation and explaining why even with 2 or more start messages, any algorithm failed to parallelize properly.

    http://chasethedevil.blogspot.com/2009/01/end-of-rings-around-plain-java-better.html

    ReplyDelete