The idea is to process messages in a ThreadPoolExecutor. Since my nodes are Runnable, I only needed to create a shared ThreadPoolExecutor and, in sendMessage, execute the Runnable each time.
Here is the full code:
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

public class OptimizedRing {
    // Shared pool that runs every node's message handling.
    private ExecutorService executor;

    public static void main(String[] args) throws Exception {
        OptimizedRing ring = new OptimizedRing();
        RingNode node = ring.startRing(Integer.parseInt(args[0]));
        node.sendMessage(new StartMessage());
    }

    public RingNode startRing(int n) {
        RingNode[] nodes = spawnNodes(n, startTimer());
        connectNodes(n, nodes);
        return nodes[0];
    }

    private Timer startTimer() {
        Timer timer = new Timer();
        new Thread(timer).start();
        return timer;
    }

    private RingNode[] spawnNodes(int n, final Timer timer) {
        System.out.println("constructing nodes");
        long start = System.currentTimeMillis();
        // Daemon pool threads, so the JVM can exit once the Timer thread finishes;
        // the default thread factory creates non-daemon threads that keep it running.
        executor = Executors.newFixedThreadPool(4, new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            }
        });
        RingNode[] nodes = new RingNode[n + 1];
        for (int i = 0; i < n; i++) {
            nodes[i] = new RingNode(i, timer, null);
        }
        long end = System.currentTimeMillis();
        System.out.println("Took " + (end - start) + "ms to construct " + n + " nodes");
        return nodes;
    }

    private void connectNodes(int n, RingNode[] nodes) {
        System.out.println("connecting nodes");
        // The extra array slot closes the ring: the last node points back to node 0.
        nodes[n] = nodes[0];
        for (int i = 0; i < n; i++) {
            nodes[i].connect(nodes[i + 1]);
        }
    }

    interface Message {
        String getType();
    }

    private static class StartMessage implements Message {
        public String getType() {
            return "START";
        }
    }

    private static class StopMessage implements Message {
        public String getType() {
            return "STOP";
        }
    }

    private static class CancelMessage implements Message {
        public String getType() {
            return "CANCEL";
        }
    }

    private static class TokenMessage implements Message {
        private int nodeId;   // node that launched the token
        private int value;    // number of completed laps

        public TokenMessage(int nodeId, int value) {
            this.nodeId = nodeId;
            this.value = value;
        }

        public String getType() {
            return "TOKEN";
        }
    }

    private class RingNode implements Runnable {
        private int nodeId;
        private Timer timer;
        private RingNode nextNode;
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean isActive = false;

        public RingNode(int id, Timer timer, RingNode nextNode) {
            nodeId = id;
            this.timer = timer;
            this.nextNode = nextNode;
        }

        public void connect(RingNode node) {
            nextNode = node;
            isActive = true;
        }

        public void sendMessage(Message m) {
            // Enqueue the message, then schedule one run() on the pool to process it.
            // Note: if several messages are pending for the same node, several run()
            // calls can execute on different pool threads at the same time.
            queue.add(m);
            executor.execute(this);
        }

        public void run() {
            if (isActive) {
                try {
                    Message m = queue.take();
                    if (m instanceof StartMessage) {
                        log("Starting messages");
                        timer.sendMessage(m);
                        nextNode.sendMessage(new TokenMessage(nodeId, 0));
                    } else if (m instanceof StopMessage) {
                        log("Stopping");
                        nextNode.sendMessage(m);
                        isActive = false;
                    } else if (m instanceof TokenMessage) {
                        if (((TokenMessage) m).nodeId == nodeId) {
                            // The token is back at its origin: one more lap completed.
                            int nextValue = ((TokenMessage) m).value + 1;
                            if (nextValue % 10000 == 0) {
                                log("Around ring " + nextValue + " times");
                            }
                            if (nextValue == 1000000) {
                                timer.sendMessage(new StopMessage());
                                timer.sendMessage(new CancelMessage());
                                nextNode.sendMessage(new StopMessage());
                                isActive = false;
                            } else {
                                nextNode.sendMessage(new TokenMessage(nodeId, nextValue));
                            }
                        } else {
                            // Someone else's token: pass it along.
                            nextNode.sendMessage(m);
                        }
                    }
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }

        public void log(String s) {
            System.out.println(System.currentTimeMillis() + " " + nodeId + ": " + s);
        }
    }

    private static class Timer implements Runnable {
        private BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
        private boolean timing = false;
        private long startTime;

        public void sendMessage(Message m) {
            // we don't need to change this implementation as timer is rarely called
            queue.add(m);
        }

        public void run() {
            while (true) {
                Message m;
                try {
                    m = queue.take();
                    if (m instanceof StartMessage) {
                        startTime = System.currentTimeMillis();
                        timing = true;
                    } else if (m instanceof StopMessage) {
                        long end = System.currentTimeMillis();
                        System.out.println("Start=" + startTime + " Stop=" + end + " Elapsed=" + (end - startTime));
                        timing = false;
                    } else if (m instanceof CancelMessage) {
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
| Code | Spawn time | Time to send 100M messages |
|---|---|---|
| Scala Actors | 15ms | 270104ms |
| SimpleRing | 11ms | 493073ms |
| OptimizedRing (4 threads) | 6ms | 84727ms |
| OptimizedRing (5+ threads) | 5ms | 62593ms |
| OptimizedRing (1 thread) | 5ms | 60660ms |
I finally saw all 4 of my cores in use! Maximum multithreaded throughput is reached at 5 threads. However, 1 thread is even faster. Is this related to a memory bandwidth limit?
Now I am left wondering if actors are really that important if one can achieve much higher throughput using plain Java and very simple concepts (BlockingQueue, ThreadPoolExecutor). Worse, this test is actually faster with only 1 thread...
The point of the Actor model is to make concurrent programming simpler, because concurrency is hard and making mistakes is very easy.
Actually, your example demonstrates this very well, because it is broken :). The problem is that you read fields in run() that you wrote in the constructor and in connectNodes(), even though those methods do not run in the same thread as run().
You can't do this without synchronization, volatile or final, because otherwise the Java Memory Model does not guarantee that changes to fields made by one thread are visible to other threads.
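A minimal sketch of the visibility guarantee Tim refers to, using a hypothetical VolatileFlagDemo class that is not part of the ring code: one thread spins on a flag that another thread clears. Declaring the flag volatile guarantees that the write becomes visible to the loop; without volatile (or synchronization around both the read and the write) the Java Memory Model makes no such promise, and in practice the JIT may hoist the read out of the loop.

```java
public class VolatileFlagDemo {
    // 'volatile' guarantees that the write in workerStops() is seen by the worker's loop.
    // If the modifier is removed, the JMM no longer guarantees this: the JIT may hoist
    // the read out of the loop, and the worker can spin forever.
    private static volatile boolean running = true;

    // Starts a spinning worker, clears the flag, and reports whether the worker exited in time.
    static boolean workerStops(long timeoutMillis) throws InterruptedException {
        Thread worker = new Thread(() -> {
            long spins = 0;
            while (running) {
                spins++; // busy work with no synchronization inside the loop
            }
        });
        worker.setDaemon(true); // don't keep the JVM alive if it never stops
        worker.start();
        Thread.sleep(50);
        running = false;            // volatile write
        worker.join(timeoutMillis); // wait for the worker to observe it
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + workerStops(5000));
    }
}
```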
I recently wrote "Concurrency Rant: Different Types of Concurrency and Why Lots of People Already use ‘Erlang’ Concurrency" about why people use queues like BlockingQueue to communicate between threads in Java, and how this model is similar to Erlang message passing.
(No OTP, though.)
http://www.codemonkeyism.com/archives/2008/12/22/concurrency-rant-different-types-of-concurrency-and-why-lots-of-people-already-use-erlang-concurrency/
@Tim: "connectNodes()" is part of the setup, not part of the concurrent execution.
Peace
-stephan
@Tim: Should the sendMessage() method be more of a problem than connectNodes()?
Peace
-stephan
@Tim: I did not bother to check whether the code is perfectly thread-safe; it is an example, and it is faster with 1 thread anyway, for God's sake! But I think you misread the code. The constructor and connectNodes are safe, as they are called in the init phase. Furthermore, sendMessage does not modify any member variable except isActive. So the only potential issue is with isActive, and in this test it has no chance of being even remotely a problem.
Making it thread-safe is quite simple; I don't think you can assume people don't know about synchronization. Therefore I don't think Actors make anything simpler for this example.
@Tim: Thanks for the explanation, Tim, I learned something (and I thought I knew enough about volatile and the JMM from the double-checked locking fix ;-)
--
Programming is hard - http://blog.codemonkeyism.com
http://twitter.com/codemonkeyism
Yes, thanks for the explanation, Tim. However, in this particular case the initialization is done in the main thread, and there is only one thread until sendMessage is called. So the initialization in the constructor and connectNodes is not an issue.
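The java.util.concurrent package documentation states that actions in a thread prior to submitting a Runnable to an Executor happen-before its execution begins, which is the kind of guarantee this setup-phase argument relies on. Below is a minimal sketch with a hypothetical PublishViaExecutor class: plain, non-volatile fields written in the main thread (much as the constructor and connectNodes do) are then read by a pool thread. It only covers writes made before submission; a field such as isActive, which pool threads write later, needs its own reasoning.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PublishViaExecutor {
    // Plain, non-volatile fields, written only by the submitting thread.
    static final class Node {
        int id;
        Node next;
    }

    static int readNextIdOnPool() throws Exception {
        Node a = new Node();
        Node b = new Node();
        a.id = 0;
        b.id = 1;
        a.next = b; // like connectNodes(): done before anything is submitted
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // The writes above happen-before the task starts, so the pool thread
            // is guaranteed to see a.next and b.id.
            Future<Integer> f = executor.submit(() -> a.next.id);
            return f.get(); // Future.get() makes the task's result visible here
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readNextIdOnPool()); // prints 1
    }
}
```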
@Dimitris:
"If you want to make the test more interesting, an easy way is to put more tokens there, say at least 4. The single thread won't be able to cope with that."
Not sure what you mean.
executor = Executors.newFixedThreadPool(4);
Using a thread pool with 4 threads usually means that there are four threads, doesn't it?
Stephan
@Stephan:
At any instant, only one task is available for a thread to work on: the one holding the token. All other threads (regardless of how many) either spin or block. By definition, this doesn't scale beyond one thread.
Adding more tokens would produce more parallelizable work for the threads. There are other ways too, of course. But as it is now, the code is pointless.
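A minimal sketch of Dimitris's suggestion, assuming a fixed number of tokens that is never grown: with k tokens, up to k nodes have work at the same time, so the pool can actually run k tasks in parallel. All names here (MultiTokenRing, Token, Node, runRing) are hypothetical. Unlike OptimizedRing, each node uses an AtomicBoolean so that at most one pool thread processes a given node at a time, and the total hop count makes the amount of work checkable (tokens × laps × nodes). It says nothing about the speedup you would measure.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class MultiTokenRing {
    // Immutable token: which node launched it and how many laps it still has to run.
    static final class Token {
        final int originId;
        final int lapsLeft;
        Token(int originId, int lapsLeft) { this.originId = originId; this.lapsLeft = lapsLeft; }
    }

    static final class Node implements Runnable {
        final int id;
        Node next; // wired before the first send; executor submission publishes it
        private final ExecutorService executor;
        private final AtomicLong hops;
        private final CountDownLatch done;
        private final ConcurrentLinkedQueue<Token> mailbox = new ConcurrentLinkedQueue<>();
        // True while this node is queued on, or running in, the pool.
        private final AtomicBoolean scheduled = new AtomicBoolean(false);

        Node(int id, ExecutorService executor, AtomicLong hops, CountDownLatch done) {
            this.id = id;
            this.executor = executor;
            this.hops = hops;
            this.done = done;
        }

        void send(Token t) {
            mailbox.add(t);
            if (scheduled.compareAndSet(false, true)) {
                executor.execute(this); // only submit if not already scheduled
            }
        }

        @Override
        public void run() {
            Token t;
            while ((t = mailbox.poll()) != null) {
                handle(t);
            }
            scheduled.set(false);
            // A token may have arrived after the last poll() but before the flag was cleared.
            if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
                executor.execute(this);
            }
        }

        private void handle(Token t) {
            hops.incrementAndGet();
            if (t.originId != id) {
                next.send(t);                             // someone else's token: pass it on
            } else if (t.lapsLeft > 1) {
                next.send(new Token(id, t.lapsLeft - 1)); // one lap done, go again
            } else {
                done.countDown();                         // last lap finished: retire it
            }
        }
    }

    // Runs 'tokens' tokens around an n-node ring for 'laps' laps each; returns total hops.
    static long runRing(int n, int tokens, int laps, int threads) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicLong hops = new AtomicLong();
        CountDownLatch done = new CountDownLatch(tokens);
        Node[] nodes = new Node[n];
        for (int i = 0; i < n; i++) nodes[i] = new Node(i, executor, hops, done);
        for (int i = 0; i < n; i++) nodes[i].next = nodes[(i + 1) % n];
        // Spread the origins around the ring so the tokens start far apart.
        for (int k = 0; k < tokens; k++) {
            Node origin = nodes[(k * n) / tokens];
            origin.next.send(new Token(origin.id, laps));
        }
        done.await();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return hops.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 tokens * 1000 laps * 10 nodes = 40000 hops
        System.out.println("hops = " + runRing(10, 4, 1000, 4));
    }
}
```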
Now I get why we definitely need the Fork/Join framework. If such a trivial example is so easy to get wrong, wait until concurrency hits real business logic.
@Dimitris: you are right, this is the correct explanation of why this test runs better with 1 thread.
But if I send 2 tokens instead of 1, the number of messages will increase exponentially. While it would measure multithreading performance better, it changes the test quite a lot, and I still need to figure out how it can make sense.
In simple benchmarks with 2 tokens, I noticed that the SimpleRing program is the most efficient, then Scala, then the optimized version. But this might be because they don't process the tokens in the same order (the first token that reaches the maximum count stops everything).
I was thinking of adding a few more tokens, i.e. adding a few "node.sendMessage(new StartMessage());" lines, and never growing their number thereafter.
Anyway, I think you should revise the conclusions of the original post so as not to mislead others (as it stands, it could easily be classified as FUD).
By the way, @Tim:
"To translate these terms into real hardware: your machine has 4 CPU cores, and even though they share a common main memory, each of them has one or more caches of its own. If you are using more than one thread, and these threads utilize more than one core, the VM needs to tell the CPU when to flush and refresh the core's caches in order to synchronize them with the main memory."
Speaking of real hardware, this view is entirely wrong. To put a complicated story short: on cache-coherent systems (which is practically equivalent to "any computer 99.99% of people have ever encountered in their lifetime": desktops, laptops, etc.), when any write occurs to any cache, all processors are guaranteed to see that value upon their next read. Barriers are needed to prevent operation reorderings, which is the reason why a write might not be seen right away: it hasn't executed yet. NOT because it did execute but the other processors somehow failed to see it "because the cache did not synchronize with main memory", etc. The last phrase is meaningless in such a system (it contradicts cache coherence).
Maybe I'm missing something here, but isn't this a bit of an apples-to-oranges comparison?
I thought that the point of Actors was to abstract away from the particular threading model...
I suppose it's the age-old trade-off between scalability and efficiency.
@Dimitris: I tried with 2 or more tokens as you described; it does not change anything. 1 thread is faster by 5-10%. I even tried adding more computation to the processing of each message (calculating pi by Monte Carlo ;), with no change: 1 thread is still faster.
This is a bit strange; I would not have expected this result.
This can't be right. Perhaps the single-thread version stops when it finds a single StopMessage? (In any case, you must first verify that the amount of work performed is equivalent. It could be something else, like accidental contention.)
While it is true that a single StopMessage will stop everything, judging from the logs and the total time it takes, improving StopMessage handling won't change the conclusions. There must definitely be a bottleneck somewhere else. But strangely, it is not specific to my multithreaded algorithm, since the Scala version does not fare any better than the single-thread version either (actually much worse).
What's even stranger is that all 4 cores really are 100% used when I configure many threads, or with Scala, yet somehow they get the work done more slowly than a single thread. And the fact that adding intense computation to each message's handling does not change any conclusion puzzles me even more.
I don't really see how you ensure that the amount of work done is constant and that only the number of threads changes. If the first StopMessage stops everything, then the amount of work increases as you add threads.
I posted the results of improved tests that include some computation, explaining why, even with 2 or more start messages, none of the algorithms parallelized properly.
http://chasethedevil.blogspot.com/2009/01/end-of-rings-around-plain-java-better.html