26. Actors

Overview

Fantom includes an actor framework for concurrency. Actors are lightweight objects which asynchronously process work on a background thread. Actors are given work by sending them asynchronous messages. Actors then process those messages on background threads controlled by an ActorPool.

Actors

The Actor class is used to define new actors. All actors are constructed within an ActorPool which defines how the actor is executed.

Actors may define their receive behavior in one of two ways:

  1. Pass a function to the Actor's constructor
  2. Subclass Actor and override receive

Here are two simple examples of an actor which receives an Int message and returns the increment:

// pass receive to constructor as a closure function
a := Actor(pool) |Int msg->Int| { msg + 1 }

// subclass and override receive
const class IncrActor : Actor
{
  new make(ActorPool p) : super(p) {}
  override Obj? receive(Obj? msg) { msg->increment }
}

An actor is guaranteed to receive its messages atomically - it is never scheduled on multiple threads concurrently. However, an actor is not guaranteed to receive all of its messages on the same thread over time. For example if messages A, B, and C are sent to an actor, the runtime may use three different threads to process those messages. But the actor is guaranteed to process the messages serially one after the other.

Actor Locals

Actors are const classes which means they must be immutable. This lets you pass actor references between actors, but you can't maintain any mutable state in the actor's fields. Instead you can store the actor's "mutable world state" in Actor.locals. Actor locals is a string/object map which works like a thread local - a unique map is used for every actor. To prevent naming collisions, you should prefix your map keys with your pod name:

// store an actor local
Actor.locals["acme.global"] = "hum bug"

// get an actor local
Actor.locals["acme.global"]

For example, to build an actor which increments a counter every time it receives a message:

pool := ActorPool()
a := Actor(pool) |msg|
{
  count := 1 + (Int)Actor.locals.get("count", 0)
  Actor.locals["count"] = count
  return count
}

100.times { a.send("ignored") }
echo("Count is now " + a.send("ignored").get)

Note that in this example, the actor ignores the messages sent to it, so it doesn't really matter what we pass.

Message Passing

Actors communicate by sending each other messages. Messages passed between actors must be immutable to guarantee thread safety.

Messages are sent to an actor using these methods:

  • send: enqueues the message immediately
  • sendLater: enqueues the message after a period of time has elapsed
  • sendWhenComplete: enqueues the message once another message completes processing

Futures

All three send methods return a Future which may be used to access the result of that message. You can poll for the result using Future.status - a future enters the complete state by one of three transitions:

  • The actor processes the message and returns a result
  • The actor raises an exception while processing the message
  • The future is cancelled (see cancel)

Once a future enters the complete state, the result is available via the get method. If the future is not complete, then calling get will block the caller until the future becomes done. A timeout may be used to block for a fixed period of time. Calling get results in one of these outcomes:

  • If the message was processed successfully, then get will return the result
  • If the actor raised an exception processing the message, then that same exception is raised to the caller of get
  • If the future was cancelled, then calling get will raise CancelledErr
  • If a timeout is used, then TimeoutErr is thrown if the actor doesn't process the message before the timeout elapses
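
The outcomes listed above can be exercised with a short script. This is a minimal sketch, not a definitive pattern: the doubling actor and its ArgErr on negative input are made up for illustration, and the timeout values are arbitrary.

```fantom
using concurrent

class Main
{
  Void main()
  {
    pool := ActorPool()

    // hypothetical actor: rejects negative input, otherwise doubles it
    a := Actor(pool) |Int msg->Int|
    {
      if (msg < 0) throw ArgErr("negative: $msg")
      return msg * 2
    }

    // success: get blocks until the result is available
    echo(a.send(21).get)

    // failure: the actor's exception is raised to the caller of get
    try { a.send(-1).get }
    catch (ArgErr e) { echo("actor failed: $e.msg") }

    // timeout: block for at most 5sec, TimeoutErr if not processed by then
    try { echo(a.send(4).get(5sec)) }
    catch (TimeoutErr e) { echo("timed out") }

    // cancellation: get on a cancelled future raises CancelledErr
    timer := a.sendLater(1sec, 1)
    timer.cancel
    try { timer.get }
    catch (CancelledErr e) { echo("cancelled") }

    pool.stop.join
  }
}
```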

Actors which block via Future.get should never receive messages themselves as this might lead to deadlocks. Best practice is to design service actors using strictly asynchronous messaging, and keep synchronous messaging on client actors which don't service requests themselves.

Message Error Handling

When an actor raises an exception processing a message in its receive callback, the error is made available in the Future. When the client calls Future.get, then the actor's original exception is raised. However if the client never calls Future.get, then it's easy for the error to be ignored by developers. Unfortunately the system cannot know if the error is going to be handled by client code via the Future.

To prevent errors from silently getting ignored, the following design patterns are recommended:

  1. each message should clearly identify who is responsible for processing the error
  2. if the actor is responsible for errors, then it should log the error
  3. if the client is responsible for errors, then it must call Future.get and handle errors

As a general principle, errors from messages processed synchronously get handled by the client. But errors from async messages should probably get logged by the actor since it's unlikely the client is doing anything with the Future.
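
One way an actor might take responsibility for its own errors is to wrap its work in a try/catch and log before rethrowing. The sketch below assumes a hypothetical actor which parses Str messages into Ints; the class name, the "acme" log name, and the parse helper are illustrative only.

```fantom
using concurrent

const class ParseActor : Actor
{
  new make(ActorPool p) : super(p) {}

  override Obj? receive(Obj? msg)
  {
    try
    {
      return parse(msg)
    }
    catch (Err e)
    {
      // the actor owns the error: log it so it is never silently dropped,
      // then rethrow so the Future still completes with the exception
      log.err("Failed to process message: $msg", e)
      throw e
    }
  }

  // hypothetical work: Str.toInt throws ParseErr on bad input
  private Int parse(Obj? msg) { ((Str)msg).toInt }

  private static const Log log := Log.get("acme")
}
```

Rethrowing keeps both options open: a client which does call Future.get still sees the original exception, while fire-and-forget senders leave a trace in the log.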

Timers

The sendLater method can be used to set up a timer. Timers post a message back to the actor's queue when they expire. Example:

pool := ActorPool()
a := Actor(pool) |Obj msg| { echo("$Time.now: $msg") }
a.send("start")
a.sendLater(1sec, "1sec")
a.sendLater(3sec, "3sec")
a.sendLater(2sec, "2sec")
Actor.sleep(5sec)

The sendLater method returns a Future which may be used to cancel the timer or poll/block until the message has been processed.

Chaining

The sendWhenComplete method is used to deliver a message once another message has completed processing. Using sendWhenComplete allows asynchronous message chaining. Consider this code:

future := actor1.send(msg1)
actor2.sendWhenComplete(future, msg2)

In this example, msg2 is enqueued on actor2 only after actor1 completes processing of msg1. Typically the future itself is passed as the message:

a.sendWhenComplete(future, future)        // future is message itself
a.sendWhenComplete(future, MyMsg(future)) // MyMsg references future

Remember that a message sent with sendWhenComplete is delivered no matter how the future completes: successfully, with an error, or by cancellation.
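
Because the chained message is delivered for every kind of completion, the receiving actor usually needs to inspect the future it was handed. A minimal sketch, assuming two hypothetical actors named worker and watcher:

```fantom
using concurrent

class Main
{
  Void main()
  {
    pool := ActorPool()

    // hypothetical worker which fails on zero
    worker := Actor(pool) |Int msg->Int|
    {
      if (msg == 0) throw ArgErr("zero")
      return 100 / msg
    }

    // watcher receives the worker's future as its message
    watcher := Actor(pool) |Future f->Obj?|
    {
      echo("worker finished with status $f.status")
      return null
    }

    ok  := worker.send(4)
    bad := worker.send(0)

    // the watcher is notified in both the success and the error case
    watcher.sendWhenComplete(ok, ok).get
    watcher.sendWhenComplete(bad, bad).get

    pool.stop.join
  }
}
```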

Coalescing Messages

Often when sending messages to an actor, we can merge two messages into a single message to save ourselves some work. For example, it is common in windowing systems to maintain a single union of all the dirty portions of a window rather than a bunch of little rectangles. An actor can have its messages automatically coalesced using the makeCoalescing constructor.

Let's look at an example:

const class Repaint
{
  new make(Window w, Rect d) { ... }
  const Window window   // fields of a const class must be const
  const Rect dirty
}

toKey := |Repaint msg->Obj| { msg.window }
coalesce := |Repaint a, Repaint b->Obj| { Repaint(a.window, a.dirty.union(b.dirty)) }
a := Actor.makeCoalescing(pool, toKey, coalesce) |Repaint msg| {...}

In this example the messages are instances of Repaint. The toKey function is used to obtain the key which determines if two messages can be coalesced. In this example we coalesce repaints per window. If the thread detects two pending messages with the same key (the window in this case), then it calls the coalesce function to merge the messages. In this example we return a new Repaint event with the union of the two dirty regions.

Messages sent with sendLater and sendWhenComplete are never coalesced.
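
For experimenting with coalescing without a windowing toolkit, a self-contained variation can use plain Int messages. This is only a sketch: the constant key, the summing merge function, and the sleep used to let messages pile up are all assumptions made for illustration, and how many messages actually get merged depends on timing.

```fantom
using concurrent

class Main
{
  Void main()
  {
    pool := ActorPool()

    // every message shares one key, so any two pending messages may merge
    toKey := |Int msg->Obj| { "sum" }

    // merge two pending messages by adding them together
    coalesce := |Int a, Int b->Obj| { a + b }

    actor := Actor.makeCoalescing(pool, toKey, coalesce) |Int msg->Obj?|
    {
      echo("received $msg")
      Actor.sleep(100ms)   // simulate slow work so later sends queue up
      return null
    }

    // send faster than the actor can process
    5.times |Int i| { actor.send(i + 1) }

    Actor.sleep(1sec)
    pool.stop.join
  }
}
```

If coalescing kicks in, fewer than five "received" lines are printed, while the values printed still add up to the total of everything sent.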

Flow Control

The current implementation of Fantom uses unbounded message queues. This means if an actor is receiving messages faster than it can process them, then its queue will continue to grow. Eventually this might result in out of memory exceptions. You can use some of the following techniques to implement flow control to prevent unbounded queues from growing forever:

  • Poll futures with isDone or use get with timeouts to cancel messages which aren't processed after a period of time
  • Use coalescing queues to merge pending messages
  • Use sendLater to schedule watch dog timers on an actor's queue
  • Use sendWhenComplete to create message feedback loops

For example consider a "reader" actor which reads lines of text from a big text file and sends those lines to other "processing" actors for parallel processing. If the reader pushes the lines of text as fast as it can read them, then it could potentially end up enqueuing large numbers of lines in memory. A better strategy would be to have the processing actors enqueue themselves with the reader when they are ready to process a line. This would create a natural feedback loop and allow the reader to throttle its IO based on how fast the processors could work.
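
The reader/processor feedback loop described above might be sketched as follows. All names here are hypothetical, and the reader fakes its file with a counter held in Actor.locals; the point is only that a processor asks for the next line when it is ready, so at most one line per processor is ever in flight.

```fantom
using concurrent

// hands out one line per request instead of pushing lines eagerly
const class Reader : Actor
{
  new make(ActorPool p) : super(p) {}

  override Obj? receive(Obj? msg)
  {
    processor := (Actor)msg      // the message is the processor asking for work
    line := nextLine
    if (line != null) processor.send(line)
    return null
  }

  // hypothetical stand-in for reading the next line of a big file
  private Str? nextLine()
  {
    n := (Int)Actor.locals.get("acme.lineNum", 0)
    if (n >= 10) return null
    Actor.locals["acme.lineNum"] = n + 1
    return "line $n"
  }
}

const class Processor : Actor
{
  new make(ActorPool p, Reader r) : super(p) { reader = r }

  const Reader reader

  override Obj? receive(Obj? msg)
  {
    if (msg != "start") echo("processing $msg")
    reader.send(this)            // ready again: enqueue ourselves with the reader
    return null
  }
}

class Main
{
  Void main()
  {
    pool := ActorPool()
    reader := Reader(pool)
    3.times { Processor(pool, reader).send("start") }
    Actor.sleep(1sec)            // crude wait for the demo to drain
    pool.stop.join
  }
}
```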

Actor Pools

All actors are created within an ActorPool. ActorPools manage the execution of actors using a shared thread pool.

As messages are sent to actors, they are allocated a thread to perform their work. An ActorPool will create up to 100 threads, after which actors must wait for a thread to free up. Once a thread frees up, it is used to process the next actor. If no actors have pending work, then the thread lingers for a few seconds before being released back to the operating system. In this model an ActorPool utilizes between zero and a peak of 100 threads depending on how many of the pool's actors currently have work. You can tweak the peak limit by setting ActorPool.maxThreads:

ActorPool { maxThreads = 10 }

An ActorPool is immediately considered running as soon as it is constructed. However, it doesn't actually spawn its first thread until one of its actors is sent a message. If all of a pool's actors finish processing their messages, then after a linger period all of that pool's threads will be freed.

An ActorPool can be manually shut down using the stop method. Once stop is called, the pool enters the stopped state and actors within the pool may not receive any more messages. However all pending messages are allowed to continue processing. Once all pending messages have been processed, the pool enters the done state. Use the join method to block until an ActorPool has fully shut down.

The kill method can be used to perform an unorderly shutdown. Unlike stop, kill doesn't give actors a chance to finish processing their pending message queues - all pending messages are cancelled. Actors which are currently executing a message are interrupted (which may or may not immediately terminate that thread). Once all actors have relinquished their threads, the ActorPool enters the done state.
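
An orderly shutdown with stop and join can be sketched like this. The echo actor, the thread limit of 4, and the 50ms of simulated work are arbitrary choices for illustration.

```fantom
using concurrent

class Main
{
  Void main()
  {
    pool := ActorPool { maxThreads = 4 }

    // hypothetical actor which takes a little while per message
    a := Actor(pool) |Obj? msg->Obj?|
    {
      Actor.sleep(50ms)
      return msg
    }

    10.times |Int i| { a.send(i) }

    // orderly shutdown: no new messages are accepted,
    // but messages already queued are still processed
    pool.stop
    echo("stopped=$pool.isStopped done=$pool.isDone")

    // block until every pending message has been processed
    pool.join
    echo("stopped=$pool.isStopped done=$pool.isDone")
  }
}
```

Swapping stop for kill would cancel the queued messages instead of letting them run.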