I've been doing some pretty intense concurrency work. I know we've previously talked about some type of message matching as something we need, but that hasn't really been a big problem for me. What has been my primary issue is the notion of coalescing messages to efficiently merge pending work.
I have been debating different strategies to make message passing more "pluggable", but haven't really come up with any good ideas versus just enhancing the core Thread class itself.
I've pushed a change for a new Thread.loopCoalescing method.
The docs:
**
** Enter a coalescing message loop. This method follows the same
** semantics as `loop`, but has the ability to coalesce the messages
** pending in the thread's message queue.
**
** The 'toKey' function is used to derive a key for each message,
** or if null then the message itself is used as the key. If the 'toKey'
** function returns null, then the message is not considered for coalescing.
** Internally messages are indexed by key for efficient coalescing.
**
** If an incoming message has the same key as a pending message
** in the queue, then the 'coalesce' function is called to coalesce
** the messages into a new merged message. If 'coalesce' is null,
** then we use the original message. The coalesced message occupies
** the same position in the queue as the original and the incoming
** message is discarded.
**
** Both the 'toKey' and 'coalesce' functions are called while holding
** an internal lock on the queue. So the functions must be efficient
** and never attempt to interact with other threads.
**
** See [docLang]`docLang::Threading#coalescing` for more information.
**
Void loopCoalescing(|Obj? msg->Obj?|? toKey,
                    |Obj? orig, Obj? incoming->Obj?|? coalesce,
                    |Obj? msg->Obj?| receive)
Coalescing Messages
Often when sending messages to a thread, we can merge two messages into a single message to save ourselves some work. For example, it is common in windowing systems to maintain a single union of all the dirty portions of a window rather than a bunch of little rectangles. A thread can have its messages automatically coalesced using the loopCoalescing method.
Let's look at an example:
class Repaint
{
  new make(Window w, Rect d) { ... }
  Window window
  Rect dirty
}

toKey := |Repaint msg->Obj| { msg.window }
coalesce := |Repaint a, Repaint b->Obj| { Repaint(a.window, a.dirty.union(b.dirty)) }
loopCoalescing(toKey, coalesce) |Repaint msg| {...}
In this example the messages are instances of Repaint. The toKey function is used to obtain the key which determines if two messages can be coalesced. In this example we coalesce repaints per window. If the thread detects two pending messages with the same key (the window in this case), then it calls the coalesce function to merge the messages. In the example we return a new Repaint event with the union of the two dirty regions.
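For readers more at home in Java, here is a minimal sketch of the same idea. It is not the actual sys::Thread implementation: the class name CoalescingQueue and its methods are made up for illustration, and it assumes messages are never null. Pending messages are indexed by key in an insertion-ordered map, so a merged message keeps the queue position of the original.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical sketch: a pending-message queue that merges messages sharing a key. */
final class CoalescingQueue<M> {
    private final Function<M, Object> toKey;   // a null result means "never coalesce this message"
    private final BinaryOperator<M> coalesce;  // (orig, incoming) -> merged; null keeps the original
    // Insertion-ordered: replacing the value of an existing key does not move it.
    private final LinkedHashMap<Object, M> pending = new LinkedHashMap<>();

    CoalescingQueue(Function<M, Object> toKey, BinaryOperator<M> coalesce) {
        this.toKey = toKey;
        this.coalesce = coalesce;
    }

    /** Enqueues msg, merging it into a pending message with the same key if there is one. */
    synchronized void send(M msg) {
        Object key = toKey.apply(msg);
        if (key == null) key = new Object();   // unique key, so this message is never merged
        M orig = pending.get(key);
        if (orig == null) {
            pending.put(key, msg);
        } else {
            // The incoming message is discarded; the merged one takes the original's slot.
            pending.put(key, coalesce == null ? orig : coalesce.apply(orig, msg));
        }
    }

    /** Removes and returns the oldest pending message, or null if the queue is empty. */
    synchronized M poll() {
        Iterator<Map.Entry<Object, M>> it = pending.entrySet().iterator();
        if (!it.hasNext()) return null;
        M msg = it.next().getValue();
        it.remove();
        return msg;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

As in the docs above, both functions run while the queue lock is held, so they need to be cheap and must not talk to other threads.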
If you have feedback, please share.
JohnDG Sun 22 Feb 2009
This is a useful feature, but there is a more general pattern here.
I've written quite a few large-scale messaging frameworks, and the patterns I see fall into two categories: processing and handling.
Processing encompasses processing of outgoing messages and processing of incoming messages. Loop coalescing is an example of processing of incoming messages.
Handling encompasses directing messages to those handlers who want to receive them. For example, the subscriber pattern tied to a class's Type, which allows a handler to receive only those messages of a certain Type. Or a filtering mechanism based on a predicate on the message.
A very useful metaphor for processing is a stack of service layers that provide processing functionality. You can think of outgoing messages as going down the stack, and incoming messages as coming up the stack. An outgoing message is not sent until it reaches the bottom of the stack, and an incoming message is not processed until it reaches the top of the stack.
With some notion of stack, the loop method would pull messages from the stack, which would cause all the service layers to process the incoming messages. The send methods would push messages through the stack, causing all the service layers to process the outgoing messages. A MessageCoalescing service layer would only process incoming messages and would not touch outgoing messages, but in general most service layers do both.
The stack would have methods to install and remove service layers (ordering is important). So to use coalescing, a user might do this:
As for handling messages, it's possible to use the stack for this, too, because some higher level services can take incoming messages and send them to their recipients, never sending them to higher levels (so loop never sees them). However, I usually directly incorporate the notions of filtering by Type and a predicate on the message. The first one offers very high performance and is usually all that's necessary, if the message class hierarchy is well designed. The second one allows filtering based on arbitrary conditions, and it's necessarily slower but still occasionally useful.
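To make the two handling styles concrete, here is a rough Java sketch. It is an illustration only, not code from any existing framework: Dispatcher, subscribe, subscribeIf and dispatch are hypothetical names. One kind of subscription filters on the message's class, the other on an arbitrary predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical sketch: routes each message to the handlers that asked for it. */
final class Dispatcher {
    private static final class Route {
        final Predicate<Object> accepts;
        final Consumer<Object> handler;

        Route(Predicate<Object> accepts, Consumer<Object> handler) {
            this.accepts = accepts;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Type-based subscription: the handler only sees messages that are instances of type. */
    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        routes.add(new Route(type::isInstance, msg -> handler.accept(type.cast(msg))));
    }

    /** Predicate-based subscription: the handler sees messages for which filter returns true. */
    void subscribeIf(Predicate<Object> filter, Consumer<Object> handler) {
        routes.add(new Route(filter, handler));
    }

    /** Delivers msg to every matching handler and returns how many handlers received it. */
    int dispatch(Object msg) {
        int delivered = 0;
        for (Route route : routes) {
            if (route.accepts.test(msg)) {
                route.handler.accept(msg);
                delivered++;
            }
        }
        return delivered;
    }
}
```

This version scans every route for each message to keep the sketch short. A real type-based dispatcher would more likely index handlers by class, which is where the performance advantage over predicates comes from.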
brian Sun 22 Feb 2009
That is basically the same direction I was thinking. The problem is how to expose the ability to plug functionality into the message processing stack, when Fan itself doesn't have mutex locking (which we avoid since we push it into the thread's queue).
If I take your two cases processing and handling, I would probably rename processing to queue management. Handling happens basically once you pull the next message and doesn't require locking - so we should be able to build all sorts of handling frameworks and utilities in 100% Fan.
The ideal scenario is that you just plug in your custom queue management too. Then you could handle coalescing, priorities, etc. But you can't build suitable queue management without lower-level concurrency primitives, which I don't want to build into Fan.
So that is the basic problem I am struggling with.
JohnDG Sun 22 Feb 2009
The problem is how to expose the ability to plug functionality into the message processing stack,
I was thinking that each thread would have its own stack, rather than a global stack. This is more useful, anyway, since it allows each thread to install only the service layers that it requires.
Basically the service stack/queue management would sit above the actual code that pulls in messages from other threads (and pushes out messages to other threads).
If you like I can send you some API doc for Una Connect, which has a fully fleshed-out implementation of these ideas -- albeit for quite a different context. It's a 4th generation messaging framework (and the best design so far), and has too many features for Fan, but a simpler subset would fit well.
brian Sun 22 Feb 2009
Basically the service stack/queue management would sit above the actual code that pulls in messages from other threads (and pushes out messages to other threads).
Ideally the Fan thread API is minimal with just the correct plug points. I don't want to prescribe how message management gets built on top of that (although eventually it would be nice to have a standard Fan framework for that stuff). But sys::Thread should only provide the low level functionality and hooks. I think that is what you are saying too.
My problem, though, is that I don't know how to do a queue management hook which makes sense in the context of the low level locking and handling I am doing in the Java and C# code. Hence, Thread.loopCoalescing. I've also done a lot of real-time protocols and messaging, and based on my experience msg coalescing is used extensively, so it's probably a good candidate for being built in anyhow.
If you like I can send you some API doc for Una Connect, which has a fully-fleshed out implementation of these ideas
That would be great, you can email me.
Stop vs Kill
Here is another change I've pushed to hg. The common pattern I tend to use over and over is to send a "stop" message to a Thread to allow it to finish processing all its pending messages.
Previously I had a Thread.stop method which immediately raised InterruptedErr on the thread and all the threads blocking on sendSync. Having that method was a bit of an anti-pattern.
So instead, I've renamed Thread.stop to Thread.kill. Kill works the same way as before: it sets the thread into the dead state and raises InterruptedErr on the thread so that it can exit its run loop.
Then I changed the semantics of stop to post a built-in stop message to the queue. This lets the thread cleanly finish processing its queue, then exit its loop and run method gracefully.
If you are into concurrency, any reviews of that code would be great. Concurrency code is one of those things that takes lots of eyeballs on it.
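For anyone who wants the stop/kill distinction in a nutshell, here is a small Java sketch of the pattern. It is not the code in hg: Worker and its methods are hypothetical names, and it leans on java.util.concurrent's BlockingQueue, which the real implementation does not use. stop enqueues a sentinel behind the pending messages, while kill interrupts the thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of graceful stop (sentinel message) versus kill (interrupt). */
final class Worker {
    private static final Object STOP = new Object();   // built-in stop message
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Thread thread;

    Worker(Consumer<Object> receive) {
        thread = new Thread(() -> {
            try {
                while (true) {
                    Object msg = queue.take();
                    if (msg == STOP) return;   // everything queued before stop() was processed
                    receive.accept(msg);
                }
            } catch (InterruptedException e) {
                // kill(): leave the loop right away; pending messages are abandoned
            }
        });
        thread.start();
    }

    void send(Object msg) {
        queue.add(msg);
    }

    /** Graceful: the worker drains what is already queued, then exits its loop. */
    void stop() {
        queue.add(STOP);
    }

    /** Abrupt: interrupts the worker so it exits as soon as it next blocks on the queue. */
    void kill() {
        thread.interrupt();
    }

    /** Waits up to millis for the worker to finish; returns true if it has exited. */
    boolean join(long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```

Because the sentinel travels through the same FIFO queue as ordinary messages, anything sent before stop is handled first.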
brian Sun 22 Feb 2009
One stop-vs-kill concern I have is that I don't currently require a thread to enter its message loop. So the question is what stop should do if the thread hasn't entered its message loop?
Today what I do is check if there are pending messages and just assume that the thread hasn't gotten a chance to enter its loop so I enqueue the stop message, and once the thread gets into its loop it should eventually stop gracefully.
Otherwise if the thread hasn't entered its loop yet and doesn't have any pending messages, then stop works just like kill (raises InterruptedErr).
That design seems reasonable, but there is a window which I don't like:
The problem is that if stop were called after we opened the file, but before we entered the loop, then an interrupted exception would be raised and the file wouldn't be closed.
Not sure what the best way to handle that is.
Edit: this situation probably wouldn't happen that often; it requires a long initialization before entering loop and quickly starting/stopping a thread.
JohnDG Sun 22 Feb 2009
But sys::Thread should only provide the low level functionality and hooks. I think that is what you are saying too.
Exactly. I've tried the heavyweight approaches and they don't work well. When you look at Una Connect, you'll see the Mailbox class has only a couple of methods. Its implementation is trivial. Any functionality the client wants goes into service layers ("queue management"). That includes priority schemes and other functions that are usually built into the core (in most messaging frameworks).
I greatly favor "micro-kernel" approaches these days, with extensible architectures.
What's your e-mail, btw?
If you are into concurrency, any reviews of that code would be great.
I've written probably a hundred thousand lines of concurrent code. :-)
I took a quick look and here are some general comments:
There are some missing abstractions, here. In particular, I'd recommend factoring out a concurrent queue. If you're permitted to use JRE 1.5+, then you could use a BlockingQueue. Otherwise, it's pretty simple to create one. In the latter case, you want a separate lock for the head of the queue, and a separate lock for the tail of the queue. With one consumer, you can get away with just one lock for the tail. The consumer will attempt to lock the tail if it examines the head and finds that next is null.
In at least one place, you note that a lock should be held before the method is invoked. You can actually check this with Thread.holdsLock or if appropriate, you can synchronize on the method. The cost of nested synchronized sections is virtually zero in modern JREs.
Many methods are synchronized that do not need to be synchronized. For example:
public final synchronized boolean isNew()
{
  return state == NEW;
}
This method is never concurrent-safe because the value of state may change at any instant in time. The only exception is if the calling method is synchronized on a mutex that prevents changes to state, and in this case, you don't need the synchronized block anyway.
state and all variables accessed from more than one thread should be declared volatile to ensure that the JRE doesn't use thread-local caching. This DOES make a difference as recently as JRE 1.5.
notify() should never be used, only notifyAll. You use notifyAll in all places but one instance.
The wait methods are susceptible to spurious wakeup. I don't see any logic in the code to handle this (perhaps it's not important, I haven't looked closely).
InterruptedException is ignored. If you really want to ignore it, you should add some logic to continue waiting inside the loop (though I don't recommend ignoring it, as it will be raised in some termination scenarios and of course if the thread is manually interrupted).
There are only two locks for the whole class, and many methods use one of them. This is very coarse granularity and will lead to performance bottlenecks when there are a lot of interactions with Thread.
Instead of HashMap, consider ConcurrentHashMap, which offers much higher performance and some utility methods like putIfAbsent that are quite useful for concurrent applications. Again, if you can't use JRE 1.5 concurrency, then it's pretty easy to develop one of these yourself or just use backport's version.
Instead of using integers to represent things like total, and accessing them from inside a lock, consider interfaces like AtomicInteger, which have direct support on hardware and are very fast. They're also very easy to use.
You can simplify a lot of the timing code by using a scheduler, which at the specified moment, will add the item to the tail of the queue. Right now a lot of logic is spent maintaining the array, looping through it, finding the right moment to send the messages, etc., and you can eliminate nearly all of it by reusing some scheduling mechanism from the JRE/.NET.
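To illustrate the ConcurrentHashMap and AtomicInteger suggestions, here's a small sketch. ThreadRegistry and its fields are hypothetical and not taken from the Fan sources; the point is only that putIfAbsent and incrementAndGet replace a check-then-act sequence that would otherwise need an explicit lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a name registry and counter with no explicit locks. */
final class ThreadRegistry {
    private final ConcurrentHashMap<String, Object> byName = new ConcurrentHashMap<>();
    private final AtomicInteger total = new AtomicInteger();

    /** Registers value under name unless the name is taken; returns true if this call won. */
    boolean register(String name, Object value) {
        // putIfAbsent is one atomic step, so two racing callers cannot both succeed.
        if (byName.putIfAbsent(name, value) != null) {
            return false;
        }
        total.incrementAndGet();   // atomic increment, no synchronized block needed
        return true;
    }

    int total() {
        return total.get();
    }
}
```

Note that the map update and the counter update are each atomic on their own but not as a pair, so a reader can briefly see a registered name before total reflects it.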
I can take a more detailed look when I have more time. Overall impression is that we can make this class a lot smaller, a lot more robust, and much higher-performing with a few key changes.
JohnDG Sun 22 Feb 2009
I like the "end of processing" token and have used that extensively for graceful shutdowns of queue processors.
So the question is what stop should do if the thread hasn't entered its message loop?
stop should prevent the thread from entering its message loop, causing loop to return immediately. I don't think exceptions should be thrown.
UNA has something called a StoppableThread, which requires clients to implement a runInnerLoop method. It has very safe semantics. It's legal, for example, to call stop even from the inner loop, or even when the thread hasn't fully started yet, and the most intuitive thing happens.
brian Sun 22 Feb 2009
Thanks for reviewing the code John...
There are some missing abstractions, here. In particular, I'd recommend factoring out a concurrent queue. If you're permitted to use JRE 1.5+
When I originally wrote the code, I was designing everything to run on J2ME which is effectively Java 1.3. But I think I might give up on J2ME, in which case I can switch to using java.util.concurrent. Although this is such performance-critical code that I might still roll my own queue versus having the extra object allocation in the linked list.
In at least one place, you note that a lock should be held before the method is invoked.
Yeah, I am super obsessive about the performance of this code.
This method is never concurrent-safe because the value of state may change at any instant in time.
It has to be synchronized or other threads are not guaranteed to see the updated field. Otherwise the field has to be volatile, but for various reasons I stuck to synchronized.
notify() should never be used, only notifyAll. You use notifyAll in all places but one instance.
I think that notify is ok, but you are right probably safer to use notifyAll (I believe Chris found a bug previously related to notify vs notifyAll)
The wait methods are susceptible to spurious wakeup. I don't see any logic in the code to handle this
I originally thought those might be correct, but since I messed up notify vs notifyAll in a couple places I probably have multiple threads I don't have my head around. So I need to go back and investigate. That is a good call.
InterruptedException is ignored
That is by design (I think), InterruptedErr should be raised to all calling threads (although I need to review that myself)
There are only two locks for the whole class
There should be one for the queue with contention between consumers and producers, and then a lock for thread state which should rarely have contention. The real bottleneck will be the message queue lock (although I think it should perform well, but maybe BlockingQueue does it better).
Instead of HashMap, consider
This is just for coalescing, and I think it has to be wrapped in a higher level lock anyways. But I think that design is just temporary.
Thanks for taking the time to look at it. Concurrency code at this level is insanely hard to get your head around. But I think I've decided to forget about J2ME, in which case I can leverage some of the new java.util.concurrent stuff.
After thinking about this today, I am actually thinking that maybe I should ditch Thread altogether and redesign the API at a higher level. There are a couple of things I've noticed using this API that seem to scream that it is too low level:
a traditional actors model really needs green threads or thread pooling (hiding the actual thread concept)
the common pattern of declaring all your mutable state inside the run method is a bit awkward and seems like it can be captured easier
right now I am manually handling thread "chains" which probably need something more like Erlang "linked processes"
need a more pluggable mechanism for queue management and dispatch
not sure I like tying service and thread together (it forces a service to expose its low level thread interface to the world)
I am going to give it some thought and maybe post a new design later this week.
alexlamsl Mon 23 Feb 2009
ExecutorService or even ScheduledExecutorService might be quite useful here, IMHO; it is pretty malleable.
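A rough sketch of what that could look like, assuming the thread's mailbox is (or can be wrapped as) a BlockingQueue. DelayedSender and sendLater are made-up names for illustration. The scheduler owns all of the timing logic and simply appends the message to the tail of the queue when it is due.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: delayed sends built on a JRE scheduler instead of a hand-rolled timer array. */
final class DelayedSender {
    private final BlockingQueue<Object> queue;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    DelayedSender(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    /** Arranges for msg to be appended to the queue after delayMillis milliseconds. */
    void sendLater(Object msg, long delayMillis) {
        // Block-bodied lambda, so this resolves to the Runnable overload of schedule().
        timer.schedule(() -> { queue.add(msg); }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops accepting new delayed sends and lets the timer thread exit. */
    void shutdown() {
        timer.shutdown();
    }
}
```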
JohnDG Mon 23 Feb 2009
Yeah, I am super obsessive about the performance of this code.
In these cases, I often just assert it, and when testing, I always have assertions on. For some concurrent issues, it's much more practical to add assertions than create automated tests (though surprisingly, you can test quite a lot of concurrent issues).
It has to be synchronized or other threads are not guaranteed to see the updated field. Otherwise the field has to be volatile, but for various reasons I stuck to synchronized.
For variables accessed quite frequently from multiple threads, thread-local caching doesn't make sense, and volatile is a good way to turn it off. In any case, for methods like isNew, etc., if not invoked in the same mutex in which state is allowed to change, the return value from the method is not very meaningful, since it might change even before the method finishes returning.
I think that notify is ok, but you are right probably safer to use notifyAll (I believe Chris found a bug previously related to notify vs notifyAll)
Yes. I've never had good experiences with notify even when it seems semantically safe to use it (i.e. there is a maximum of one other thread waiting on the mutex).
I think it should perform well, but maybe BlockingQueue does it better
Yes. The key observation is that if you have a single consumer, and multiple producers, then if your queue contains both a reference to first and last elements, you really only need a mutex for modifying the last element. When any producer wants to append an element to the queue, it will obtain the mutex and modify the last element's next reference to point to the new element, then reassign last to be the true last element, before relinquishing control of the mutex.
The consumer then uses simple logic to decide whether or not to obtain the last element mutex:
Now you still need to wait until there's an element available. I would use a different mutex here, because this will dramatically reduce lock contention.
Void notifyElemAvailable() {
  synchronized (elemAvailableMutex) {
    elemAvailableMutex.notifyAll()
  }
}

Void waitUntilElemAvailable() {
  synchronized (elemAvailableMutex) {
    // Re-check the condition after every wakeup to handle spurious wakeups.
    // isEmpty() is an assumed helper that reports whether the queue has no elements.
    while (isEmpty()) {
      try {
        elemAvailableMutex.wait()
      }
      catch (InterruptedException e) {
        // You should probably rethrow this and terminate the loop.
        // It probably means the app is terminating or someone
        // manually tried to interrupt the thread
      }
    }
  }
}
The performance of such an implementation will be much higher than using a single mutex for the whole Thread instance.
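Here is a rough Java sketch of the tail-locked queue for many producers and one consumer. It is a variation on what I described rather than a literal transcription: TailLockedQueue is a made-up name, and the sketch keeps a sentinel node at the head plus a volatile next reference, which lets the single consumer skip the tail mutex entirely. Blocking until an element is available is left out and would be layered on top as shown above.

```java
/** Hypothetical sketch: linked queue for many producers and exactly one consumer. */
final class TailLockedQueue<E> {
    private static final class Node<E> {
        E item;
        volatile Node<E> next;   // volatile so the consumer sees nodes linked by producers

        Node(E item) {
            this.item = item;
        }
    }

    private final Object tailLock = new Object();
    private Node<E> head = new Node<>(null);   // sentinel; touched only by the consumer
    private Node<E> tail = head;               // guarded by tailLock

    /** May be called from any producer thread. */
    void put(E item) {
        Node<E> node = new Node<>(item);
        synchronized (tailLock) {
            tail.next = node;   // link the new node after the current last element
            tail = node;        // then make it the true last element
        }
    }

    /** Must only be called from the single consumer thread; returns null if empty. */
    E poll() {
        Node<E> first = head.next;   // volatile read, no lock required
        if (first == null) {
            return null;
        }
        E item = first.item;
        first.item = null;   // first now serves as the new sentinel
        head = first;
        return item;
    }
}
```

Producers only contend with each other on tailLock, and the consumer never blocks a producer.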
I am going to give some thought and maybe post a new design later this week.
Sounds good to me.
JohnDG Mon 23 Feb 2009
By the way, I'm wondering if it might make sense to create some sort of FanPrims class, which has primitives for concurrency, locking, shared state, etc., and build some Fan classes atop these. This is the approach used for some compilers such as GHC which have to support multiple platforms.
brian Tue 24 Feb 2009
There are two paths, one is to expose the low level things like locking. But that requires allowing shared mutable state in the language. My preference is to stick to the current design using higher level concurrency primitives like message passing and enforce no shared mutable state (although we now have the Unsafe exception).
JohnDG Tue 24 Feb 2009
There are two paths, one is to expose the low level things like locking.
And then the third path: FanPrimOps which is exposed only internally; if it's even accessible externally, it's neither documented nor supported (kind of like Unsafe.* on the JRE). Could be useful making it accessible so things like an STM plug-in could be developed.
In any case, I agree with your general idea: it shouldn't be a part of the public API of the language. It should only be used for low-level implementation of parts of Fan itself, and possibly for alternate concurrency models like STM.
The advantage is that you get to code Thread in Fan (one version instead of two), and perhaps many other similar classes, depending on exactly what is bundled into FanPrimOps.
jodastephen Tue 24 Feb 2009
A possible fourth option - add a concurrent keyword?
class Foo {
  concurrent static Int hitCounter
}
This kind of variable would be like const in that it is allowed in static scope, and is allowed to be shared mutable state.
Internally, it's implemented using concurrent classes like AtomicInteger, ConcurrentHashMap, CopyOnWriteList.
The key advantage is an easy learning curve from other languages, esp. Java. You still can have shared static state, but it must be const or concurrent.
I like John's low level classes notion too.
JohnDG Tue 24 Feb 2009
The problem with a concurrent keyword that exposes mutable state is that lock-based concurrency (even very nice lock-based concurrency such as AtomicInteger / ConcurrentHashMap, et al) is not composable. Meaning that if you have safe semantics for various primitives, then when you combine them, you are not guaranteed that the whole is safe. In fact, the whole will probably be filled with deadlocks.
is perfectly safe as long as each one is safe. This makes it trivial to create sound systems out of smaller, sound components -- a task exceedingly difficult with lock-based concurrency.
Now using a concurrent keyword to allow mutable state but only within an atomic block, I do like that. :-) But Fan likely won't have such a notion. Instead, we can do it with a compiler plug-in (with the small disadvantage that we have to be able to parse Fan code inside the STM plug-in, but that's OK).
brian Sat 21 Feb 2009
I've been doing some pretty intense concurrency work. I know we've previously talked about some type of message matching as something we need, but that hasn't really been a big problem for me. What has been my primary issue is the notion of coalescing messages to efficiently merge pending work.
I have been debating different strategies to make message passing more "pluggable", but haven't really come up with any good ideas versus just enhancing the core Thread class itself.
I've pushed a change for a new
Thread.loopCoalescing
method.The docs:
Coalescing Messages
Often when sending messages to a thread, we can merge two messages into a single message to save ourselves some work. For example, it is common in windowing systems to maintain a single union of all the dirty portions of a window rather than of a bunch of little rectangles. A thread can have its messages automatically coalesced using the
loopCoalescing
method.Let's look at an example:
In this example the messages are instances of
Repaint
. ThetoKey
function is used to obtain the key which determines if two messages can be coalesced. In this example we coalesce repaints per window. If the thread detects two pending messages with the same key (the window in this case), then it calls thecoalesce
function to merge the messages. In example we return a newRepaint
event with the union of the two dirty regions.If you have feedback, please share.
JohnDG Sun 22 Feb 2009
This is a useful feature, but there is a more general pattern here.
I've written quite a few large-scale messaging frameworks, and the patterns I see fall into two categories: processing and handling.
Processing encompasses processing of outgoing messages and processing of incoming messages. Loop coalescing is an example of processing of incoming messages.
Handling encompasses directing messages to those handlers who want to receive them. For example, the subscriber pattern tied to a classes
Type
, which allows a handler to receive only those messages of a certainType
. Or a filtering mechanism based on a predicate on the message.A very useful metaphor for processing is a stack of service layers that provide processing functionality. You can think of outgoing messages as going down the stack, and incoming messages as coming up the stack. An outgoing message is not sent until it reaches the bottom of the stack, and an incoming message is not processed until it reaches the top of the stack.
With some notion of stack, the
loop
method would pull messages from the stack, which would cause all the service layers to process the incoming messages. Thesend
methods would push messages through the stack, causing all the service layers to process the outgoing messages. AMessageCoalescing
service layer would only process incoming messages and would not touch outgoing messages, but in general most service layers do both.The stack would have methods to install and remove service layers (ordering is important). So to use coalescing, a user might do this:
As for handling messages, it's possible to use the stack for this, too, because some higher level services can take incoming messages and send them to their recipients, never sending them to higher levels (so
loop
never sees them). However, I usually directly incorporate the notions of filtering byType
and a predicate on the message. The first one offers very high performance and is usually all that's necessary, if the message class hierarchy is well designed. The second one allows filtering based on arbitrary conditions, and it's necessarily slower but still ocassionally useful.brian Sun 22 Feb 2009
That is basically the same direction I was thinking. The problem is how to expose the ability to plug functionality into the message processing stack, when Fan itself doesn't have mutex locking (which we avoid since we push it into the thread's queue).
If I take your two cases processing and handling, I would probably rename processing to queue management. Handling happens basically once you pull the next message and doesn't require locking - so we should be able to build all sorts of handling frameworks and utilities in 100% Fan.
The ideal scenario is that you just plug in your custom queue management too. Then you could handle coalescing, priorities, etc. But you can't build suitable queue management without lower-level concurrency primitives, which I don't want to build into Fan.
So that is the basic problem I am struggling with.
JohnDG Sun 22 Feb 2009
I was thinking that each thread would have its own stack, rather than a global stack. This is more useful, anyway, since it allows each thread to install only the service layers that it requires.
Basically the service stack/queue management would sit above the actual code that pulls in messages from other threads (and pushes out messages to other threads).
If you like I can send you some API doc for Una Connect, which has a fully-fleshed out implementation of these ideas -- albeit for quite a different context. It's a 4th generation messaging framework (and the best design so far), and has too many features for Fan, but a simpler subset would fit well.
brian Sun 22 Feb 2009
Ideally the Fan thread API is minimal with just the correct plug points. I don't want to prescribe how message management gets built on top of that (although eventually it would be nice to have a standard Fan framework for that stuff). But
sys::Thread
should only provide the low level functionality and hooks. I think that is what you are saying too.Although my problem is I don't know how to do a queue management hook which makes sense in the context of the low level locking and handling I am doing in the Java and C# code. Hence,
Thread.loopCoalescing
. Although I've done a lot real-time protocols and messaging too, and based on my experience msg coalescing is used extensively, so its probably a good candidate for being built in anyhow.That would be great, you can email me.
Stop vs Kill
Here is another change I've pushed to hg. The common pattern I tend to use over and over is to send a "stop" message to a Thread to allow it to finish processing all its pending messages.
Previously I had a
Thread.stop
method which immediately raised InterruptedErr on the thread and all the threads blocking on sendSync. Having that method was a bit of the anti-pattern.So instead, I've renamed
Thread.stop
toThread.kill
. Kill works the same, it sets the thread into the dead state and raises InterruptedErr on the thread so that it can exit its run loop.Then I changed the semanatics of
stop
to post a built-in stop message to the queue. This lets the thread cleanly finish procesing its queue, then exit its loop and run method gracefully.If you are into concurrency, any reviews of that code would be great. Concurrency code is one of those things that takes lots of eyeballs on it.
brian Sun 22 Feb 2009
One stop-vs-kill concern I have is that I don't currently require a thread to enter its message loop. So the question is what
stop
should do if the thread hasn't entered its message loop?Today what I do is check if there are pending messages and just assume that the thread hasn't gotten a chance to enter its loop so I enqueue the stop message, and once the thread gets into its loop it should eventually stop gracefully.
Otherwise if the thread hasn't entered its loop yet and doesn't have any pending messages, then stop works just like kill (raises InterruptedErr).
That design seems reasonable, but there is a window which I don't like:
The problem is that if stop were called after we opened in the file, but before we entered the loop then an interrupted exception would be raised and the file wouldn't be closed.
Not sure what the best way to handle that is.
Edit: this situation probably wouldn't happen that often; it requires a long initialization before entering loop and quickly starting/stopping a thread.
JohnDG Sun 22 Feb 2009
Exactly. I've tried the heavyweight approaches and they don't work well. When you look at
Una Connect
, you'll see theMailbox
class has only a couple of methods. It's implementation is trivial. Any functionality the client wants goes into service layers ("queue management"). That includes priority schemes and other functions that are usually built into the core (in most messaging frameworks).I greatly favor "micro-kernel" approaches these days, with extensible architectures.
What's your e-mail, btw?
I've written probably a hundred thousand lines of concurrent code. :-)
I took a quick look and here are some general comments:
BlockingQueue. Otherwise, it's pretty simple to create one. In the latter case, you want a separate lock for the head of the queue, and a separate lock for the tail of the queue. With one consumer, you can get away with just one lock for the tail. The consumer will attempt to lock the tail if it examines the head and finds that next is null.
Thread.holdsLock or if appropriate, you can synchronize on the method. The cost of nested synchronized sections is virtually zero in modern JREs.
This method is never concurrent-safe because the value of state may change at any instant in time. The only exception is if the calling method is synchronized on a mutex that prevents changes to state, and in this case, you don't need the synchronized block anyway.
state and all variables accessed from more than one thread should be declared volatile to ensure that the JRE doesn't use thread-local caching. This DOES make a difference as recently as JRE 1.5.
notify() should never be used, only notifyAll. You use notifyAll in all places but one instance.
wait methods are susceptible to spurious wakeup. I don't see any logic in the code to handle this (perhaps it's not important, I haven't looked closely).
InterruptedException is ignored. If you really want to ignore it, you should add some logic to continue waiting inside the loop (though I don't recommend ignoring it, as it will be raised in some termination scenarios and of course if the thread is manually interrupted).
Thread.
HashMap, consider ConcurrentHashMap, which offers much higher performance and some utility methods like putIfAbsent that are quite useful for concurrent applications. Again, if you can't use JRE 1.5 concurrency, then it's pretty easy to develop one of these yourself or just use backport's version.
total, and accessing them from inside a lock, consider classes like AtomicInteger, which have direct support on hardware and are very fast. They're also very easy to use.
I can take a more detailed look when I have more time. Overall impression is that we can make this class a lot smaller, a lot more robust, and much higher-performing with a few key changes.
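To make the wait/notifyAll comments concrete, here is a minimal Java sketch of the usual guarded-wait idiom: the condition is rechecked in a loop so a spurious wakeup simply goes back to waiting, and producers call notifyAll rather than notify. `GuardedMailbox` is a hypothetical class written for this illustration, not the actual Fan Thread code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-lock mailbox, for illustration only.
class GuardedMailbox {
    private final Deque<Object> pending = new ArrayDeque<>();

    synchronized void put(Object msg) {
        pending.addLast(msg);
        notifyAll();                   // wake every waiter; each one rechecks the condition
    }

    synchronized Object take() throws InterruptedException {
        while (pending.isEmpty()) {    // a loop, not an if: guards against spurious wakeups
            wait();
        }
        return pending.removeFirst();
    }

    // Starts a consumer, sends one message, and returns what the consumer received.
    static Object demo() {
        GuardedMailbox box = new GuardedMailbox();
        Object[] result = new Object[1];
        Thread consumer = new Thread(() -> {
            try {
                result[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // propagate instead of swallowing
            }
        });
        consumer.start();
        box.put("hello");
        try {
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that `take` lets InterruptedException propagate to the caller rather than ignoring it, in line with the comment on interruption above.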
JohnDG Sun 22 Feb 2009
I like the "end of processing" token and have used that extensively for graceful shutdowns of queue processors.
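For readers who have not used the pattern, a minimal Java sketch of an "end of processing" token follows. It assumes a plain `LinkedBlockingQueue` and a hypothetical `StopTokenWorker` class; it is not how Fan's Thread is implemented. Messages queued before the token are processed, the token ends the loop, and anything queued after it is never seen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical worker; names are illustrative, not Fan's Thread API.
class StopTokenWorker implements Runnable {
    // Unique sentinel compared by identity, so no real message can collide with it.
    static final Object STOP = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> handled = new ArrayList<>();

    void send(Object msg) { queue.add(msg); }

    // Graceful stop: enqueue the token behind any pending messages.
    void stop() { queue.add(STOP); }

    @Override
    public void run() {
        try {
            while (true) {
                Object msg = queue.take();
                if (msg == STOP) return;   // everything queued earlier is already handled
                handled.add(msg);
            }
        } catch (InterruptedException e) {
            // "kill" path: exit at once, leaving pending messages unprocessed
            Thread.currentThread().interrupt();
        }
    }

    // Queues two messages, the stop token, and one late message, then runs the worker.
    static List<Object> demo() {
        StopTokenWorker worker = new StopTokenWorker();
        worker.send("a");
        worker.send("b");
        worker.stop();             // legal before the thread has even started
        worker.send("ignored");    // arrives after the token, never processed
        Thread t = new Thread(worker);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return worker.handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [a, b]
    }
}
```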
stop should prevent the thread from entering its message loop, causing loop to return immediately. I don't think exceptions should be thrown.
UNA has something called a StoppableThread, which requires clients to implement a runInnerLoop method. It has very safe semantics. It's legal, for example, to call stop even from the inner loop, or even when the thread hasn't fully started yet, and the most intuitive thing happens.
brian Sun 22 Feb 2009
Thanks for reviewing the code John...
When I originally wrote the code, I was designing everything to run on J2ME, which is effectively Java 1.3. But I think I might give up on J2ME, in which case I can switch to java.util.concurrent. Although this is such performance-critical code that I might still roll my own queue versus having the extra object allocation in the linked list.
Yeah, I am super obsessive about the performance of this code.
It has to be synchronized or other threads are not guaranteed to see the updated field. Otherwise the field has to be volatile, but for various reasons I stuck to synchronized.
I think that notify is ok, but you are right, it is probably safer to use notifyAll (I believe Chris found a bug previously related to notify vs notifyAll).
I originally thought those might be correct, but since I messed up notify vs notifyAll in a couple places I probably have multiple threads I don't have my head around. So I need to go back and investigate. That is a good call.
That is by design (I think): InterruptedErr should be raised to all calling threads (although I need to review that myself).
There should be one for the queue with contention b/w consumers and producers, and then a lock for thread state which should rarely have contention. The real bottleneck will be the message queue lock (although I think it should perform well, but maybe BlockingQueue does it better).
This is just for coalescing, and I think it has to be wrapped in a higher level lock anyways. But I think that design is just temporary.
Thanks for taking the time to look at it. Concurrency code at this level is insanely hard to get your head around. But I think I've decided to forget about J2ME, in which case I can leverage some of the new java.util.concurrent stuff.
After thinking about this today, I am actually thinking that maybe I should ditch Thread altogether and redesign the API at a higher level. There are a couple of things I've noticed using this API that seem to scream that it is too low level:
I am going to give some thought and maybe post a new design later this week.
alexlamsl Mon 23 Feb 2009
ExecutorService or even ScheduledExecutorService might be quite useful here, IMHO; it is pretty malleable.
JohnDG Mon 23 Feb 2009
In these cases, I often just assert it, and when testing, I always have assertions on. For some concurrent issues, it's much more practical to add assertions than create automated tests (though surprisingly, you can test quite a lot of concurrent issues).
For variables accessed quite frequently from multiple threads, thread-local caching doesn't make sense, and volatile is a good way to turn it off. In any case, for methods like isNew, etc., if not invoked in the same mutex in which state is allowed to change, the return value from the method is not very meaningful, since it might change even before the method finishes returning.
Yes. I've never had good experiences with notify even when it seems semantically safe to use it (i.e. there is a maximum of one other thread waiting on the mutex).
Yes. The key observation is that if you have a single consumer, and multiple producers, then if your queue contains both a reference to first and last elements, you really only need a mutex for modifying the last element. When any producer wants to append an element to the queue, it will obtain the mutex and modify the last element's next reference to point to the new element, then reassign last to be the true last element, before relinquishing control of the mutex.
The consumer then uses simple logic to decide whether or not to obtain the last element mutex:
Now you still need to wait until there's an element available. I would use a different mutex here, because this will dramatically reduce lock contention.
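The producer and consumer steps described above might be sketched in Java as follows. This is a variant, not the exact scheme described: it assumes a dummy head node and a volatile next field (the two-lock queue design of Michael and Scott with the head lock dropped), which lets the single consumer skip the tail mutex entirely instead of taking it when next is null. Waiting for an element to arrive is left out. `TailLockedQueue` is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a multi-producer, single-consumer linked queue.
class TailLockedQueue<T> {
    private static final class Node<T> {
        T value;
        volatile Node<T> next;      // volatile so the consumer sees a published link
        Node(T value) { this.value = value; }
    }

    private final ReentrantLock tailLock = new ReentrantLock();
    private Node<T> head = new Node<>(null);   // dummy node; touched only by the consumer
    private Node<T> tail = head;               // guarded by tailLock

    // Any producer thread: link the new node after the current last node.
    void offer(T value) {
        Node<T> node = new Node<>(value);
        tailLock.lock();
        try {
            tail.next = node;
            tail = node;
        } finally {
            tailLock.unlock();
        }
    }

    // Consumer thread only: returns null when the queue is empty.
    T poll() {
        Node<T> first = head.next;
        if (first == null) return null;
        T value = first.value;
        first.value = null;         // the dequeued node becomes the new dummy
        head = first;
        return value;
    }

    // Runs several producers to completion, then drains and counts the elements.
    static int producersThenDrain(int producers, int perProducer) {
        TailLockedQueue<Integer> q = new TailLockedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) q.offer(i);
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        int count = 0;
        while (q.poll() != null) count++;
        return count;
    }

    public static void main(String[] args) {
        System.out.println(producersThenDrain(4, 1000));
    }
}
```

Producers contend only with each other on `tailLock`; the consumer never blocks a producer.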
The performance of such an implementation will be much higher than using a single mutex for the whole Thread instance.
Sounds good to me.
JohnDG Mon 23 Feb 2009
By the way, I'm wondering if it might make sense to create some sort of FanPrims class, which has primitives for concurrency, locking, shared state, etc., and build some Fan classes atop these. This is the approach used for some compilers such as GHC which have to support multiple platforms.
brian Tue 24 Feb 2009
There are two paths: one is to expose the low level things like locking. But that requires allowing shared mutable state in the language. My preference is to stick to the current design using higher level concurrency primitives like message passing and enforce no shared mutable state (although we now have the Unsafe exception).
JohnDG Tue 24 Feb 2009
And then the third path: FanPrimOps which is exposed only internally; if it's even accessible externally, it's neither documented nor supported (kind of like Unsafe.* on the JRE). Could be useful making it accessible so things like an STM plug-in could be developed.
In any case, I agree with your general idea: it shouldn't be a part of the public API of the language. It should only be used for low-level implementation of parts of Fan itself, and possibly for alternate concurrency models like STM.
The advantage is that you get to code Thread in Fan (one version instead of two), and perhaps many other similar classes, depending on exactly what is bundled into FanPrimOps.
jodastephen Tue 24 Feb 2009
A possible fourth option - add a concurrent keyword?
This kind of variable would be like const in that it is allowed in static scope, and is allowed to be shared mutable state.
Internally, it's implemented using concurrent classes like AtomicInteger, ConcurrentHashMap, CopyOnWriteArrayList.
The key advantage is an easy learning curve from other languages, esp. Java. You still can have shared static state, but it must be const or concurrent.
I like John's low level classes notion too.
JohnDG Tue 24 Feb 2009
The problem with a concurrent keyword that exposes mutable state is that lock-based concurrency (even very nice lock-based concurrency such as AtomicInteger/ConcurrentHashMap, et al) is not composable. Meaning, that if you have safe semantics for various primitives, then when you combine them, you are not guaranteed for the whole to be safe. In fact, the whole will probably be filled with deadlocks.
STM provides safe semantics. Meaning:
is perfectly safe as long as each one is safe. This makes it trivial to create sound systems out of smaller, sound components -- a task exceedingly difficult with lock-based concurrency.
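A small Java illustration of the composability point, showing the broken-invariant side rather than deadlock: each AtomicInteger update below is individually atomic, yet the pair is not, so a reader between the two steps sees money that has left one account and not yet reached the other. The interleaving is simulated in a single thread to keep it deterministic; `ComposeDemo` and the account setup are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class ComposeDemo {
    // Moves 'amount' between two atomic counters and returns
    // { total visible between the two steps, total after both steps }.
    static int[] transfer(int initial, int amount) {
        AtomicInteger from = new AtomicInteger(initial);
        AtomicInteger to = new AtomicInteger(0);

        from.addAndGet(-amount);                 // atomic on its own
        int midTotal = from.get() + to.get();    // what a concurrent reader could observe here
        to.addAndGet(amount);                    // atomic on its own
        int endTotal = from.get() + to.get();

        return new int[] { midTotal, endTotal };
    }

    public static void main(String[] args) {
        int[] totals = transfer(100, 10);
        System.out.println(totals[0] + " " + totals[1]);   // prints 90 100
    }
}
```

Restoring the invariant with locks means choosing a lock order across both accounts, which is where the deadlock risk comes in; a transactional block would make the two updates appear as one.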
Now using a concurrent keyword to allow mutable state but only within an atomic block, I do like that. :-) But Fan likely won't have such a notion. Instead, we can do it with a compiler plug-in (with the small disadvantage that we have to be able to parse Fan code inside the STM plug-in, but that's OK).