5. Actors

The actor support in GPars was originally inspired by the Actors library in Scala, but has since gone well beyond what Scala offers as standard.

Actors allow for a message-passing concurrency model: programs are collections of independent active objects that exchange messages and have no mutable shared state. Actors can help developers avoid issues such as deadlock, live-lock and starvation, which are common problems for shared-memory approaches. Actors are a way of leveraging the multi-core nature of today's hardware without all the problems traditionally associated with shared-memory multi-threading, which is why programming languages such as Erlang and Scala have taken up this model.

A nice article summarizing the key concepts behind actors was written recently by Ruben Vermeersch. Actors guarantee that at most one thread processes the actor's body at any one time. Under the covers, the memory also gets synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort. Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class is executed only by the thread handling the last received message and is therefore implicitly thread-safe. If any of the actor's methods can be called directly by other objects, the thread-safety guarantee for the actor's code and state is no longer valid.

Types of actors

In general, you can find two types of actors in the wild: those that hold implicit state and those that don't. GPars gives you both options. Stateless actors, represented in GPars by the DynamicDispatchActor and the ReactiveActor classes, keep no track of what messages have arrived previously. You may think of these as flat message handlers, which process messages as they come. Any state-based behavior has to be implemented by the user.
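As a minimal sketch of the stateless style (assuming GPars is on the classpath; the Describer class and the sample messages are hypothetical names made up for illustration), a DynamicDispatchActor picks an onMessage() overload by the run-time type of each message, while a reactor created through Actors.reactor() replies with whatever its closure returns:

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors
import groovyx.gpars.actor.DynamicDispatchActor

// Hypothetical stateless handler: each message is dispatched by its run-time type
class Describer extends DynamicDispatchActor {
    void onMessage(String text)    { reply 'a string of length ' + text.size() }
    void onMessage(Integer number) { reply 'the integer ' + number }
    void onMessage(Object other)   { reply 'something else' }
}

def describer = new Describer().start()
// sendAndWait() blocks until the actor replies, so the results arrive in order
def descriptions = ['hello', 42, 3.14].collect { describer.sendAndWait(it) }
println descriptions

// A ReactiveActor replies with the result of its closure
def doubler = Actors.reactor { it * 2 }
def doubled = doubler.sendAndWait(21)
println doubled

[describer, doubler]*.stop()
```

Neither actor remembers anything between messages. If a conversation needs memory, the handler has to keep it in its own fields.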

The stateful actors, represented in GPars by the DefaultActor class (and previously also by the AbstractPooledActor class), allow the user to handle implicit state directly. After receiving a message the actor moves into a new state with different ways to handle future messages. For example, a freshly started actor may accept some types of messages, such as encrypted messages for decryption, only after it has received the encryption keys. The stateful actors allow you to encode such dependencies directly in the structure of the message-handling code. Implicit state management, however, comes at a slight performance cost, mainly due to the lack of continuations support on the JVM.
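The key-then-message dependency described above could be sketched roughly as follows. This is an illustration only: it assumes GPars is on the classpath, and the shiftBack closure is a hypothetical toy cipher, not part of GPars. The nesting of the react() calls is what encodes the state: the inner handler exists only after the key has arrived.

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors

// Hypothetical toy cipher used only for illustration: shift each character back by the key
def shiftBack = { String text, int key ->
    def plain = new StringBuilder()
    for (char c : text.toCharArray()) plain.append((char) (c - key))
    plain.toString()
}

def decryptor = Actors.actor {
    react { Integer key ->              // state 1: the actor first expects the key
        react { String cipher ->        // state 2: only now can it decrypt a message
            reply shiftBack.call(cipher, key)
        }
    }
}

decryptor.send 3                                  // the key
def plainText = decryptor.sendAndWait('Khoor')    // the encrypted message
println plainText
decryptor.join()                                  // the actor terminates after its last handler
```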

Actor threading model

Since actors are detached from the system threads, a great number of actors can share a relatively small thread pool. This can go as far as having many concurrent actors that share a single pooled thread. This architecture makes it possible to avoid some of the threading limitations of the JVM. In general, while the JVM can only give you a limited number of threads (typically around a couple of thousand), the number of actors is limited only by the available memory. If an actor has no work to do, it doesn't consume threads.
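To make the point about sharing concrete, here is a small sketch (assuming GPars is on the classpath; the actor count and the arithmetic are arbitrary choices for illustration) in which a thousand actors are all served by a pool holding a single thread:

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup(1)          // a pool holding a single thread

// 1000 actors, each waiting for one message and replying with it multiplied by its own number
def actors = (1..1000).collect { n ->
    group.actor {
        react { msg -> reply(msg * n) }
    }
}

// Idle actors hold no thread, so the single pooled thread serves them one after another
def total = actors.collect { it.sendAndWait(1) }.sum()
println total

group.shutdown()
```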

Actor code is processed in chunks separated by quiet periods of waiting for new events (messages). This can be naturally modeled through continuations. As the JVM doesn't support continuations directly, they have to be simulated in the actor frameworks, which has a slight impact on the organization of the actors' code. However, the benefits in most cases outweigh the difficulties.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

class GameMaster extends DefaultActor {
    int secretNum

    void afterStart() {
        secretNum = new Random().nextInt(10)
    }

    void act() {
        loop {
            react { int num ->
                if (num > secretNum)
                    reply 'too large'
                else if (num < secretNum)
                    reply 'too small'
                else {
                    reply 'you win'
                    terminate()
                }
            }
        }
    }
}

class Player extends DefaultActor {
    String name
    Actor server
    int myNum

    void act() {
        loop {
            myNum = new Random().nextInt(10)
            server.send myNum
            react {
                switch (it) {
                    case 'too large': println "$name: $myNum was too large"; break
                    case 'too small': println "$name: $myNum was too small"; break
                    case 'you win': println "$name: I won $myNum"; terminate(); break
                }
            }
        }
    }
}

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()

//this forces main thread to live until both actors stop
[master, player]*.join()

example by Jordi Campos i Miralles, Departament de Matemàtica Aplicada i Anàlisi, MAiA Facultat de Matemàtiques, Universitat de Barcelona

Usage of Actors

GPars provides consistent Actor APIs and DSLs. Actors in principle perform three specific operations: send messages, receive messages and create new actors. Although not specifically enforced by GPars, messages should be immutable or at least follow the hands-off policy, whereby the sender never touches a message after it has been sent off.

Sending messages

Messages can be sent to actors using the send() method.

def passiveActor = Actors.actor{
    loop {
        react { msg -> println "Received: $msg"; }
    }
}
passiveActor.send 'Message 1'
passiveActor << 'Message 2'    //using the << operator
passiveActor 'Message 3'       //using the implicit call() method

Alternatively, the << operator or the implicit call() method can be used. A family of sendAndWait() methods is available to block the caller until a reply from the actor is available. The reply is returned from the sendAndWait() method as a return value. The sendAndWait() methods may also return after a timeout expires or in case of termination of the called actor.

def replyingActor = Actors.actor{
    loop {
        react { msg ->
            println "Received: $msg";
            reply "I've got $msg"
        }
    }
}
def reply1 = replyingActor.sendAndWait('Message 4')
def reply2 = replyingActor.sendAndWait('Message 5', 10, TimeUnit.SECONDS)
use (TimeCategory) {
    def reply3 = replyingActor.sendAndWait('Message 6', 10.seconds)
}

The sendAndContinue() method allows the caller to continue its processing while the supplied closure is waiting for a reply from the actor.

friend.sendAndContinue 'I need money!', {money -> pocket money}
println 'I can continue while my friend is collecting money for me'

All the send(), sendAndWait() and sendAndContinue() methods will throw an exception if invoked on a non-active actor.

Receiving messages

Non-blocking message retrieval

Calling the react() method, optionally with a timeout parameter, from within the actor's code will consume the next message from the actor's inbox, potentially waiting, if there is no message to be processed immediately.

println 'Waiting for a gift'
react {gift ->
    if (myWife.likes gift) reply 'Thank you!'
}

Under the covers the supplied closure is not invoked directly, but scheduled for processing by any thread in the thread pool once a message is available. After scheduling the current thread will then be detached from the actor and freed to process any other actor, which has received a message already.

To allow detaching actors from the threads the react() method demands the code to be written in a special Continuation-style.

Actors.actor {
    loop {
        println 'Waiting for a gift'
        react {gift ->
            if (myWife.likes gift) reply 'Thank you!'
            else {
                reply 'Try again, please'
                react {anotherGift ->
                    if (myChildren.like anotherGift) reply 'Thank you!'
                }
                println 'Never reached'
            }
        }
        println 'Never reached'
    }
    println 'Never reached'
}

The react() method has special semantics to allow actors to be detached from threads when no messages are available in their mailbox. Essentially, react() schedules the supplied code (closure) to be executed upon the next message arrival and returns. The closure supplied to the react() method is the code where the computation should continue, hence continuation style.

Since an actor has to preserve the guarantee of at most one thread active within the actor's body, the next message cannot be handled before the current message processing finishes. Typically, there shouldn't be a need to put code after calls to react(). Some actor implementations even enforce this; GPars, however, does not, for performance reasons. The loop() method allows iteration within the actor body. Unlike typical looping constructs, like for or while loops, loop() cooperates with nested react() blocks and will ensure looping across subsequent message retrievals.

Sending replies

The reply/replyIfExists methods are defined not only on the actors themselves but, for AbstractPooledActor (not available in the DefaultActor, DynamicDispatchActor or ReactiveActor classes), also on the processed messages themselves upon their reception, which is particularly handy when handling multiple messages in a single call. In such cases reply() invoked on the actor sends a reply to the authors of all the currently processed messages (the last one), whereas reply() called on a message sends a reply to the author of that particular message only.

See demo here

The sender property

Messages upon retrieval offer the sender property to identify the originator of the message. The property is available inside the Actor's closure:

react {tweet ->
    if (isSpam(tweet)) ignoreTweetsFrom sender
    sender.send 'Never write me again!'
}

Forwarding

When sending a message, a different actor can be specified as the sender so that potential replies to the message will be forwarded to the specified actor and not to the actual originator.

def decryptor = Actors.actor {
    react {message ->
        reply message.reverse()
//        sender.send message.reverse()    //An alternative way to send replies
    }
}

def console = Actors.actor {  //This actor will print out decrypted messages, since the replies are forwarded to it
    react {
        println 'Decrypted message: ' + it
    }
}

decryptor.send 'lellarap si yvoorG', console  //Specify an actor to send replies to
console.join()

Creating Actors

Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for some more messages to arrive.

For example, this is how you create an actor that prints out all messages that it receives.

def console = Actors.actor {
    loop {
        react {
            println it
        }
    }
}

Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.

Here's an example with a decryptor service, which can decrypt submitted messages and send the decrypted messages back to the originators.

final def decryptor = Actors.actor {
    loop {
        react {String message ->
            if ('stopService' == message) {
                println 'Stopping decryptor'
                stop()
            }
            else reply message.reverse()
        }
    }
}

Actors.actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
        decryptor.send 'stopService'
    }
}.join()

Here's an example of an actor that waits for up to one second to receive a reply to its message.

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor {
    friend.send('Hi')
    //wait for answer 1sec
    react(1000) {msg ->
        if (msg == Actor.TIMEOUT) {
            friend.send('I see, busy as usual. Never mind.')
            stop()
        } else {
            //continue conversation
            println "Thank you for $msg"
        }
    }
}

me.join()

Undelivered messages

Sometimes messages cannot be delivered to the target actor. When special action needs to be taken for undelivered messages, at actor termination all unprocessed messages from its queue have their onDeliveryError() method called. The onDeliveryError() method or closure defined on the message can, for example, send a notification back to the original sender of the message.

final DefaultActor me
me = Actors.actor {
    def message = 1

    message.metaClass.onDeliveryError = {->
        //send message back to the caller
        me << "Could not deliver $delegate"
    }

    def actor = Actors.actor {
        react {
            //wait 2sec in order next call in demo can be emitted
            Thread.sleep(2000)
            //stop actor after first message
            stop()
        }
    }

    actor << message
    actor << message

    react {
        //print whatever comes back
        println it
    }

}

me.join()

Alternatively the onDeliveryError() method can be specified on the sender itself. The method can be added both dynamically

final DefaultActor me
me = Actors.actor {
    def message1 = 1
    def message2 = 2

    def actor = Actors.actor {
        react {
            //wait 2sec in order next call in demo can be emitted
            Thread.sleep(2000)
            //stop actor after first message
            stop()
        }
    }

    me.metaClass.onDeliveryError = {msg ->
        //callback on actor inaccessibility
        println "Could not deliver message $msg"
    }

    actor << message1
    actor << message2

    actor.join()

}

me.join()

and statically in actor definition:

class MyActor extends DefaultActor {
    public void onDeliveryError(msg) {
        println "Could not deliver message $msg"
    }
    …
}

Joining actors

Actors provide a join() method to allow callers to wait for the actor to terminate. A variant accepting a timeout is also available. The Groovy spread-dot operator comes in handy when joining multiple actors at a time.

def master = new GameMaster().start()
def player = new Player(name: 'Player', server: master).start()

[master, player]*.join()
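The timeout variant mentioned above can be sketched like this (assuming GPars is on the classpath; the 200 ms timeout and the message text are arbitrary illustrative values). A timed join() simply returns when the time is up, leaving the actor running:

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors
import java.util.concurrent.TimeUnit

def worker = Actors.actor {
    react { msg -> println "Worker received: $msg" }   // terminates after one message
}

worker.join(200, TimeUnit.MILLISECONDS)   // gives up waiting after 200 ms
def activeAfterTimeout = worker.isActive()
println "Still active after the timed join: $activeAfterTimeout"

worker.send 'finish'
worker.join()                             // returns once the actor has terminated
```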

Conditional and counting loops

The loop() method allows for either a condition or a number of iterations to be specified, optionally accompanied by a closure to invoke once the loop finishes: the After Loop Termination Code Handler.

The following actor will loop three times to receive 3 messages and then prints out the maximum of the received messages.

final Actor actor = Actors.actor {
    def candidates = []
    def printResult = {-> println "The best offer is ${candidates.max()}"}

    loop(3, printResult) {
        react {
            candidates << it
        }
    }
}

actor 10
actor 30
actor 20
actor.join()

The following actor will receive messages until a value greater than 30 arrives.

final Actor actor = Actors.actor {
    def candidates = []
    final Closure printResult = {-> println "Reached best offer - ${candidates.max()}"}

    loop({-> candidates.max() < 30}, printResult) {
        react {
            candidates << it
        }
    }
}

actor 10
actor 20
actor 25
actor 31
actor 20
actor.join()

The After Loop Termination Code Handler can use the actor's react{} but not loop().

DefaultActor can be set to behave in a fair or non-fair (default) manner. Depending on the strategy chosen, the actor either makes the thread available to other actors sharing the same parallel group (fair), or keeps the thread for itself until the message queue gets empty (non-fair). Generally, non-fair actors perform two to three times better than fair ones.

Use either the fairActor() factory method or the actor's makeFair() method.
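A minimal sketch of the factory-method route (assuming GPars is on the classpath; the message handling is invented purely for illustration). From the caller's point of view the two actors are used identically. Only the thread-sharing strategy differs:

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.Actors

// Fair: gives the thread back to the pool after each message
def fair = Actors.fairActor {
    loop { react { String msg -> reply msg.toUpperCase() } }
}

// Non-fair (the default): keeps the thread until its message queue is empty
def nonFair = Actors.actor {
    loop { react { String msg -> reply msg.toLowerCase() } }
}

def answers = [fair.sendAndWait('ping'), nonFair.sendAndWait('PONG')]
println answers

[fair, nonFair]*.stop()
```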

Custom schedulers

Actors leverage the standard JDK concurrency library by default. To provide a custom thread scheduler use the appropriate constructor parameter when creating a parallel group (PGroup class). The supplied scheduler will orchestrate threads in the group's thread pool.

Please also see the numerous Actor Demos.

5.1 Actors Principles

Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. The threads are returned back to the pool once a message has been processed and the actor is idle waiting for some more messages to arrive. Actors become detached from the underlying threads and so a relatively small thread pool can serve a potentially unlimited number of actors. Virtually unlimited scalability in the number of actors is the main advantage of event-based actors, which are detached from the underlying physical threads.

Here are some examples of how to use actors. This is how you create an actor that prints out all messages that it receives.

import static groovyx.gpars.actor.Actors.*

def console = actor {
    loop {
        react {
            println it
        }
    }
}

Notice the loop() method call, which ensures that the actor doesn't stop after having processed the first message.

As an alternative you can extend the DefaultActor class and override the act() method. Once you instantiate the actor, you need to start it so that it attaches itself to the thread pool and can start accepting messages. The actor() factory method will take care of starting the actor.

class CustomActor extends DefaultActor {
    @Override
    protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console = new CustomActor()
console.start()

Messages can be sent to the actor using multiple methods

console.send('Message')
console 'Message'
console.sendAndWait 'Message'                                                     //Wait for a reply
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}  //Forward the reply to a function

Creating an asynchronous service

import static groovyx.gpars.actor.Actors.*

final def decryptor = actor {
    loop {
        react {String message ->
            reply message.reverse()
        }
    }
}

def console = actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join()

As you can see, you create new actors with the actor() method passing in the actor's body as a closure parameter. Inside the actor's body you can use loop() to iterate, react() to receive messages and reply() to send a message to the actor, which has sent the currently processed message. The sender of the current message is also available through the actor's sender property. When the decryptor actor doesn't find a message in its message queue at the time when react() is called, the react() method gives up the thread and returns it back to the thread pool for other actors to pick it up. Only after a new message arrives to the actor's message queue, the closure of the react() method gets scheduled for processing with the pool. Event-based actors internally simulate continuations - actor's work is split into sequentially run chunks, which get invoked once a message is available in the inbox. Each chunk for a single actor can be performed by a different thread from the thread pool.

Groovy's flexible syntax with closures allows our library to offer multiple ways to define actors. For instance, here's an example of an actor that waits for up to one second to receive a reply to its message. Actors allow the time DSL defined by the org.codehaus.groovy.runtime.TimeCategory class to be used for timeout specification to the react() method, provided the user wraps the call within a TimeCategory use block.

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor {
    friend.send('Hi')
    //wait for answer 1sec
    react(1000) {msg ->
        if (msg == Actor.TIMEOUT) {
            friend.send('I see, busy as usual. Never mind.')
            stop()
        } else {
            //continue conversation
            println "Thank you for $msg"
        }
    }
}

me.join()

When a timeout expires when waiting for a message, the Actor.TIMEOUT message arrives instead. Also the onTimeout() handler is invoked, if present on the actor:

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor {
    friend.send('Hi')

    delegate.metaClass.onTimeout = {->
        friend.send('I see, busy as usual. Never mind.')
        stop()
    }

    //wait for answer 1sec
    react(1000) {msg ->
        if (msg != Actor.TIMEOUT) {
            //continue conversation
            println "Thank you for $msg"
        }
    }
}

me.join()

Notice the possibility of using Groovy meta-programming to define an actor's lifecycle notification methods (e.g. onTimeout()) dynamically. Obviously, the lifecycle methods can be defined the usual way when you decide to define a new class for your actor.

class MyActor extends DefaultActor {
    public void onTimeout() {
        …
    }

    protected void act() {
        …
    }
}

Actors guarantee thread-safety for non-thread-safe code

Actors guarantee that at most one thread processes the actor's body at a time. Under the covers, the memory also gets synchronized each time a thread gets assigned to an actor, so the actor's state can be safely modified by code in the body without any extra synchronization or locking effort.

class MyCounterActor extends DefaultActor {
    private Integer counter = 0

    protected void act() {
        loop {
            react {
                counter++
            }
        }
    }
}

Ideally, an actor's code should never be invoked directly from outside, so that all the code of the actor class is executed only by the thread handling the last received message and is therefore implicitly thread-safe. If any of the actor's methods can be called directly by other objects, the thread-safety guarantee for the actor's code and state is no longer valid.

Simple calculator

A little bit more realistic example of an event-driven actor that receives two numeric messages, sums them up and sends the result to the console actor.

import groovyx.gpars.group.DefaultPGroup

//not necessary, just showing that a single-threaded pool can still handle multiple actors
def group = new DefaultPGroup(1);

final def console = group.actor {
    loop {
        react {
            println 'Result: ' + it
        }
    }
}

final def calculator = group.actor {
    react {a ->
        react {b ->
            console.send(a + b)
        }
    }
}

calculator.send 2
calculator.send 3

calculator.join()
group.shutdown()

Notice that event-driven actors require special care regarding the react() method. Since event-driven actors need to split the code into independent chunks assignable to different threads sequentially, and continuations are not natively supported on the JVM, the chunks are created artificially. The react() method creates the next message handler. As soon as the current message handler finishes, the next message handler (continuation) gets scheduled.

Concurrent Merge Sort Example

For comparison I'm also including a more involved example performing a concurrent merge sort of a list of integers using actors. You can see that thanks to the flexibility of Groovy we came pretty close to the Scala model, although I still miss Scala's pattern matching for message handling.

import groovyx.gpars.group.DefaultPGroup
import static groovyx.gpars.actor.Actors.actor

Closure createMessageHandler(def parentActor) {
    return {
        react {List<Integer> message ->
            assert message != null
            switch (message.size()) {
                case 0..1:
                    parentActor.send(message)
                    break
                case 2:
                    if (message[0] <= message[1]) parentActor.send(message)
                    else parentActor.send(message[-1..0])
                    break
                default:
                    def splitList = split(message)

                    def child1 = actor(createMessageHandler(delegate))
                    def child2 = actor(createMessageHandler(delegate))
                    child1.send(splitList[0])
                    child2.send(splitList[1])

                    react {message1 ->
                        react {message2 ->
                            parentActor.send merge(message1, message2)
                        }
                    }
            }
        }
    }
}

def console = new DefaultPGroup(1).actor {
    react {
        println "Sorted array:\t${it}"
        System.exit 0
    }
}

def sorter = actor(createMessageHandler(console))
sorter.send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])
console.join()

def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}

Since actors reuse threads from a pool, the script will work with virtually any size of a thread pool, no matter how many actors are created along the way.

Actor lifecycle methods

Each Actor can define lifecycle observing methods, which will be called whenever a certain lifecycle event occurs.

You can either define the methods statically in your Actor class or add them dynamically to the actor's metaclass:

class MyActor extends DefaultActor {
    public void afterStart() {
        …
    }
    public void onTimeout() {
        …
    }

    protected void act() {
        …
    }
}

def myActor = actor {
    delegate.metaClass.onException = {
        log.error('Exception occurred', it)
    }

    …
}

To help performance, you may consider using the silentStart() method instead of start() when starting a DynamicDispatchActor or a ReactiveActor. Calling silentStart() will bypass some of the start-up machinery and as a result will also avoid calling the afterStart() method. Due to its stateful nature, DefaultActor cannot be started silently.
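As a rough sketch of the difference (assuming GPars is on the classpath; the Echo class and its afterStartCalled flag are hypothetical names introduced only for this illustration), a silently started actor handles messages as usual, but its afterStart() hook is not expected to run:

```groovy
// Assumes GPars is on the classpath, e.g. via @Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.actor.DynamicDispatchActor

// Hypothetical actor that records whether its afterStart() hook has run
class Echo extends DynamicDispatchActor {
    boolean afterStartCalled = false

    void afterStart() { afterStartCalled = true }

    void onMessage(String text) { reply text }
}

def echo = new Echo()
echo.silentStart()                       // skips the start-up notification
def answer = echo.sendAndWait('hi')      // message handling still works
println "answer=$answer, afterStart called: ${echo.afterStartCalled}"

echo.stop()
```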

Pool management

Actors can be organized into groups and as a default there's always an application-wide pooled actor group available. And just like the Actors abstract factory can be used to create actors in the default group, custom groups can be used as abstract factories to create new actors instances belonging to these groups.

def myGroup = new DefaultPGroup()

def actor1 = myGroup.actor { … }

def actor2 = myGroup.actor { … }

The actors belonging to the same group share the underlying thread pool of that group. The pool by default contains n + 1 threads, where n stands for the number of CPUs detected by the JVM. The pool size can be set explicitly either by setting the gpars.poolsize system property or individually for each actor group by specifying the appropriate constructor parameter.

def myGroup = new DefaultPGroup(10)  //the pool will contain 10 threads

The thread pool can be manipulated through the appropriate DefaultPGroup class, which delegates to the Pool interface of the thread pool. For example, the resize() method allows you to change the pool size any time and the resetDefaultSize() sets it back to the default value. The shutdown() method can be called when you need to safely finish all tasks, destroy the pool and stop all the threads in order to exit JVM in an organized manner.

… (n+1 threads in the default pool after startup)

Actors.defaultActorPGroup.resize 1 //use one-thread pool

… (1 thread in the pool)

Actors.defaultActorPGroup.resetDefaultSize()

… (n+1 threads in the pool)

Actors.defaultActorPGroup.shutdown()

As an alternative to the DefaultPGroup, which creates a pool of daemon threads, the NonDaemonPGroup class can be used when non-daemon threads are required.

def daemonGroup = new DefaultPGroup()

def actor1 = daemonGroup.actor { … }

def nonDaemonGroup = new NonDaemonPGroup()

def actor2 = nonDaemonGroup.actor { … }

class MyActor extends DefaultActor {
    def MyActor() {
        this.parallelGroup = nonDaemonGroup
    }

    void act() {...}
}

Actors belonging to the same group share the underlying thread pool. With pooled actor groups you can split your actors to leverage multiple thread pools of different sizes and so assign resources to different components of your system and tune their performance.

def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool

def priceCalculator = coreActors.actor { … }

def paymentProcessor = coreActors.actor { … }

def emailNotifier = helperActors.actor { … }

def cleanupActor = helperActors.actor { … }

//increase size of the core actor group
coreActors.resize 6

//shutdown the group's pool once you no longer need the group to release resources
helperActors.shutdown()

Do not forget to shut down custom pooled actor groups once you no longer need them and their actors, to preserve system resources.

Common trap: App terminates while actors do not receive messages

Most likely you're using daemon threads and pools, which is the default setting, and your main thread finishes. Calling actor.join() on any, some or all of your actors would block the main thread until the actor terminates and thus keep all your actors running. Alternatively use instances of NonDaemonPGroup and assign some of your actors to these groups.

def nonDaemonGroup = new NonDaemonPGroup()
def myActor = nonDaemonGroup.actor {...}

alternatively

def nonDaemonGroup = new NonDaemonPGroup()

class MyActor extends DefaultActor {
    def MyActor() {
        this.parallelGroup = nonDaemonGroup
    }

    void act() {...}
}

def myActor = new MyActor()

Blocking Actors

Instead of event-driven continuation-styled actors, you may in some scenarios prefer using blocking actors. Blocking actors hold a single pooled thread for their whole life-time including the time when waiting for messages. They avoid some of the thread management overhead, since they never fight for threads after start, and also they let you write straight code without the necessity of continuation style, since they only do blocking messa
