#476 Proposal: New Actor API

brian Tue 24 Feb 2009

This is a proposal to replace the existing sys::Thread API with a new set of APIs which model concurrency at a higher level of abstraction. Today Thread encapsulates several concepts into one class:

  1. Well-known services: to be replaced with Service
  2. Threading and thread pooling: to be replaced with ActorService
  3. Message passing: to be replaced with Actor
  4. Contextual "globals" such as Locale.current: to be replaced with Context

Service

Today the Thread class defines the mechanism to look up threads by name. More importantly, it allows lookup by type, which is the preferred way to plug in loosely coupled services:

sql := Thread.findService(SqlService#)

The concepts of services and concurrency really need to be split. There will be a new first class API for modeling services:

abstract mixin Service
{
  static Service[] list()
  static Service? find(Type t, Bool checked := true)
  This install()
  This uninstall()
}
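A minimal Java sketch of such a type-keyed registry follows, purely to illustrate the idea; it is not Fan's implementation. The `Service` interface, its `REGISTRY` field, and the `SqlService`/`FakeSql` types are hypothetical, and because Java has no `This` return type, `install` and `uninstall` return `Service`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical Java analogue of the proposed Service mixin: a process-wide
// registry of installed services that can be looked up by type.
interface Service {
    // Interface fields are implicitly static and final; copy-on-write keeps
    // lookups lock-free, on the assumption that install/uninstall are rare.
    CopyOnWriteArrayList<Service> REGISTRY = new CopyOnWriteArrayList<>();

    static List<Service> list() {
        return new ArrayList<>(REGISTRY);
    }

    // Return the first installed service that is an instance of t. When
    // nothing matches, throw if checked is true, otherwise return null.
    static <T> T find(Class<T> t, boolean checked) {
        for (Service s : REGISTRY) {
            if (t.isInstance(s)) return t.cast(s);
        }
        if (checked) throw new IllegalStateException("service not found: " + t.getName());
        return null;
    }

    default Service install() {
        REGISTRY.addIfAbsent(this);
        return this;
    }

    default Service uninstall() {
        REGISTRY.remove(this);
        return this;
    }
}

// Hypothetical plug-in point: clients depend only on the SqlService type.
interface SqlService extends Service {
    String query(String sql);
}

class FakeSql implements SqlService {
    public String query(String sql) { return "rows for " + sql; }
}
```

`Service.find(SqlService.class, true)` plays the role of `Thread.findService(SqlService#)`: callers depend only on the `SqlService` type, never on the concrete class that was installed.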

Actor

Today the only way to implement an "actor" with its own message queue is via Thread. I originally chose this design because it makes it easy to get your head around preemptive concurrency and resuming where you left off. But the huge downside is that it requires a dedicated thread, which is too heavyweight for a traditional actor model where you might have hundreds of thousands of actors (in my experience a VM tops out around 5K threads, depending on how you configure the JVM settings).

So we want to separate the concept of an entity which asynchronously processes messages from the notion of OS threads. The new Actor class models an object which asynchronously processes a message queue:

const abstract class Actor
{
  new make(ActorService s)

  ActorService service()

  Future send(Obj? msg)
  Future schedule(Duration d, Obj? msg)

  protected abstract Obj? receive(Context cx, Obj? msg)
  protected virtual Void onStart(Context cx) {}
  protected virtual Void onStop(Context cx) {}
}

class Future
{
  Obj? get(Duration? timeout := null)
  Bool isDone()
  Bool isCancelled()
  Bool cancel()
}

A couple of big changes to note relative to the existing Thread API:

  • get rid of sendAsync vs sendSync, and just always return a Future that you can choose to ignore or block on (this was something that I've been meaning to do anyway)
  • the Future API is pretty much exactly like Java's java.util.concurrent.Future
  • note that Actors are always created in the context of an ActorService (which I will talk about below)

Actors are themselves const and can be shared by reference between threads (either as static fields or passed in immutable messages between actors). However, inside a given actor is mutable state. This is the technique I've been using extensively in my existing Fan code:

Void run()
{
  context := .... // setup mutable data structures
  loop(&process(context))
  // cleanup mutable data structures
}

The model above doesn't work anymore because it requires a dedicated thread (or continuations). Because Actors are const, they can't store their mutable data structures in fields. So we have to define a mechanism to pass their "world state" in and out of the receive method. I am calling this class Context. I am not sure what it looks like yet, but most likely it would replace Thread.locals to allow arbitrary name/value pairs, or it could be subclassed per Actor.

We still have the problem of queue management. I might make that a pluggable Queue class, in which case you would have to figure out concurrency yourself (such as using the Java FFI to call out to lower-level primitives, or we could provide some primitives in Fan).

ActorService

In a pure actor model, we wouldn't ever care about threads or how actors got their messages processed. But in the real world, I happen to care a lot. For example, I want to give the web server its own thread pool, but I want to limit its max threads so that it never starves out other more important thread pools in the system.

I also need an entity that gives me the hooks to start up and shut down the actors associated with a given service or piece of functionality. In Java this functionality is captured by the Executor API; but where Java's API is defined in terms of runnable tasks, in Fan we want Actors with message queues.

ActorService is responsible for defining how Actors are executed and managed by thread pools:

const class ActorService : Service
{
  This start()
  This stop()
  This kill()
  This join(Duration? timeout := null)  // wait until fully stopped
}

Eventually tuning of the threading model would be configured on ActorService, but for now each one will be given its own thread pool.

Note that Actors maintain a reference to their ActorService, but not vice versa. When an actor receives a message onto its queue, it will notify its ActorService that it has pending work, and the actor service will be responsible for allocating a thread to it under the covers. There is a guarantee that only one thread is ever servicing a given Actor at a time (because we are passing mutable state via the context into the receive method).
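The one-thread-per-actor guarantee can be sketched in Java on top of a shared `java.util.concurrent.Executor`. This is an illustrative analogue under assumptions, not the Fan code: the `Actor` class, the `BiFunction` receive signature, and the `Map` standing in for the not-yet-designed `Context` are all hypothetical. A send that finds the actor idle flips a `scheduled` flag and hands the actor to the pool; while the flag is set no other thread can start draining it, so the receive function never runs concurrently for one actor and can safely mutate the context.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

// Hypothetical actor that owns no thread: its messages are drained by
// whichever pool thread the shared executor hands it.
final class Actor {
    private static final class Pending {
        final Object msg;
        final CompletableFuture<Object> future = new CompletableFuture<>();
        Pending(Object msg) { this.msg = msg; }
    }

    private final Executor pool;                                 // plays the ActorService role
    private final BiFunction<Map<String, Object>, Object, Object> receive;
    private final Map<String, Object> context = new HashMap<>(); // mutable state, one thread at a time
    private final Queue<Pending> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    Actor(Executor pool, BiFunction<Map<String, Object>, Object, Object> receive) {
        this.pool = pool;
        this.receive = receive;
    }

    // Enqueue msg (null is allowed) and return a future for receive's result.
    CompletableFuture<Object> send(Object msg) {
        Pending p = new Pending(msg);
        queue.add(p);
        // Notify the pool only on the idle -> pending transition; while the
        // flag is set, no second thread can start draining this actor.
        if (scheduled.compareAndSet(false, true)) pool.execute(this::drain);
        return p.future;
    }

    private void drain() {
        do {
            Pending p;
            while ((p = queue.poll()) != null) {
                try {
                    p.future.complete(receive.apply(context, p.msg));
                } catch (Throwable t) {
                    p.future.completeExceptionally(t);
                }
            }
            scheduled.set(false);
            // A send may have enqueued after the last poll but before the flag
            // was cleared; if so, reclaim the flag and keep draining here.
        } while (!queue.isEmpty() && scheduled.compareAndSet(false, true));
    }
}
```

Here `drain()` keeps going until the queue is empty, so a busy actor holds one pool thread until it runs dry; a fuller scheduler would probably bound the batch so other actors get a turn.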

There is also the need for an ActorService itself to be an actor to handle its start/stop lifecycle. I am not sure exactly how that works.

JohnDG Wed 25 Feb 2009

It's not clear what an ActorService is or why it's required (aside from being able to locate a "category" of actors). Could you post some sample code that ties all of these classes together (needn't be complete or working)?

brian Wed 25 Feb 2009

It's not clear what an ActorService is or why it's required (aside from being able to locate a "category" of actors).

ActorService is responsible for how Actors get executed. It serves the same role as ExecutorService in the java.util.concurrent API. It is important because something has to be responsible for starting up the thread pools and shutting them down cleanly.

For example, consider a "database" that does a lot of asynchronous processing. When it is time to shut it down, you need to cleanly wait for all the pending queues to be processed and then for the threads to shut down.

A simpler example:

service := ActorService().start // start up thread pool
actor := DummyActor(service)
100.times |Int i| { actor.send(i) }
service.stop.join // wait until the actor finishes processing its queue and the pool shuts down
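The same start / send / stop.join sequence maps closely onto `java.util.concurrent.ExecutorService`, where `shutdown()` lets already-submitted work finish and `awaitTermination` waits for it. The Java sketch below assumes that mapping; `ActorGroup`, its method names, the mapping of `kill` to `shutdownNow()`, and the compact `Actor` are illustrative, not the Fan API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

// Hypothetical ActorGroup: owns the thread pool that its actors run on.
final class ActorGroup {
    private volatile ExecutorService pool;

    ActorGroup start() { pool = Executors.newCachedThreadPool(); return this; }
    ActorGroup stop()  { pool.shutdown(); return this; }    // run already-submitted work, reject new work
    ActorGroup kill()  { pool.shutdownNow(); return this; } // interrupt workers, discard queued tasks

    // Block until the pool has fully stopped; false if the timeout expires first.
    boolean join(long timeoutMillis) {
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    ExecutorService executor() { return pool; }
}

// Hypothetical compact actor: at most one pool thread drains its queue at a time.
final class Actor {
    private static final class Pending {
        final Object msg;
        final CompletableFuture<Object> future = new CompletableFuture<>();
        Pending(Object msg) { this.msg = msg; }
    }

    private final ExecutorService pool;
    private final BiFunction<Map<String, Object>, Object, Object> receive;
    private final Map<String, Object> context = new HashMap<>();
    private final Queue<Pending> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    Actor(ActorGroup group, BiFunction<Map<String, Object>, Object, Object> receive) {
        this.pool = group.executor();
        this.receive = receive;
    }

    CompletableFuture<Object> send(Object msg) {
        Pending p = new Pending(msg);
        queue.add(p);
        if (scheduled.compareAndSet(false, true)) pool.execute(this::drain);
        return p.future;
    }

    private void drain() {
        do {
            Pending p;
            while ((p = queue.poll()) != null) {
                try { p.future.complete(receive.apply(context, p.msg)); }
                catch (Throwable t) { p.future.completeExceptionally(t); }
            }
            scheduled.set(false);
        } while (!queue.isEmpty() && scheduled.compareAndSet(false, true));
    }
}
```

Looping inside `drain()` rather than resubmitting the task matters for this lifecycle: once `stop()` has been called, the pool would reject a resubmitted task and the remaining messages would be stranded.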

A more typical scenario is that ActorServices are used with various application services like databases, protocol handlers, etc. Each of these services implements or uses an ActorService to manage its Actors.

JohnDG Wed 25 Feb 2009

OK, clear now. This looks like a good API design to me, on top of which can be built higher-level models.

Personally, I would disallow sending of null objects, because your average actor is not going to expect to receive a null object. Likely some client code will accidentally send an object it does not know to be null to an actor that cannot handle a null message, resulting in an NPE.

Finally, I'd strongly recommend adding an invokeWhenDone method to Future, that invokes the specified function (actor?) when it is "done" (i.e. cancelled or fulfilled). Such methods are very useful when you don't want to block but do want to schedule some action to complete immediately after the future is done. i.e. it allows a form of asynchronous chaining. If you don't add a method like this now, it's not possible to add it via a higher-level API.

andy Wed 25 Feb 2009

+1 for invokeWhenDone - that's very useful.

JohnDG Wed 25 Feb 2009

Or better yet, deliverTo, which delivers the result of the Future to the specified Actor when it becomes available.

brian Thu 19 Mar 2009

Any further comments on this topic? Currently I am planning on starting this change next week.

f00biebletch Fri 20 Mar 2009

Can you post an implementation of DummyActor that does something dumb like add 1 to its context every time it gets a msg called "add" and subtract 1 from its context when it gets a message with value "subtract"?

brian Fri 20 Mar 2009

I don't fully know what context looks like, but let's assume it has a map like interface similar to Thread.locals:

const class Dummy : Actor
{
  new make(ActorService s) : super(s) {}

  protected override Void onStart(Context cx)
  {
    cx["count"] = 0
  }

  protected override Obj? receive(Context cx, Obj? msg)
  {
    Int count := cx["count"]
    switch (msg)
    {
      case "add": count++
      case "sub": count--
    }
    cx["count"] = count
    return count
  }
}

f00biebletch Fri 20 Mar 2009

That looks solid to me as well, thanks for posting. I would agree with John on Obj? msg, nullable seems like more trouble than it is worth.

Do you have a philosophy for error handling in an actor? Erlang takes the fail hard, fail fast approach where the actor just dies when it hits an exception, and, like Joe Armstrong says, other actors notice when someone sitting in a room with them just dies, hence the need for supervisor trees.

How about message prioritization on the incoming queue?

And what about timeouts - say I want an actor that wants to get a message within a certain amount of time, and, if nothing comes, the actor just goes away or takes some other action?

brian Fri 20 Mar 2009

I would agree with John on Obj? msg, nullable seems like more trouble than it is worth.

I actually find myself using null a lot as a special message - especially for quick and dirty things. So my vote is to keep it nullable.

Do you have a philosophy for error handling in an actor? Erlang takes the fail hard, fail fast approach where the actor just dies when it hits an exception

I don't particularly like that approach - in the OO world, just because a method fails, it doesn't invalidate the object itself (or at least that should be the object's responsibility). The way it works today is that the exception is raised to the caller. So my thought was to have it continue to work that way - an exception raised by receive() would be rethrown to whoever calls get on the Future.

How about message prioritization on the incoming queue?

My plan is to make queue management pluggable - definitely in terms of coalescing.

And what about timeouts - say I want an actor that wants to get a message within a certain amount of time

I think this is handled by Futures. You either block on the Future with a timeout, or you cancel it (which effectively removes it from the target actor's queue).
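Both behaviors, errors rethrown through the future and timeouts handled by blocking with a limit or cancelling, can be illustrated with Java's `CompletableFuture`, which the proposal's Future closely resembles. In this hypothetical Java analogue (not Fan code), the actor catches anything its receive function throws and completes that message's future exceptionally, so the error reaches whoever calls `get`/`join` while the actor keeps serving later messages; a message whose future is cancelled while still queued is skipped. The `Actor` class and its `Map`-based context are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

// Hypothetical actor: errors go to the caller via the future; the actor survives.
final class Actor {
    private static final class Pending {
        final Object msg;
        final CompletableFuture<Object> future = new CompletableFuture<>();
        Pending(Object msg) { this.msg = msg; }
    }

    private final Executor pool;
    private final BiFunction<Map<String, Object>, Object, Object> receive;
    private final Map<String, Object> context = new HashMap<>();
    private final Queue<Pending> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    Actor(Executor pool, BiFunction<Map<String, Object>, Object, Object> receive) {
        this.pool = pool;
        this.receive = receive;
    }

    CompletableFuture<Object> send(Object msg) {
        Pending p = new Pending(msg);
        queue.add(p);
        if (scheduled.compareAndSet(false, true)) pool.execute(this::drain);
        return p.future;
    }

    private void drain() {
        do {
            Pending p;
            while ((p = queue.poll()) != null) {
                if (p.future.isCancelled()) continue;      // cancelled while queued: never delivered
                try {
                    p.future.complete(receive.apply(context, p.msg));
                } catch (Throwable t) {
                    p.future.completeExceptionally(t);     // rethrown later by get()/join()
                }
            }
            scheduled.set(false);
        } while (!queue.isEmpty() && scheduled.compareAndSet(false, true));
    }
}
```

A caller that wants a deadline uses `future.get(timeout, unit)`, which throws `TimeoutException` without cancelling anything; calling `cancel` is what keeps a still-queued message from ever reaching receive.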

brian Thu 26 Mar 2009

I have pushed the initial version of the new Actor framework. It follows my original proposal with two notable changes:

  • I renamed ActorService to ActorGroup
  • I allow you to pass a function to Actor.make like Thread works today

I still have a lot of work to do and tests to write, but the basics should work. More importantly the API sketched out above has been fully documented with the behavior of all the corner cases. My actor scheduling model right now is really simple.

JohnDG Fri 27 Mar 2009

Looks good. Unfortunately, it's missing something like:

Future.deliverTo(Actor actor)

and since there is no way for external code to know the moment when a future becomes available, there is no efficient way to implement the above in a third-party library. So I strongly recommend adding it now. :-)

brian Fri 27 Mar 2009

That will be in there, along with hooks for queue management. I need to get the basics all worked out first.

JohnDG Fri 27 Mar 2009

In that case, looks good to me, and the change from ActorService to ActorGroup is much appreciated -- the old name was confusing.

brian Sat 28 Mar 2009

My original proposal included an onStart and onStop callback for Actor to initialize and cleanup its mutable state. I am not going to include these callbacks because they cause a lot of problems.

The onStart callback is tricky because it creates race conditions between the subclass constructors and how the callback gets scheduled on a background thread.

The onStop callback is problematic because it requires ActorGroups to maintain references to Actors. Right now groups only reference actors with pending work, which allows actors to be scheduled and then, once done with processing, garbage collected.

So I am not going to implement those callbacks. In cases when you need them you can handle them yourself using special start/stop messages via the standard Actor.send method.
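A hypothetical Java sketch of that message-based lifecycle follows. It is written as a plain receive function with an assumed `(context, msg)` shape, a `Map` standing in for `Context`; the `START`/`STOP` sentinel objects and the `"add"`/`"sub"` protocol (borrowed from the Dummy example) are illustrative. It assumes messages from a single sender are processed in the order sent, so sending `START` first puts initialization ahead of all other work.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical receive function that uses sentinel messages instead of
// onStart/onStop callbacks to set up and tear down per-actor state.
final class CounterBehavior {
    static final Object START = new Object();
    static final Object STOP = new Object();

    static Object receive(Map<String, Object> cx, Object msg) {
        if (msg == START) {                 // initialize mutable state
            cx.put("count", 0);
            return null;
        }
        if (msg == STOP) {                  // clean up and report the final value
            return cx.remove("count");
        }
        Integer count = (Integer) cx.get("count");
        if (count == null) throw new IllegalStateException("actor not started");
        if ("add".equals(msg)) count++;
        else if ("sub".equals(msg)) count--;
        cx.put("count", count);
        return count;
    }
}
```

Because the sentinels are private object identities rather than strings, an ordinary data message can never be mistaken for a lifecycle message.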

brian Sun 29 Mar 2009

Regarding Future.deliverTo - this functionality requires some brainstorming. There are three possible outcomes for a future:

  1. it returns normally with a value after some period of time
  2. it raises an exception, in which case Future.get will too
  3. it is cancelled, in which case Future.get raises CancelledErr

Because of cases 2 and 3, it doesn't make sense to deliver just the result as a message to an actor upon completion. So what should be delivered as the message? The obvious choice is the Future itself. However if it terminated abnormally then there is likely nothing to give the actor context to what the future represents.

So should the future make the original request object available? Should we allow it to store a piece of arbitrary user data?

Not sure what the best design is here - ideas are welcome.

JohnDG Sun 29 Mar 2009

I designed a concurrency framework that introduces a Raincheck. It's similar to a Future, but also keeps track of the request object.

It has lots of methods to determine what has happened to the Raincheck (isCancelled, isDone, isFulfilled), but usually these are not used. Instead, one adds a RaincheckListener, who is notified about key events:

  1. Progress of the raincheck, for rainchecks that take a really long time to fulfill.
  2. Whether or not the raincheck is cancelled by the user.
  3. Whether or not the raincheck aborts with an error. If so, the exact Exception is provided to the listener. It's a RaincheckException, which stores the original request object, allowing one object (such as a RaincheckErrorManager) to handle lots of different exceptions from different rainchecks representing different requests.
  4. The fulfillment of the raincheck.

There's a RaincheckListenerAdapter which one uses to avoid implementing all 4 notification methods of RaincheckListener. In addition, there are convenience methods that internally add a RaincheckListener:

  1. deliverTo, which delivers the fulfillment of the raincheck to the specified function. This method will not be invoked if the raincheck is cancelled or aborts with an error.
  2. handleErrorsWith, which forwards any error onto the specified error handler.
  3. invokeWhenDone, which is invoked in all cases when the raincheck is done (cancelled, aborted by error, or fulfilled).
  4. invokeWhenCancelled, which is invoked if and when the raincheck is cancelled.

In addition, there are many convenience rainchecks, such as composite rainchecks, for bundling together a group of rainchecks (in which case the fulfillment is a map), raincheck chains, etc.

brian Sun 29 Mar 2009

That sounds pretty cool, although I am not sure it helps much in this case because we have to boil everything down to a single message Obj, so we can't really create various callback functions. My current thinking is something like this:

Future Actor.sendWhenDone(Future f, Obj? msg)

That will deliver an arbitrary message when the future completes (however it completes), then you can choose to use the future as the message itself or wrap it:

a.sendWhenDone(f, f)
a.sendWhenDone(f, MyFutureMsg(f))
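In Java terms, `sendWhenDone` is a small combinator over `CompletableFuture`: wait for the watched future to settle in any way (value, exception, or cancellation), then send the fixed message and return the future of that send. The sketch below reduces an actor to a hypothetical one-method `Actor` interface so the combinator can be shown in isolation; the `Outcome.describe` helper is also hypothetical, and none of this is the Fan implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal actor interface: anything that accepts a message and
// returns a future for the reply.
interface Actor {
    CompletableFuture<Object> send(Object msg);

    // Send msg once f completes, however it completes. handle() runs for a
    // normal result, an exception, or a cancellation alike; the returned
    // future is the future of the message that gets sent.
    default CompletableFuture<Object> sendWhenDone(CompletableFuture<?> f, Object msg) {
        return f.handle((value, error) -> msg).thenCompose(this::send);
    }
}

final class Outcome {
    // Receive-side helper for the case where the future itself is the message.
    static String describe(Object msg) {
        CompletableFuture<?> done = (CompletableFuture<?>) msg;
        if (done.isCancelled()) return "cancelled";
        if (done.isCompletedExceptionally()) return "failed";
        return "ok: " + done.join();
    }
}
```

Passing the future itself as the message (the `a.sendWhenDone(f, f)` form) lets the receiver inspect how it finished, which is the case `describe` handles.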

JohnDG Sun 29 Mar 2009

That sounds pretty cool, although I am not sure it helps much in this case because we have to boil everything down to a single message Obj

There's an alternative. Internally, yes, but that doesn't mean you have to really boil everything down to a single message Obj.

You could, for example, use some private class to represent any message that is not to be sent to an Actor's receive method, but instead processed behind the scenes.

Then you can do things like, Future.deliverTo, Future.handleErrorWith, etc.

Future Actor.sendWhenDone(Future f, Obj? msg)

Putting this on Actor is non-intuitive and hackish to me. If I want to do something when a Future is done, then I'll look on the Future object itself, because it's the class that should know when it's done.

brian Sun 29 Mar 2009

Then you can do things like, Future.deliverTo, Future.handleErrorWith, etc.

It still has to be an Obj sent to the Actor - are you suggesting that I construct different messages to send based on success and error? I am not sure what your proposal is.

Putting this on Actor is non-intuitive and hackish to me.

I think having send, sendWhenDone, and sendLater all on Actor makes for a consistent and discoverable API - everything related to sending messages is encapsulated in one class, all prefixed with send.

JohnDG Sun 29 Mar 2009

It still has to be an Obj sent to the Actor - are you suggesting that I construct different messages to send based on success and error? I am not sure what your proposal is.

Add methods to Actor that represent different pipelines: e.g. a pipeline for errors and one for cancellations, rather than forcing everything down one pipeline for generic messages. Internally, of course, there would be just one pipeline, but you would filter and redirect to the appropriate methods of Actor.

Then, for example, I can deliver the result of a Future to one actor, but redirect errors to another actor, etc. This kind of stuff is very helpful, because generally you don't want to force each actor to handle error conditions, but you still need a way to access those errors.

Something like:

class Future
{
  // Throws exception
  Obj? get(Duration? timeout := null)

  Bool isDone()

  Bool isCancelled()

  Bool cancel()

  Future deliverTo(Actor actor)

  Future handleErrorsWith(Actor actor)

  Future sendWhenDone(Actor actor, Obj? message)
}
.
.
.
abstract class Actor
{
   ...
   receive
   error
   ...
}
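JohnDG's split between a result pipeline and an error pipeline can likewise be sketched in Java over `CompletableFuture`. The helper names mirror his proposed `deliverTo`/`handleErrorsWith` but are written as hypothetical static methods, and the one-method `Actor` interface is a stand-in; none of this is part of the Fan API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for an actor: accepts a message, returns a future.
interface Actor {
    CompletableFuture<Object> send(Object msg);
}

// Hypothetical helpers that route a future's outcome to different actors.
final class Pipelines {
    // On normal completion only, send the result to `actor`.
    static <T> CompletableFuture<T> deliverTo(CompletableFuture<T> f, Actor actor) {
        f.thenAccept(actor::send);
        return f;                       // return f so calls can be chained
    }

    // On failure or cancellation only, send the underlying error to `handler`.
    static <T> CompletableFuture<T> handleErrorsWith(CompletableFuture<T> f, Actor handler) {
        f.whenComplete((value, error) -> {
            if (error != null) handler.send(unwrap(error));
        });
        return f;
    }

    // Dependent stages wrap failures in CompletionException; expose the cause.
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

The trade-off brian raises below is visible here: each outcome needs its own registration, whereas a single `sendWhenDone` funnels everything through one receive function.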

I think having send, sendWhenDone, and sendLater all on Actor

send and sendLater have clear semantics. sendWhenDone is murky. Suppose I take an arbitrary Future and pass it to sendWhenDone. What will happen? What happens if that Future was created by the self-same Actor class? And excluding magic, how would Actor even know when a future is done?

Maybe I'm just not used to the idea, but it smells a little off to me. There seems to be something fundamentally different about send than sendWhenDone. Maybe it's that sendWhenDone implies the thing is sent when the instance on which it is invoked is done, which is not correct in this case.

brian Sun 29 Mar 2009

send and sendLater have clear semantics. sendWhenDone is murky.

Think about it like this - there is only one concept here: sending messages. It is just a question of whether you send immediately or send after an event has occurred. In the case of sendLater that event is a period of time elapsing, and for sendWhenDone it is the future completing. In all three cases it is merely a question of scheduling when the message is delivered. Although a convenience on Future might be nice too for method chaining.

Add methods to Actor that represent different pipelines: e.g. a pipeline for errors and one for cancellations, rather than forcing everything down one pipeline for generic messages.

OK, I see what you are saying, but I'm not sure I really like that direction for this low-level API - conceptually a single queue of messages delivered to a single receive function seems a lot simpler to understand and allows for better functional composition. For example, one of the nice things the current API allows is for you to either pass a single function to the Actor constructor or subclass. The goal here is to define a clean, simple core API upon which higher-level abstractions can be built.

JohnDG Mon 30 Mar 2009

It is just a question of whether you send immediately or send after an event has occurred.

Yes, but how should Actor know that an arbitrary event has occurred?

There's an incongruence between what I think Actor should know, and what it actually does know. I look at the method and I think, "There's no way Actor can know when an arbitrary Future is done, so I must be misreading that method."

The goal here is to define a clean, simple core API upon which higher-level abstractions can be built.

That's very true. Multiple pipelines, queues, message processing, and message matching can all be added at a higher-level. I like keeping this API nice and simple so the community can experiment with higher-level constructs.

brian Tue 31 Mar 2009

Yes, but how should Actor know that an arbitrary event has occurred?

The whole actor framework is tied together. I do see your point, but to me this basically boils down to what is the most important abstraction: sending a message or the future done event. To me the key abstraction is sending messages. So I am going to go with this API:

send          // now
sendLater     // after timer event
sendWhenDone  // after future done event

I really like having all the send methods grouped together.
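Of the three, only `sendLater` needs a timer. One way to sketch it in Java is a shared `ScheduledExecutorService` that performs an ordinary `send` once the delay expires; the `Actor` interface, the daemon `TIMER`, and the millisecond delay parameter (in place of Fan's `Duration`) are assumptions for illustration, not the Fan implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical actor interface grouping the send variants together.
interface Actor {
    // One daemon timer thread shared by all actors, so it never keeps the JVM alive.
    ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "send-later-timer");
        t.setDaemon(true);
        return t;
    });

    // Send now.
    CompletableFuture<Object> send(Object msg);

    // Send after delayMillis; the returned future is for the delayed message.
    default CompletableFuture<Object> sendLater(long delayMillis, Object msg) {
        CompletableFuture<Object> result = new CompletableFuture<>();
        TIMER.schedule(() -> {
            // Cancelling the result before the timer fires prevents the send
            // (a cancel racing with this check is not handled in this sketch).
            if (result.isCancelled()) return;
            send(msg).whenComplete((value, error) -> {
                if (error != null) result.completeExceptionally(error);
                else result.complete(value);
            });
        }, delayMillis, TimeUnit.MILLISECONDS);
        return result;
    }
}
```

Every variant returns a future for the message it sends, which is the consistency argument made above for keeping them all on Actor.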

Here was the other signature I played around with:

class Future
{
  Future sendWhenDone(Actor a, Obj? msg)
  ...
}

It is confusing whether the returned future is this or a new future (it would be a new future for the message sent when done). That is another reason I like the Actor.sendXXX consistency - they all return a Future for the message sent.

qualidafial Tue 31 Mar 2009

An alternative name for sendLater:

sendIn(500ms, msg)

Dubhead Tue 31 Mar 2009

In addition to sendWhenDone, how about this? Just a passing idea.

sendOnSuccess  // send msg to the actor when the future returned normally
sendOnError    // ... when the future raised an exception

sendWhenDone sends msg in both cases.

brian Tue 31 Mar 2009

sendIn(500ms, msg)

Good idea; I typically go for the shorter name, but in this case I'd prefer to avoid the word "in" when it isn't related to InStream.

In addition to sendWhenDone, how about this? Just a passing idea

I think sendOnSuccess and sendOnError are something we'll eventually have, but I'd prefer to wait until I (or someone else) has some use cases for them. We can always add, but never take away. Plus I suspect that someone (or someones) will end up building higher-level APIs on top of this low-level API, which will end up handling all of that stuff.

andy Tue 31 Mar 2009

I like sendIn - though I guess I can see the consistency issue with streams.

qualidafial Tue 31 Mar 2009

sendAfter?

clay Mon 6 Apr 2009

I'm loving the thread safety in Fan. This is really exciting.

I have a question. Below, I'd like to use a closure to initialize the Actor inline; however, it doesn't compile because, although the Str is immutable, the reference, filename, is not.

Any suggestions on how to handle initialization inline?

The example below is artificial, so it is a bit strange, but I wanted to try out closure with Actors.

Thanks, Clay

#! /usr/bin/env fan
*** filename is called test.fan

class ActorClosureTest
{
  Void main()
  {
    PrintThis("test.fan");
  }

  Void PrintThis(Str filename)
  {
    group := ActorGroup();

    a := Actor(group) |Str msg, Context cx->Int?|
    {
      InStream? fin := cx.get("InStream", null);

      if (fin === null)
      {
        echo("creating");
        //f := File(Uri("test.fan"));
        f := File(Uri(filename));
        fin = f.in;
        cx["InStream"] = fin;
      }

      Int? result := fin.read;
      //echo("in " + result);
      return result;
    };

    Future[] futs := List(Future#, 100);

    for (i:=0; i<100; ++i)
    {
      Future fut := a.send("ignored");
      futs.add(fut);
    }

    // do something else...

    futs.each(|Future f| {
      echo(f.get);
    });
  }
}

brian Mon 6 Apr 2009

I have a question. Below, I'd like to use a closure to initialize the Actor inline; however, it doesn't compile because, although the Str is immutable, the reference, filename, is not.

There are three options to pass immutable configuration to your actor:

  1. Pass it as an initialization message
  2. Subclass Actor and declare it as const fields
  3. Use a curried method

The simplest way is to declare a static method and curry the fixed arguments:

a := Actor(group, &receive(filename))

static Int receive(Str filename, Str msg, Context cx)
{
  ...
}
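Option 3 is plain partial application. In Java the same shape can be sketched by binding the fixed configuration into a lambda that matches an assumed `(context, msg)` receive signature; `FileReaderBehavior`, `bind`, `receive`, and the `"reads"` counter are hypothetical, and no file is actually opened. One difference worth noting: Java only requires a captured local to be effectively final and does not check that the captured object is immutable, whereas, as the question above shows, Fan refused the closure because it captured the local `filename`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class FileReaderBehavior {
    // Hypothetical receive function: filename is fixed configuration, cx holds
    // per-actor mutable state, msg is the incoming message.
    static Object receive(String filename, Map<String, Object> cx, Object msg) {
        int reads = (Integer) cx.getOrDefault("reads", 0) + 1;
        cx.put("reads", reads);
        return filename + " read #" + reads + " (" + msg + ")";
    }

    // "Curry" the fixed argument: the result has the two-argument shape an
    // actor would invoke for each message.
    static BiFunction<Map<String, Object>, Object, Object> bind(String filename) {
        return (cx, msg) -> receive(filename, cx, msg);
    }
}
```

Since `filename` is a `String`, which Java guarantees immutable, the bound function is safe to share; binding a mutable object the same way would compile just as happily.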
