#840 Actors behaviour

jsendra Sun 29 Nov 2009

I'm making some experiments with concurrent programming in fantom. Reading documentation, Fantom is an Actor language, but certainly it's not a traditional actor language.

Wikipedia:

"In computer science, the Actor model is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent digital computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received."

The last part, determine how to respond to the next message received, requires an actor's state change (primitive becomes or similar in other languages).

I've tried an actor-based implementation of the sieve algorithm to calculate prime numbers (sieve= filters chain, where each filter removes multiples of a prime number) . Using Erlang, filter code is:

filter(N,Next) ->
	receive 
	    {num,X} ->
		if X rem N == 0 -> filter(N,Next);
		true->
		    if is_pid(Next) -> N!{num,X}, filter(N,Next);
		    true->io:format(",~p",[X]),
			    filter(N,spawn(primes,filter,[X,nil]))
		    end
		end;
	    end -> Next!end
	end.

The last created filter has Next field to nil (null). But when we append another filter, the previous last filter becomes another filter with the new value of Next.

Actors are inmutable in Fantom (const class), so in Fantom's code I've included a context (map) in each receive to maintain changed state.

const class Filter: Actor 
{
	const Int d // divisor
	new make(ActorPool pool, Int d) :super(pool) 
	{
		this.d=d
	}
	override Obj? receive(Obj? obj, Context cx) 
	{
		Int n:=(Int)obj
		Filter? next:= cx.get("next",null)
		if (n==0) {Sys.out.print(d+" "); next?.send(0); return d}
		if (n%d >0) 
		{
			if (next!=null) 
				next.send(n) 
			else 
				cx["next"]=Filter(pool, n)
		}
		return null
	}
}

class Primes 
{
	Void main(Str[] args) 
	{
		n:= Int(args[0])
		pool:=ActorPool()

		f:= Filter(pool,2)
		for (i:=2; i<n; i++) 
			f.send(i)
		f.send(0)
		pool.join // wait for actor finalization
		Sys.out.printLine // flush buffer
	}
}

The code works well, but I'm sure there's a better way to code filter's chain

NOTE

  • I've included pool.join to wait for actors finalization, but this sentence introduces a noticiable delay
  • Screen printing is the most expensive operation. I wonder if it's possible to append each prime to a global array (at actor construcion time, dispending also the 0 argument to indicate end), and write at the end with a simple echo.

My (double) question is:

  • is there a becomes equivalent in Fantom?
  • is it possible to actualize (ex. append) a global variable from an actor?

thanks

jsendra

brian Sun 29 Nov 2009

Hi jsendra,

Glad to see you trying out Fantom's actors!

To answer your specific questions:

is there a becomes equivalent in Fantom?

I assume you mean like a SmallTalk become where we can swizzle a reference potentially used by many objects to automatically point to some new type/instance? If that is what you mean, then no - Fantom tries to efficiently stick to the reference model of the JVM and CLR and type things as mutable or immutable.

is it possible to actualize (ex. append) a global variable from an actor?

There are no global variables in Fantom unless they are immutable. But with a couple lines you can write an Actor which takes logging messages, then prints them to stdout on demand:

log := Actor(ActorPool()) |Str msg, Context cx|
{
  StrBuf buf := cx.map.getOrAdd("buf") { StrBuf() }
  if (msg == "echo") echo(buf.toStr)
  else buf.add(msg)
}

10.times |i| { log.send("Line $i\n") }
echo("Messages sent...")
Actor.sleep(2sec)
log.send("echo")

Because of Fanton's immutable type system, we require Actors to be const so they can be passed around safely b/w threads. But that forces mutable state to managed in Context. It is a bit ugly, so I'm definitely open to suggestions to how we can make things more elegant.

KevinKelley Sun 29 Nov 2009

a bit ugly

I'm not even sure that it is ugly, just different. It's a pretty straightforward way to deal with "global" state, which these days doesn't really make sense anyway.

I mean, it's pretty reasonable now, with Fantom for example, to have a project in a single codebase that interfaces to a console, a local gui, and also via Fantom->Javascript translation to a browser anywhere. So, the concept of "global" doesn't exactly mean what it used to, and needed to be replaced.

I guess these days if you really want a global variable, you need to give it a URI and put up a bit of code to serve it out.

(This is off the topic of Actor model, though; sorry)

KevinKelley Sun 29 Nov 2009

I've included pool.join to wait for actors finalization, but this sentence introduces a noticiable delay

There sure is; removing the join and just sleeping for a few milliseconds, it runs entirely in 24ms (for Primes 99); putting the join back in runs in 5032ms (timing from start to end of main). I'm guessing there's something wrong somewhere; that don't seem right.

brian Sun 29 Nov 2009

There sure is; removing the join and just sleeping for a few milliseconds, it runs entirely in 24ms (for Primes 99); putting the join back in runs in 5032ms (timing from start to end of main).

5sec is always a suspicious duration. Although I didn't see where in the code the actor pool is stopped, in which case you are just waiting for all the threads to die off. So that line should probably read pool.stop.join.

KevinKelley Mon 30 Nov 2009

Oh. Yeah, that does it.

Not sure I'm understanding it, though. Actor.join fandoc says "a null timeout blocks forever"; how come's it not blocking forever then?

brian Mon 30 Nov 2009

Not sure I'm understanding it, though. Actor.join fandoc says "a null timeout blocks forever"; how come's it not blocking forever then?

There are all sorts of intricacies related to "shutdown" regarding message queues and threads, but the way joins works today is waits until all the pool's threads are exited. In this case the pool is never officially stopped, but since no messages are sent then after a 5sec linger time all the threads die naturally anyways.

KevinKelley Mon 30 Nov 2009

I guess I got confused a bit by where the fandoc for ActorPool.stop says "use join to wait for all actors to complete their message queue". Like join is an alternative to stop.

Playing a bit more with that example code; stop.join doesn't really fix it; the stop may happen before some of the Filter actors have finished sending messages down the stream. What's needed is probably to use a sendWhenDone on the future returned by the last send in the chain, to pass along a "finished" signal.

KevinKelley Mon 30 Nov 2009

Okay, here's a variation on jsendras code, with Brian's logger actor, and using future.get as a means of synchronizing on the completion of the actors. It seems to be working with no unexpected delays, and I think it's correct. There's probably a way to do it more elegantly, though.

const class Logger: Actor
{
  new make() : super(ActorPool.make /*separate threadpool*/) {}

  override Obj? receive(Obj? msg, Context cx)
  {
    StrBuf buf := cx.map.getOrAdd("buf") { StrBuf() }
    if (msg == "echo") echo("> " + buf.toStr)
    else buf.add(msg)
    return true // message received and acted on
  }
}

const class Filter: Actor
{
  const Int d // divisor
  const Actor logger

  new make(ActorPool pool, Actor logger, Int d) : super(pool)
  {
    this.d=d
    this.logger=logger
  }
  override Obj? receive(Obj? msg, Context cx)
  {
    if (msg == "quit")
    {
      Filter? next := cx.get("next",null)
      if (next != null)        // forward msg down the chain,
        return next.send(msg)  // returning the future for completion-testing
      return true // handled
    }
    if (msg is Int)
    {
      n:= msg as Int
      if (n % d > 0)
      {
        Filter? next := cx["next"]
        if (next != null)
          next.send(n)  // pass n to next filter
        else
        {
          // if all filters fail, n is a new prime
          logger.send(n + " ")
          cx["next"] = Filter(pool, logger, n)
        }
      }
      return true // handled
    }
    return null // unhandled
  }
}

class Primes
{
  Void main(Str[] args)
  {
    starttime := Duration.now

    log := Logger.make

    n:= Int(args[0])
    pool:=ActorPool()

    filter := Filter(pool,log,2)        // create a divide-by-2 filter
    (2..<n).each |i| { filter.send(i) } // send each int to the filter-chain

    future := filter.send("quit").get
    while (future != true)
    { // trail down the futures chain, waiting for "get"s to return
      if (future is Future) future = ((Future)future).get
    }

    done := log.send("echo").get // "get" forces wait to let log act

    millis := (Duration.now - starttime).toMillis
    echo("time used: $millis ms")
  }
}

ivan Mon 30 Nov 2009

There is definitely some 5sec delay in pool.join:

public static Void main(Str[] args)
{
  emptyDelay := measure |,| {}
  echo("$emptyDelay")

  pool := ActorPool()

  emptyJoin := measure |,| { pool.join}
  echo("$emptyJoin")

  a := Actor(pool) |Obj? msg, Context cx->Obj?| { null }
  a.send(null)

  join := measure |,| { pool.join }
  echo("$join")
}

private static Duration measure(|,| func)
{
  start := Duration.now
  func.call
  return Duration.now - start
}

The output is:

4000ns
14000ns
4999817000ns

brian Mon 30 Nov 2009

After the confusion of ActorPool.join, I think the best solution is to have it raise an exception if the pool isn't stopped.

I pushed a fix changeset

ivan Mon 30 Nov 2009

In my opinion this solution may be too radical for a small confusion. Sometimes it may be really simpler to wait for all actors to finish work by calling pool.join, and 5 sec delay is not a big deal comparing to lots of Future objects and checking isDone value. And calling stop too early in some "main" thread will cause errors sending messages between actors. Or am I missing something?

brian Mon 30 Nov 2009

In my opinion this solution may be too radical for a small confusion. Sometimes it may be really simpler to wait for all actors to finish work by calling pool.join, and 5 sec delay is not a big deal comparing to lots of Future objects and checking isDone value.

My intent was that join was always used to wait until a pool was fully stopped. I suppose there might be value in just waiting for all the threads to die, but that is probably never the best way to do it. So I think to be safe it better to be restrictive (we can always make it less restrictive in the future without breaking backward compatibility).

KevinKelley Mon 30 Nov 2009

After the confusion

I think the confusion is still going on. :-) Sorry for not getting my thoughts clearer before posting.

In the above example, the actor pool is a group of actors that cooperate by sending messages to each other, to search for prime numbers. So, calling stop from outside, before they've had a chance to finish interacting, breaks it, since stop causes messages to no longer be posted.

The original poster's code, using join, solved the example problem fine, it just had that odd five-second delay.

The code I posted, which uses a sentinel message that gets posted to all the actors in the group, and waits (using future.get) on all of them to process it, avoids the join and lets all the actors in the chain properly complete; but that's kind of a pain when you just want to let everybody run to completion.

The following changeset, seems to fix the join timeout, hopefully without causing other problems. See what you think of it.

=== (+11,-2) src/sys/java/fanx/util/ThreadPool.java ===
@@ -109,7 +109,12 @@
     while (true)
     {
       // if all workers have completed, then return success
-      if (workers.size() == 0) return true;
+      if (pending.size() == 0                 // no unassigned work
+        && workers.size() == idle.size())     // all workers are idle
+        return true;

       // if we have gone past our deadline, return false
       long toSleep = deadline - System.nanoTime()/1000000L;
@@ -180,6 +185,10 @@

     // add to head of idle list (we let oldest threads die out first)
     idle.addFirst(w);
+
+    // some work has completed; call notify in case we're idling
+    notify();
+
     return true;
   }

brian Mon 30 Nov 2009

The original poster's code, using join, solved the example problem fine, it just had that odd five-second delay.

I think that was just lucky - it just happened to work because the 5sec delay was enough time for everything to finish up.

In general if you have complex relationships between actors, then you will need to design shutdown sequencing into your messaging model. What you might want to do is use ActorPool.stop for the "public API actors" so that clients can't send anymore messages. Then use a combination of stop and normal messages to ensure that all your queues are drained correctly.

The SkyFoundry database makes extensive use of actors and different pools for disk IO, network IO, background work, and query indexing. And a clean shutdown is without a doubt the most complicated aspect of the actor model, especially if you have circular actor dependencies.

KevinKelley Tue 1 Dec 2009

I think that was just lucky - it just happened to work because the 5sec delay was enough time for everything to finish up.

Well, no, that's not what's happening. Primes 100000 takes 84 seconds, finds all the primes < 100000, uses a whole boatload of actors, joins just fine. It's just that, in the java code, ThreadPool.join has a wait that doesn't wake up and complete, until at least that 5sec timeout has passed.

Anyway, not a big deal, not my problem in the first place, I just thought it was a neat example code and wondered why it showed that one oddness.

Login or Signup to reply.