Václav Pech : Weblog

Václav Pech

Thursday Dec 20, 2012

Broken promises

Today I want to elaborate on an important concept of asynchronous computing in general and a hot recent addition to GPars Promises - error handling for asynchronous operations.

Promises intro

Asynchronous tasks cannot return results directly. When you start an asynchronous calculation, all it can give you back is a placeholder for the future result - a Promise. Promises, sometimes referred to as Futures, represent a contract for a value that will become available sometime in the future, perhaps as a result of an asynchronous computational activity. The holder of the Promise can poll its current state, register one or more callbacks for the final value or block until the final value becomes available.

Promise<String> webPage = downloadPageAsynchronously("jroller.com/vaclav")
webPage.then {page -> saveToDisk(page)}                                            //register an async callback
webPage.then {page -> log("Downloaded " + page.length() + " characters")}          //register another async callback
println webPage.get()                                                              //synchronous (blocking) value retrieval

Promises introduce a level of indirection between the producer and the consumer. Neither the consumer nor the producer needs to know about the other. Whoever reads from the promise becomes a consumer, whoever writes a value to it becomes a producer. To provide reasonable concurrency guarantees, a value can only be assigned to a Promise once. After a value is assigned to a Promise, it becomes read-only.
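To make the write-once rule concrete, here is a minimal sketch using a DataflowVariable, the class GPars Promises are implemented through. It assumes GPars 1.0.0 is fetched through Grape, and that an attempt to re-bind a different value is rejected with an IllegalStateException, which is how I understand the 1.0 behaviour.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.dataflow.DataflowVariable

final promise = new DataflowVariable()
promise << 'first'                      // the producer binds the value

boolean rejected = false
try {
    promise << 'second'                 // a second, different value is refused
} catch (IllegalStateException expected) {
    rejected = true
}

println "Re-binding rejected: $rejected, value: ${promise.get()}"
```

Consumers can therefore read the value as many times as they like without worrying that it changes under their hands.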

Promise on promise on promise

We use promises quite heavily in GPars. Asynchronous tasks return Promises, asynchronous functions return Promises and active objects return, guess what, Promises. Thus the need to combine and compose Promises quickly became evident. The moment you allow Promises to be chained, their real power is unleashed.

url.then(download).then(calculateHash).then(printResult);

Chained promises allow you to build non-trivial asynchronous computation chains and then fire them up by an asynchronous event. The computation will run in parallel with your other threads, yet it gives you convenient ways to stay informed about what is going on.

Keep your promises no matter what

Sunny-day scenarios are always attractive. However, what if any of the asynchronous calculations fails? What if somewhere in the middle of the chain an exception sneaks in? Shall it explode? Shall it hide? Taking inspiration from the excellent "You're missing the Point of Promises" article, GPars now propagates exceptions down the chain in a controlled way. Your code doesn't need to be polluted with exception handling yet you may handle the exception at any level you like. It will never be lost. No matter where in the chain the error happened, you handle it wherever it is most convenient to you:

url.then(download).then(calculateHash).then(formatResult).then(printResult, printError).then(sendNotificationEmail);

The then() chaining method accepts an optional second argument - an error handler, which only gets invoked when the promise is bound to an exception. The error handler may try to recover and return a replacement value to use further down the chain, or it may throw a new exception or re-throw the caught one. If an error handler is missing, the exception is simply forwarded to the next then() method in the chain, bypassing the sunny-day scenario handler.
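A small self-contained sketch of that behaviour, assuming GPars 1.0.0 is pulled in through Grape. The failing task and the numeric handlers are made up for illustration only. The first then() has no error handler, so the exception skips it; the second one recovers with a replacement value, and the rest of the chain continues as usual.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import static groovyx.gpars.dataflow.Dataflow.task

// A hypothetical task that fails instead of producing a number
def failing = task { throw new IllegalStateException('download failed') }

def recovered = failing
        .then { it * 2 }                          // bypassed, the promise holds an exception
        .then({ it + 1 }, { Throwable e -> -1 })  // the error handler supplies a replacement value
        .then { it * 10 }                         // runs normally on the replacement value

println recovered.get()
```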

Be picky 

You may also be more specific about the handled exception type:

url.then(download)
    .then(calculateHash, {MalformedURLException e -> return 0}) //Only MalformedURLException is handled and replaced with 0, other exceptions just fall through
    .then(formatResult)
    .then(printResult, printError) //Handle all remaining exceptions
    .then(sendNotificationEmail);

In such cases, exceptions other than the declared type are treated as un-handled and are passed on (and perhaps handled somewhere down the stream).

Also, you may decide to leave exceptions completely un-handled and let the clients (consumers) handle them:

Promise<Object> result = url.then(download).then(calculateHash).then(formatResult).then(printResult);
try {
    result.get()
} catch (Exception e) {
    //handle exceptions here
}

Conclusion

With proper error handling, promises become a versatile and easy-to-use tool for wiring asynchronous calculations. If you want to add promises to your Groovy toolbox, get the recent GPars 1.0.0 and take it for a spin.

Posted at 06:13AM Dec 20, 2012 by Vaclav Pech in Groovy  |  Comments[1]

Wednesday Dec 19, 2012

I like the smell of a fresh, warm just baked release - GPars 1.0 arrived

I'm happy to announce that after four years of development GPars, the Groovy concurrency library, has just reached its 1.0 mark. A fresh and crispy GPars 1.0.0 is now ready for you to grab or download and use on your projects. Also, the up-coming Groovy releases will bundle GPars 1.0.

Compared to the previous release, 1.0 brings several performance enhancements, considerable API updates, polished documentation and numerous functionality improvements, mostly in the dataflow area. Please, check out the What's new section of the user guide for the details. Full release notes are also available.

I would like to use this opportunity to thank all the Groovy people who have over time contributed in one way or another to the success of GPars. It is my honour to be part of such a helpful and encouraging community. In particular, I would like to thank my fellow GPars committers, namely Paul King, Dierk Koenig, Alex Tkatchman and Russel Winder, who have been consistently pushing the project forward and without whom it would hardly have got this far. I also greatly appreciate the support we received from Guillaume Laforge, the Groovy supreme commander. Thank you all, gentlemen!

Groovy concurrency times ahead!


Posted at 06:49AM Dec 19, 2012 by Vaclav Pech in Groovy  | 

Friday May 25, 2012

GPars actors and dataflow for Google AppEngine

Multi-threading is spreading - from desktops to laptops, to mobile devices and to hosted services. So is the need for comfortable and robust concurrency programming abstractions to leverage multithreading in an intuitive and safe way. Some time ago Google enabled multi-threading for Java applications on their Google AppEngine. You can get up to ten threads and then use the standard Java means for synchronization and locking to manage them. This obviously creates an opportunity for higher-level concurrency abstractions to be built on top.

This is where I have some good news for you - if you need easy-to-use concurrency on the Google AppEngine, GPars can now give you a hand. GPars actors, dataflow, agents as well as parallel collections can be smoothly integrated into your Java, Groovy and Gaelyk GAE applications, so you gain several intuitive ways to split your computations among multiple GAE threads. To try a few attractive samples, please visit gparsconsole.appspot.com and play around. The sky and the free quota are the only limits to your concurrent creativity.

I'd like to thank Vladimír Oraný (@musketyr) from AppSatori for the initiative and effort. The first iteration of the integration library we created is available at GitHub as well as in the maven repository:

<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars-appengine</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

Don't forget to share your thoughts and please spread the word!

Groovy concurrency times ahead ...

Posted at 01:30PM May 25, 2012 by Vaclav Pech in Groovy  |  Comments[1]

Tuesday Dec 20, 2011

The Promises to trust

Task decomposition is one of the most intuitive ways to introduce concurrency. You define several independent tasks/processes/threads, split all the work so that each task gets its share and then just let them run concurrently. In GPars we provide several techniques to start asynchronous activities - Dataflow tasks, asynchronous functions or active objects to name the most “promising” (aka using Promises) concepts.

Now, when you have the calculations running, you urgently need ways to coordinate them, to monitor them and eventually also to retrieve and combine their results. This is what the concept of Promises solves quite elegantly. You might have noticed that Promises are used in many concurrency-focused frameworks and languages - Akka, Clojure, Dart to name a few. GPars has been supporting Promises (implemented through DataflowVariables) since its early days, too.

At this point I'd like to point you to my earlier article introducing the underlying dataflow concepts.

What promises give you is the ability to pass around placeholders instead of real values. If you’re starting an asynchronous task that will eventually calculate a result, you need to hold something in your hands while the computation is running in the background. Something that gives you a way to poll the state of the task or retrieve the result, once it is known. A promise of a future value.

Gently handling results

When an asynchronous function, a task or an active object returns a Promise instead of a concrete value, the Promise you get represents a handle to the ongoing asynchronous calculation. You can now hold the service by its promise and call get() to block until a value representing the result of the calculation is available.

Promise bookingPromise = task {
    final data = collectData()
    return broker.makeBooking(data)
}
//...some time later
printAgenda bookingPromise.get()

If promises look familiar, it is most probably because you’ve met java.util.concurrent.Future before. Indeed, Futures and Promises stand close to each other. Yet, there’s one fundamental difference between Promises as we understand them in GPars and the Future class in Java. Promises allow you to wait for the result without blocking the current thread - you just need to register a handler that will be invoked once the promised value is available. Obviously, nothing prevents you from having more of such handlers for a single promise: They will all trigger in parallel once the promise has a concrete value:

Promise bookingPromise = task {
    final data = collectData()
    return broker.makeBooking(data)
}
bookingPromise.whenBound {booking -> printAgenda booking}
bookingPromise.whenBound {booking -> sendMeAnEmail booking}
bookingPromise.whenBound {booking -> updateTheCalendar booking}

I tend to think that this is more than just a minor improvement. It opens a wide range of new possibilities, like promise chaining, grouping and composition. Bound handlers help you write highly concurrent applications, which are nice to the system and never ask for more threads than there are active calculations. System threads rarely get blocked and moved into the passive/parked state. Instead, they get reused as different handlers become eligible for running.

The chain of responsibility

When I saw a presentation of the Dart programming language recently, one thing that impressed me was their use of asynchronous operation chaining. Since Dart strongly emphasises asynchronicity, you can see Promises used all over the Dart code examples. Having so many asynchronous services to call, you frequently need to apply functions to the results of the previous asynchronous function. And being single-threaded, you must not block when waiting for the promise to be fulfilled. This is where Dart offers promise chaining through their then() method.
We decided to enhance GPars and bring Promise chaining to our users in the upcoming version 1.0. Here’s a brief Groovy example to illustrate that functionality.

//Some asynchronous services to use
final polish = ...
final transform = ...
final save = ...
final notify = ...

Promise promiseForStuff = task { loadStuffFromDB() }
promiseForStuff.then polish then transform then save then {notify me}

A side note: Yes, this is all valid Groovy/GPars code. Groovy 1.8 lets you omit dots and parentheses in a lot of places.

With the then() method you can chain multiple asynchronous or synchronous services together in a way that they can run asynchronously from your main thread, automatically pass the result values from one to another in the chain and when waiting for their input they don’t consume threads at all. This way you can glue together asynchronous services without much effort and pain.
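For readers who want something they can paste into a Groovy console, here is a hedged, runnable variant of such a chain. It assumes GPars 1.0.0 is available through Grape. The three closures are hypothetical stand-ins for the elided services, and explicit dots and parentheses are used to keep the parsing unambiguous.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import static groovyx.gpars.dataflow.Dataflow.task

// Hypothetical stand-ins for real asynchronous services
final polish = { String s -> s.trim() }
final transform = { String s -> s.toUpperCase() }
final describe = { String s -> 'saved ' + s }

def promiseForStuff = task { '  stuff from db  ' }

// Each then() returns a new Promise for the result of its closure
def outcome = promiseForStuff.then(polish).then(transform).then(describe)

println outcome.get()
```

The call to get() is only there to show the final value; inside a real chain you would register one more then() handler instead of blocking.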

When all the promises come true

We can go further than that. Chaining is of limited use when you are waiting for multiple Promises, all of which need to have concrete values before you can move forward. In such situations the whenAllBound() function will be your close friend.

Promise module1 = task {
    compile(module1Sources)
}
Promise module2 = task {
    compile(module2Sources)
}
final jarCompiledModules = {List modules ->
    //zip up the modules into a jar
}
whenAllBound([module1, module2], jarCompiledModules)

And of course you can start a new chain with the whenAllBound() operation.

whenAllBound([module1, module2], jarCompiledModules).then publishToMavenRepo then {println 'Done'}

Some functional topping

The functional guys among us have another alternative here - they just make the jarCompiledModules() function asynchronous and call it on the promises directly. Being a GPars asynchronous function, jarCompiledModules() will resolve the Promises itself without any further assistance from the client code:

final jarCompiledModules = {module1, module2 -> ...}.asyncFun()
jarCompiledModules(module1, module2)

And yet again, since asynchronous functions return Promises, jarCompiledModules() can lead a chain:

jarCompiledModules(module1, module2).then publishToMavenRepo then {println 'Done'}

Conclusion

As you can see, there’s a lot of freedom in how asynchronous services can be tied together so that completion of some may trigger the calculation of others. And you are completely shielded from the underlying thread pool and the actual thread scheduling strategy. Since we never block threads, a single-threaded pool could well handle the whole calculation by itself, should we need it to.
I can’t help it, but the future looks really promising ...

Posted at 10:13AM Dec 20, 2011 by Vaclav Pech in Groovy  |  Comments[1]

Friday Dec 02, 2011

Alternative Implementations for Parallel Game of Life

For those who enjoyed the dataflow example of parallel Game of Life that we discussed recently, we've prepared additional examples using actors as well as active objects and made them available in the GPars demo library. Check them out and share your thoughts. (You'll need GPars 0.12 or 1.0-SNAPSHOT to run them)
Can you think of other groovy ways to implement parallel GoL? Please, let me know.

Posted at 10:10AM Dec 02, 2011 by Vaclav Pech in Groovy  | 

Friday Sep 16, 2011

Let's stay in sync

Lots of interesting activities have been happening around GPars dataflow recently. There's one particularly important and useful new feature that I'd like to tell you more about now - synchronous dataflow channels.

While asynchronous communication between dataflow components (tasks, operators) is a very reasonable default, at times it might come in handy to be able to tie communicating parties together more tightly. Typically, when producers generate data at a faster pace than consumers are able to consume it, unconsumed messages start piling up in the asynchronous communication channels waiting for the consumers to take them. And then we start feeling an urgent need for some way of throttling the data production to limit the amount of memory occupied by all the work-in-progress data. We may, for example, let consumers send confirmation messages back to producers. Or perhaps we start monitoring the size of the channels.
To give you a practical example, Dierk Koenig has experimented recently with Kanban-style limitation of work-in-progress in dataflow channels, which I urge you to check out. Quoting Dierk, limiting work-in-progress in dataflow networks brings numerous useful benefits, such as automatic load balancing between producers and consumers or limiting the size of all queues in the system.

In addition to using Kanban-style cards to manage asynchronous communication, for many scenarios we might prefer more straightforward solutions, which avoid the complexity of maintaining additional channels and control messages.

Let's be synchronous instead

And here it comes - synchronous channels will give us throttling (almost) for free. When synchronous, the channel will not only block all readers until there's a message available for them to read, but symmetrically, the writer will not be allowed to move forward before all the readers get the previous message. Taking inspiration from the CSP (Communicating Sequential Processes) model, which you may know from GoLang or JCSP, GPars now provides synchronous channels for 1:1, N:1, 1:N, N:M communication.

Here's an example showing a fast producer sending data to a slow consumer. The synchronous nature of the SyncDataflowQueue channel will guarantee the producer will never get too far ahead of the sleepy consumer.

final SyncDataflowQueue channel = new SyncDataflowQueue()

def producer = task {
    (1..30).each {
        channel << it  //writing to a channel
        println "Just sent $it"
    }
}

def consumer = task {
    while (true) {
        sleep 500  //simulating a slow consumer
        final Object msg = channel.val
        println "Received $msg"
    }
}

producer.join()

Synchronous channels can be used wherever asynchronous channels can. If you've tried GPars dataflow already you can probably guess how important that is. You can combine synchronous channels with tasks, operators, splitters, selects and other dataflow components. Now, this gets GPars dataflow very close to the CSP concurrency model, doesn't it?

Operators can be synchronous, too 

To illustrate the characteristics of synchronous channels even better, the second code snippet uses a SyncDataflowBroadcast channel to throttle communication between a fast producer and a group of consumer operators, one of which is slower than its peers. In this case all the parties in our group, the producer together with all the consumers, will make progress hand-in-hand at the pace of the slowest party. A good show of friendship and mutual respect among the operators.

final SyncDataflowBroadcast channel = new SyncDataflowBroadcast()

def subscription1 = channel.createReadChannel()
def fastConsumer = operator(inputs: [subscription1], outputs: []) {value ->
    sleep 10  //simulating a fast consumer
    println "Fast consumer received $value"
}

def subscription2 = channel.createReadChannel()
def slowConsumer = operator(inputs: [subscription2], outputs: []) {value ->
    sleep 500  //simulating a slow consumer
    println "Slow consumer received $value"
}

def producer = task {
    (1..30).each {
        println "Sending $it"
        channel << it  //writing to a channel
        println "Sent $it"
    }
}
producer.join()
[fastConsumer, slowConsumer]*.terminate()

Looking at the last example, we can see why synchronous communication may sometimes have a negative impact on the overall performance. The producer as well as the faster of the two consumers have to wait for the slow consumer and cannot do any useful work in the meantime. They will all wait till all the parties get ready and then jump on the CPU at the same time. Well, we should take it as yet another example proving that nothing in this world is for free. Not even communication throttling.

Conclusion

The upcoming GPars 1.0 release is bringing synchronous dataflow communication channels to allow for communication throttling among tasks and operators following the CSP communication style. Synchronous and asynchronous channels can be freely combined within the same algorithm giving you more options to write really effective concurrent Java or Groovy code.

Posted at 06:35AM Sep 16, 2011 by Vaclav Pech in Groovy  | 

Thursday Sep 01, 2011

Parallel Game of Life

Do you want to know more about dataflow concurrency? Here's a straightforward and intuitive example of dataflow operators solving one of the typical school programming exercises - the Game of Life. We are going to look at how to organize dataflow elements into a concurrent network that will simulate an evolution of a bacteria population. My aim is to give the reader a clear understanding of dataflow and the type of problems it can help you solve.

The Game of Life

The Game of Life (GoL) is probably the simplest computer simulation of biological systems you can imagine. I believe that most programmers know what this is all about. In essence, you get a grid of cells representing the world. Some cells are empty, some are occupied by bacteria. The population of bacteria evolves in generations following a few simple rules. A bacterium may survive only if it has a certain number of neighbors. Too few or too many neighbors will make a bacterium die. Similarly, a new bacterium may be born in an empty cell, provided it is surrounded by an appropriate number of living bacteria.

Now, this overview should be enough for you to follow me further. If you like, the GoL page at Wikipedia will give you all the relevant details.

Life is parallel

I still remember my first implementation of GoL, written in Pascal back in my school days. The world was represented by a two-dimensional array of boolean values and in each step of the simulation I iterated over the array in order to calculate the new generation, which then got stored in another two-dimensional array. Straightforward and correct. Purely sequential calculation. Calculating one cell after another.
While this was acceptable back then in the single-core era, these days simulations should be able to leverage all the CPUs under the hood, shouldn't they?
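For reference, that sequential approach can be sketched in a few lines of plain Groovy. This is not the original Pascal code; it assumes the standard Conway rules (a bacterium survives with two or three neighbors, a new one is born with exactly three) and treats everything outside the grid as empty. The names liveNeighbours and nextGeneration are made up for this sketch.

```groovy
// Counts the occupied cells around (row, col); cells outside the grid count as empty
int liveNeighbours(List<List<Boolean>> world, int row, int col) {
    int count = 0
    for (dr in -1..1) {
        for (dc in -1..1) {
            if (dr == 0 && dc == 0) continue      // skip the cell itself
            int r = row + dr
            int c = col + dc
            if (r >= 0 && r < world.size() && c >= 0 && c < world[r].size() && world[r][c]) count++
        }
    }
    return count
}

// Builds a brand new grid, leaving the old generation untouched while it is being read
List<List<Boolean>> nextGeneration(List<List<Boolean>> world) {
    (0..<world.size()).collect { int row ->
        (0..<world[row].size()).collect { int col ->
            int n = liveNeighbours(world, row, col)
            world[row][col] ? (n == 2 || n == 3) : (n == 3)
        }
    }
}

// A vertical "blinker" turns into a horizontal one after a single step
def blinker = [[false, true, false],
               [false, true, false],
               [false, true, false]]
def next = nextGeneration(blinker)
next.each { row -> println row.collect { it ? '#' : '.' }.join() }
```

Note that the new generation goes into a separate grid, just like the second array in the original; writing into the grid being read would corrupt the neighbor counts.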

Sharing the world

There's one obstacle we have to overcome when making GoL concurrent - shared mutable state. If multiple threads are to perform calculations in parallel, we have to ensure they cooperate nicely when accessing the world. The threads need to read data from several cells in order to decide whether a cell should remain empty/occupied or whether the state of the cell should change. And the calculated results should then be saved into a shared place so that we can get a consistent view of the new generation.

Out of the many possible concurrent solutions I picked dataflow operators, which fit the problem very well. After all, dataflow is, among other things, frequently used to calculate spreadsheets, which feels pretty close to what we need here.

What are those operators

Before we look into the architecture, I'd like to highlight a few things about the basic building blocks we'll be using - operators.
Operators are standalone active elements that transform data. They listen to several dataflow channels for input. Once all of the operator's input channels have a value to read, the operator triggers a calculation passing in the values - one value from each input channel. The result(s) of the calculation is then written to the operator's output channel(s). It is the data that controls the calculation, not a process such as a for loop in my early sequential implementation.
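Here is a minimal sketch of a single operator, assuming GPars 1.0.0 is fetched through Grape. The adder and its channels are invented for illustration and are not part of the GoL code. The operator fires only when both input channels hold a value and writes the sum to its only output channel.

```groovy
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final a = new DataflowQueue()
final b = new DataflowQueue()
final sums = new DataflowQueue()

// Triggered once per pair of values, one taken from each input channel
def adder = operator(inputs: [a, b], outputs: [sums]) { x, y ->
    bindOutput(0, x + y)
}

a << 1
a << 2
b << 10          // the first pair (1, 10) is now complete
b << 20          // and so is the second pair (2, 20)

def first = sums.val
def second = sums.val
println "$first, $second"

adder.terminate()
```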

Until all the input channels have a value, the operator just sits idle without consuming system threads. When the data triggers the operator, the operator takes a thread from a shared thread pool in order to run the calculation. After the calculation finishes, the thread is returned to the thread pool for other operators to use. This way, no matter how large our GoL world needs to be, we can tune the number of threads that we request from the system to the capabilities of the hardware we run on.

Architecture

There are four essential components that form our concurrent implementation:

  • A two-dimensional array of dataflow channels to represent the current world.
  • A two-dimensional array of dataflow operators to calculate the new generation
  • A heartbeat channel broadcasting heartbeats to make all cells evolve in sync.
  • A value monitor painting the current state of the world onto the screen whenever the state changes.

Each cell, represented by a dataflow channel, broadcasts its current state - either "I'm empty" or "I'm occupied". The monitor operator hears the messages and once it receives values from all the cells, it updates the screen.

The calculating operators, organized into a grid, are the most interesting part of the s
