#473 Threading Changes

brian Sat 21 Feb 2009

I've been doing some pretty intense concurrency work. I know we've previously talked about needing some type of message matching, but that hasn't really been a big problem for me. My primary issue has been coalescing messages to efficiently merge pending work.

I have been debating different strategies to make message passing more "pluggable", but haven't come up with anything better than enhancing the core Thread class itself.

I've pushed a change for a new Thread.loopCoalescing method.

The docs:

**
** Enter a coalescing message loop.  This method follows the same
** semantics as `loop`, but has the ability to coalesce the messages
** pending in the thread's message queue.
**
** The 'toKey' function is used to derive a key for each message,
** or if null then the message itself is used as the key.  If the 'toKey'
** function returns null, then the message is not considered for coalescing.
** Internally messages are indexed by key for efficient coalescing.
**
** If an incoming message has the same key as a pending message
** in the queue, then the 'coalesce' function is called to coalesce
** the messages into a new merged message.  If 'coalesce' is null,
** then we use the original message.  The coalesced message occupies
** the same position in the queue as the original and the incoming
** message is discarded.
**
** Both the 'toKey' and 'coalesce' functions are called while holding
** an internal lock on the queue.  So the functions must be efficient
** and never attempt to interact with other threads.
**
** See [docLang]`docLang::Threading#coalescing` for more information.
**
Void loopCoalescing(|Obj? msg->Obj?|? toKey,
                    |Obj? orig, Obj? incoming->Obj?|? coalesce,
                    |Obj? msg->Obj?| receive)

Coalescing Messages

Often when sending messages to a thread, we can merge two messages into a single message to save ourselves some work. For example, it is common in windowing systems to maintain a single union of all the dirty portions of a window rather than a bunch of little rectangles. A thread can have its messages automatically coalesced using the loopCoalescing method.

Let's look at an example:

class Repaint
{
  new make(Window w, Rect d) { ... }
  Window window
  Rect dirty
}

toKey := |Repaint msg->Obj| { msg.window }
coalesce := |Repaint a, Repaint b->Obj| { Repaint(a.window, a.dirty.union(b.dirty)) }
loopCoalescing(toKey, coalesce) |Repaint msg| {...}

In this example the messages are instances of Repaint. The toKey function is used to obtain the key which determines whether two messages can be coalesced. In this example we coalesce repaints per window. If the thread detects two pending messages with the same key (the window in this case), then it calls the coalesce function to merge the messages. In this example we return a new Repaint event with the union of the two dirty regions.

If you have feedback, please share.

JohnDG Sun 22 Feb 2009

This is a useful feature, but there is a more general pattern here.

I've written quite a few large-scale messaging frameworks, and the patterns I see fall into two categories: processing and handling.

Processing encompasses processing of outgoing messages and processing of incoming messages. Loop coalescing is an example of processing of incoming messages.

Handling encompasses directing messages to those handlers who want to receive them. For example, the subscriber pattern tied to a class's Type, which allows a handler to receive only messages of a certain Type. Or a filtering mechanism based on a predicate on the message.

A very useful metaphor for processing is a stack of service layers that provide processing functionality. You can think of outgoing messages as going down the stack, and incoming messages as coming up the stack. An outgoing message is not sent until it reaches the bottom of the stack, and an incoming message is not processed until it reaches the top of the stack.

With some notion of stack, the loop method would pull messages from the stack, which would cause all the service layers to process the incoming messages. The send methods would push messages through the stack, causing all the service layers to process the outgoing messages. A MessageCoalescing service layer would only process incoming messages and would not touch outgoing messages, but in general most service layers do both.

The stack would have methods to install and remove service layers (ordering is important). So to use coalescing, a user might do this:

installService(new MessageCoalescing(toKey, coalesce))
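
To make that concrete, here's a minimal sketch of the layering in Java; ServiceLayer and MessageStack are names I'm making up for illustration, not anything in Fan or Una Connect:

import java.util.*;

interface ServiceLayer
{
  // outgoing messages travel down the stack; return null to consume the message
  Object processOutgoing(Object msg);

  // incoming messages travel up the stack; return null to consume the message
  Object processIncoming(Object msg);
}

class MessageStack
{
  private final List<ServiceLayer> layers = new ArrayList<>(); // bottom to top; ordering matters

  void installService(ServiceLayer layer) { layers.add(layer); }

  // push an outgoing message down from the top of the stack
  Object send(Object msg)
  {
    for (int i = layers.size() - 1; i >= 0 && msg != null; i--)
      msg = layers.get(i).processOutgoing(msg);
    return msg; // what is actually delivered to the other thread
  }

  // pull an incoming message up from the bottom of the stack
  Object receive(Object msg)
  {
    for (int i = 0; i < layers.size() && msg != null; i++)
      msg = layers.get(i).processIncoming(msg);
    return msg; // what loop finally sees, or null if a layer consumed it
  }
}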

As for handling messages, it's possible to use the stack for this too, because some higher level services can take incoming messages and send them to their recipients, never passing them to higher levels (so loop never sees them). However, I usually directly incorporate the notions of filtering by Type and filtering by a predicate on the message. The first offers very high performance and is usually all that's necessary if the message class hierarchy is well designed. The second allows filtering based on arbitrary conditions; it's necessarily slower but still occasionally useful.
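
And a rough sketch of the two handling styles (again, all names are illustrative):

import java.util.*;
import java.util.function.Predicate;

class Dispatcher
{
  interface Handler { void handle(Object msg); }

  // subscriber pattern keyed on the message's Type: one map lookup per message
  private final Map<Class<?>, List<Handler>> byType = new HashMap<>();

  // predicate filtering: flexible, but every predicate runs for every message
  private final Map<Predicate<Object>, Handler> byPredicate = new LinkedHashMap<>();

  void subscribe(Class<?> type, Handler h)
  {
    byType.computeIfAbsent(type, k -> new ArrayList<>()).add(h);
  }

  void subscribe(Predicate<Object> p, Handler h) { byPredicate.put(p, h); }

  void dispatch(Object msg)
  {
    List<Handler> typed = byType.get(msg.getClass());
    if (typed != null)
      for (Handler h : typed) h.handle(msg);
    for (Map.Entry<Predicate<Object>, Handler> e : byPredicate.entrySet())
      if (e.getKey().test(msg)) e.getValue().handle(msg);
  }
}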

brian Sun 22 Feb 2009

That is basically the same direction I was thinking. The problem is how to expose the ability to plug functionality into the message processing stack, when Fan itself doesn't have mutex locking (which we avoid since we push it into the thread's queue).

If I take your two cases, processing and handling, I would probably rename processing to queue management. Handling happens once you've pulled the next message and doesn't require locking, so we should be able to build all sorts of handling frameworks and utilities in 100% Fan.

The ideal scenario is that you just plug in your custom queue management too. Then you could handle coalescing, priorities, etc. But you can't build suitable queue management without lower-level concurrency primitives, which I don't want to build into Fan.

So that is the basic problem I am struggling with.

JohnDG Sun 22 Feb 2009

The problem is how to expose the ability to plug functionality into the message processing stack,

I was thinking that each thread would have its own stack, rather than a global stack. This is more useful, anyway, since it allows each thread to install only the service layers that it requires.

Basically the service stack/queue management would sit above the actual code that pulls in messages from other threads (and pushes out messages to other threads).

If you like I can send you some API doc for Una Connect, which has a fully-fleshed out implementation of these ideas -- albeit for quite a different context. It's a 4th generation messaging framework (and the best design so far), and has too many features for Fan, but a simpler subset would fit well.

brian Sun 22 Feb 2009

Basically the service stack/queue management would sit above the actual code that pulls in messages from other threads (and pushes out messages to other threads).

Ideally the Fan thread API is minimal with just the correct plug points. I don't want to prescribe how message management gets built on top of that (although eventually it would be nice to have a standard Fan framework for that stuff). But sys::Thread should only provide the low level functionality and hooks. I think that is what you are saying too.

My problem, though, is that I don't know how to design a queue management hook that makes sense in the context of the low level locking and handling I am doing in the Java and C# code. Hence, Thread.loopCoalescing. I've done a lot of real-time protocol and messaging work too, and in my experience message coalescing is used extensively, so it's probably a good candidate for being built in anyhow.

If you like I can send you some API doc for Una Connect, which has a fully-fleshed out implementation of these ideas

That would be great, you can email me.

Stop vs Kill

Here is another change I've pushed to hg. The common pattern I tend to use over and over is to send a "stop" message to a Thread to allow it to finish processing all its pending messages.

Previously I had a Thread.stop method which immediately raised InterruptedErr on the thread and all the threads blocking on sendSync. Having that method was a bit of an anti-pattern.

So instead, I've renamed Thread.stop to Thread.kill. Kill works the same as the old stop: it sets the thread into the dead state and raises InterruptedErr on the thread so that it can exit its run loop.

Then I changed the semantics of stop to post a built-in stop message to the queue. This lets the thread cleanly finish processing its queue, then exit its loop and run method gracefully.

If you are into concurrency, any review of that code would be great. Concurrency code is one of those things that needs lots of eyeballs on it.

brian Sun 22 Feb 2009

One stop-vs-kill concern I have is that I don't currently require a thread to enter its message loop. So the question is: what should stop do if the thread hasn't entered its message loop?

Today what I do is check if there are pending messages; if there are, I assume the thread just hasn't gotten a chance to enter its loop yet, so I enqueue the stop message, and once the thread gets into its loop it will eventually stop gracefully.

Otherwise if the thread hasn't entered its loop yet and doesn't have any pending messages, then stop works just like kill (raises InterruptedErr).

That design seems reasonable, but there is a window which I don't like:

Void run()
{
  out := `log.txt`.toFile.out
  loop |Obj msg| { ... }
  out.close
}

The problem is that if stop were called after we opened the file, but before we entered the loop, then InterruptedErr would be raised and the file would never be closed.

Not sure what the best way to handle that is.

Edit: this situation probably wouldn't happen that often; it requires a long initialization before entering loop and quickly starting/stopping a thread.

JohnDG Sun 22 Feb 2009

But sys::Thread should only provide the low level functionality and hooks. I think that is what you are saying too.

Exactly. I've tried the heavyweight approaches and they don't work well. When you look at Una Connect, you'll see the Mailbox class has only a couple of methods. Its implementation is trivial. Any functionality the client wants goes into service layers ("queue management"), including priority schemes and other functions that are usually built into the core of most messaging frameworks.

I greatly favor "micro-kernel" approaches these days, with extensible architectures.

What's your e-mail, btw?

If you are into concurrency, any reviews of that code would be great.

I've written probably a hundred thousand lines of concurrent code. :-)

I took a quick look and here are some general comments:

  1. There are some missing abstractions, here. In particular, I'd recommend factoring out a concurrent queue. If you're permitted to use JRE 1.5+, then you could use a BlockingQueue. Otherwise, it's pretty simple to create one. In the latter case, you want a separate lock for the head of the queue, and a separate lock for the tail of the queue. With one consumer, you can get away with just one lock for the tail. The consumer will attempt to lock the tail if it examines the head and finds that next is null.
  2. In at least one place, you note that a lock should be held before the method is invoked. You can actually check this with Thread.holdsLock or, if appropriate, you can synchronize the method. The cost of nested synchronized sections is virtually zero in modern JREs.
  3. Many methods are synchronized that do not need to be synchronized. For example:
public final synchronized boolean isNew()
{
   return state == NEW;
}

This method is never concurrent-safe because the value of state may change at any instant in time. The only exception is if the calling method is synchronized on a mutex that prevents changes to state, and in this case, you don't need the synchronized block anyway.

state and all variables accessed from more than one thread should be declared volatile to ensure that the JRE doesn't use thread-local caching. This DOES make a difference as recently as JRE 1.5.
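
For example, with a volatile state field the read needs no lock for visibility, though the caveat above about staleness still applies:

private volatile int state = NEW;

// no lock needed just for visibility; volatile guarantees readers see
// the latest write, though the value can still be stale by the time
// the caller acts on it
public final boolean isNew()
{
  return state == NEW;
}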

  4. notify() should never be used, only notifyAll. You use notifyAll in all places but one.
  5. The wait methods are susceptible to spurious wakeup. I don't see any logic in the code to handle this (perhaps it's not important, I haven't looked closely).
  6. InterruptedException is ignored. If you really want to ignore it, you should add some logic to continue waiting inside the loop (though I don't recommend ignoring it, as it will be thrown in some termination scenarios and of course whenever the thread is manually interrupted).
  7. There are only two locks for the whole class, and many methods use one of them. This is very coarse granularity and will lead to performance bottlenecks when there's a lot of interaction with Thread.
  8. Instead of HashMap, consider ConcurrentHashMap, which offers much higher performance and some utility methods like putIfAbsent that are quite useful for concurrent applications. Again, if you can't use JRE 1.5 concurrency, then it's pretty easy to develop one of these yourself or just use backport's version.
  9. Instead of using integers to represent things like total, and accessing them from inside a lock, consider classes like AtomicInteger, which have direct hardware support and are very fast. They're also very easy to use.
  10. You can simplify a lot of the timing code by using a scheduler, which at the specified moment adds the item to the tail of the queue. Right now a lot of logic is spent maintaining the array, looping through it, finding the right moment to send the messages, etc., and you can eliminate nearly all of it by reusing a scheduling mechanism from the JRE/.NET; see the sketch below.
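
Something along these lines, assuming JRE 1.5+ is available (ScheduledSender and its fields are placeholder names):

import java.util.concurrent.*;

class ScheduledSender
{
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
  private final ScheduledExecutorService scheduler =
    Executors.newSingleThreadScheduledExecutor();

  // instead of scanning an array for due messages, let the scheduler
  // append each message to the tail of the queue at the specified moment
  void sendLater(final Object msg, long delayMillis)
  {
    scheduler.schedule(new Runnable() { public void run() { queue.add(msg); } },
                       delayMillis, TimeUnit.MILLISECONDS);
  }

  // the consumer just blocks on the queue; no timing logic required
  Object next() throws InterruptedException { return queue.take(); }
}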

I can take a more detailed look when I have more time. Overall impression is that we can make this class a lot smaller, a lot more robust, and much higher-performing with a few key changes.

JohnDG Sun 22 Feb 2009

I like the "end of processing" token and have used that extensively for graceful shutdowns of queue processors.
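
In miniature, the pattern looks like this (a sketch, not Fan's actual implementation):

import java.util.concurrent.*;

class Worker implements Runnable
{
  static final Object STOP = new Object(); // the end-of-processing token
  final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();

  public void run()
  {
    try
    {
      Object msg;
      while ((msg = queue.take()) != STOP)
        process(msg); // everything enqueued before STOP is still handled
      // fall through: graceful exit after draining the queue
    }
    catch (InterruptedException e)
    {
      // a kill-style interrupt: exit immediately without draining
    }
  }

  void stop() { queue.add(STOP); }

  void process(Object msg) { /* application work */ }
}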

So the question is what stop should do if the thread hasn't entered its message loop?

stop should prevent the thread from entering its message loop, causing loop to return immediately. I don't think exceptions should be thrown.

UNA has something called a StoppableThread, which requires clients to implement a runInnerLoop method. It has very safe semantics. It's legal, for example, to call stop even from the inner loop, or even when the thread hasn't fully started yet, and the most intuitive thing happens.

brian Sun 22 Feb 2009

Thanks for reviewing the code John...

There are some missing abstractions, here. In particular, I'd recommend factoring out a concurrent queue. If you're permitted to use JRE 1.5+

When I originally wrote the code, I was designing everything to run on J2ME, which is effectively Java 1.3. But I think I might give up on J2ME, in which case I can switch to java.util.concurrent. Although this is such performance critical code that I might still roll my own queue rather than pay for the extra object allocation in the linked list.

In at least one place, you note that a lock should be held before the method is invoked.

Yeah, I am super obsessive about the performance of this code.

This method is never concurrent-safe because the value of state may change at any instant in time.

It has to be synchronized or other threads are not guaranteed to see the updated field. Otherwise the field has to be volatile, but for various reasons I stuck with synchronized.

notify() should never be used, only notifyAll. You use notifyAll in all places but one instance.

I think that notify is ok there, but you are right that notifyAll is probably safer (I believe Chris previously found a bug related to notify vs notifyAll).

The wait methods are susceptible to spurious wakeup. I don't see any logic in the code to handle this

I originally thought those were correct, but since I messed up notify vs notifyAll in a couple of places, I probably have multiple waiting threads I haven't gotten my head around. I need to go back and investigate. Good call.

InterruptedException is ignored

That is by design (I think): InterruptedErr should be raised to all calling threads (although I need to review that myself).

There are only two locks for the whole class

There should be one lock for the queue, with contention between consumers and producers, and one for thread state, which should rarely be contended. The real bottleneck will be the message queue lock (I think it should perform well, but maybe BlockingQueue does it better).

Instead of HashMap, consider

This is just for coalescing, and I think it has to be wrapped in a higher level lock anyway. But I think that design is just temporary.

Thanks for taking the time to look at it. Concurrency code at this level is insanely hard to get your head around. But I think I've decided to forget about J2ME, in which case I can leverage some of the new java.util.concurrent stuff.

After thinking about this today, I am actually thinking that maybe I should ditch Thread altogether and redesign the API at a higher level. There are a couple of things I've noticed using this API that seem to scream that it is too low level:

  • a traditional actors model really needs green threads or thread pooling (hiding the actual thread concept)
  • the common pattern of declaring all your mutable state inside the run method is a bit awkward and seems like it could be captured more cleanly
  • right now I am manually handling thread "chains", which probably need something more like Erlang's "linked processes"
  • need a more pluggable mechanism for queue management and dispatch
  • not sure I like tying service and thread together (it forces a service to expose its low level thread interface to the world)

I am going to give some thought and maybe post a new design later this week.

alexlamsl Mon 23 Feb 2009

ExecutorService or even ScheduledExecutorService might be quite useful here, IMHO; they are pretty malleable.
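
For example, a single-threaded ScheduledExecutorService already behaves like an ordered message loop with delayed delivery built in (a rough sketch; the class and method names are illustrative):

import java.util.concurrent.*;

class ExecutorBackedActor
{
  private final ScheduledExecutorService exec =
    Executors.newSingleThreadScheduledExecutor();

  // messages are Runnables; a single thread processes them in order
  void send(Runnable msg) { exec.execute(msg); }

  void sendLater(Runnable msg, long delayMs)
  {
    exec.schedule(msg, delayMs, TimeUnit.MILLISECONDS);
  }

  // graceful stop: previously submitted messages still run, much like
  // the stop semantics described above
  void stop() { exec.shutdown(); }
}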

JohnDG Mon 23 Feb 2009

Yeah, I am super obsessive about the performance of this code.

In these cases, I often just assert it, and when testing, I always have assertions on. For some concurrent issues, it's much more practical to add assertions than create automated tests (though surprisingly, you can test quite a lot of concurrent issues).

It has to be synchronized or other threads are not guaranteed to see the updated field. Otherwise the field has be volatile, but for various reasons I stuck to synchronized.

For variables accessed frequently from multiple threads, thread-local caching doesn't make sense, and volatile is a good way to turn it off. In any case, for methods like isNew, if they're not invoked while holding the same mutex under which state is allowed to change, the return value is not very meaningful, since it might change even before the method finishes returning.

I think that notify is ok, but you are right probably safer to use notifyAll (I believe Chris found a bug previously related to notify vs notifyAll)

Yes. I've never had good experiences with notify even when it seems semantically safe to use it (i.e. there is a maximum of one other thread waiting on the mutex).

I think it should perform well, but maybe BlockingQueue does it better

Yes. The key observation is that if you have a single consumer and multiple producers, and your queue holds references to both the first and last elements, then you really only need a mutex for modifying the last element. When a producer wants to append an element to the queue, it obtains the mutex, points the last element's next reference at the new element, then reassigns last to the true last element before relinquishing the mutex.
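
The producer side of that protocol might look roughly like this (Node, last, and notifyElemAvailable are assumed names matching the description here):

class Node { Object value; volatile Node next; Node(Object v) { value = v; } }

private final Object lastElementMutex = new Object();
private Node last = new Node(null); // sentinel: first == last when the queue is empty

void append(Object value)
{
    Node node = new Node(value);
    synchronized (lastElementMutex)
    {
        last.next = node; // link the new element onto the tail
        last = node;      // reassign last to the true last element
    }
    notifyElemAvailable(); // wake a blocked consumer (see below)
}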

The consumer then uses simple logic to decide whether or not to obtain the last element mutex:

if (first.next == null) {
    synchronized (lastElementMutex) {
        if (first.next == null) {
            // the queue holds at most one element, so removing it also
            // updates last; that must happen under the tail lock
            return removeFirst();
        }
    }
}

// two or more elements: the head can be removed without any lock
return removeFirst();

Now you still need to wait until there's an element available. I would use a different mutex here, because this will dramatically reduce lock contention.

void notifyElemAvailable() {
    synchronized (elemAvailableMutex) {
        elemAvailableMutex.notifyAll();
    }
}

void waitUntilElemAvailable() throws InterruptedException {
    synchronized (elemAvailableMutex) {
        // Re-check the real condition on every wakeup -- this is what
        // guards against spurious wakeups. (queueIsEmpty() stands in
        // for whatever predicate the queue exposes.)
        while (queueIsEmpty()) {
            // Let InterruptedException propagate and terminate the loop.
            // It probably means the app is terminating or someone
            // manually tried to interrupt the thread.
            elemAvailableMutex.wait();
        }
    }
}

The performance of such an implementation will be much higher than using a single mutex for the whole Thread instance.

I am going to give some thought and maybe post a new design later this week.

Sounds good to me.

JohnDG Mon 23 Feb 2009

By the way, I'm wondering if it might make sense to create some sort of FanPrims class, which has primitives for concurrency, locking, shared state, etc., and build some Fan classes atop these. This is the approach used for some compilers such as GHC which have to support multiple platforms.

brian Tue 24 Feb 2009

There are two paths. One is to expose low level things like locking, but that requires allowing shared mutable state in the language. My preference is to stick with the current design using higher level concurrency primitives like message passing and enforce no shared mutable state (although we now have the Unsafe exception).

JohnDG Tue 24 Feb 2009

There are two paths, one is to expose the low level things like locking.

And then there's a third path: FanPrimOps, which is exposed only internally; if it's accessible externally at all, it's neither documented nor supported (kind of like Unsafe.* on the JRE). It could be useful to make it accessible so that things like an STM plug-in could be developed.

In any case, I agree with your general idea: it shouldn't be a part of the public API of the language. It should only be used for low-level implementation of parts of Fan itself, and possibly for alternate concurrency models like STM.

The advantage is that you get to code Thread in Fan (one version instead of two), and perhaps many other similar classes, depending on exactly what is bundled into FanPrimOps.

jodastephen Tue 24 Feb 2009

A possible fourth option - add a concurrent keyword?

class Foo {
  concurrent static Int hitCounter
}

This kind of variable would be like const in that it is allowed in static scope, and is allowed to be shared mutable state.

Internally, it's implemented using concurrent classes like AtomicInteger, ConcurrentHashMap, and CopyOnWriteArrayList.
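
For example, a concurrent Int field might plausibly be lowered onto the JRE's atomics like this (purely hypothetical):

import java.util.concurrent.atomic.AtomicLong;

class Foo
{
  // 'concurrent static Int hitCounter' lowered onto an atomic
  // (Fan's Int is 64-bit, hence AtomicLong)
  static final AtomicLong hitCounter = new AtomicLong();

  // 'Foo.hitCounter += 1' would compile to an atomic increment
  static long bumpHitCounter() { return hitCounter.incrementAndGet(); }
}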

The key advantage is an easy learning curve from other languages, especially Java. You can still have shared static state, but it must be const or concurrent.

I like John's low level classes notion too.

JohnDG Tue 24 Feb 2009

The problem with a concurrent keyword that exposes mutable state is that lock-based concurrency (even very nice lock-based concurrency such as AtomicInteger / ConcurrentHashMap, et al) is not composable. Meaning that even if each primitive has safe semantics, when you combine them, the whole is not guaranteed to be safe. In fact, the whole will probably be filled with deadlocks.
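
The classic illustration: two objects that are individually safe can deadlock as soon as you compose them. A sketch in Java:

class Account
{
  private long balance;

  // each operation is safe on its own
  synchronized void deposit(long n)  { balance += n; }
  synchronized void withdraw(long n) { balance -= n; }

  // but composing two accounts requires holding both locks; if another
  // thread runs transfer(b, a, n) concurrently with transfer(a, b, n),
  // each thread can grab one lock and wait forever on the other
  static void transfer(Account from, Account to, long n)
  {
    synchronized (from)
    {
      synchronized (to)
      {
        from.withdraw(n);
        to.deposit(n);
      }
    }
  }
}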

STM provides safe semantics. Meaning:

atomic {
   ...
   atomic {
      atomic {
         ...
      }
   }
   ...
   atomic {
   }
   ...
}

is perfectly safe as long as each one is safe. This makes it trivial to create sound systems out of smaller, sound components -- a task exceedingly difficult with lock-based concurrency.

Now using a concurrent keyword to allow mutable state but only within an atomic block, I do like that. :-) But Fan likely won't have such a notion. Instead, we can do it with a compiler plug-in (with the small disadvantage that we have to be able to parse Fan code inside the STM plug-in, but that's OK).
