Thursday, June 5, 2014

Java Concurrency: Learning from Node.js, Scala, Dart and Go


Preface

The material below on asynchronous programming isn't exactly new. However, in the Java world you'll still see a lot of software using synchronous, blocking multithreading patterns, frequently built deep into the architecture of apps and frameworks. Because of the Java tradition of unifying concurrency and parallelism, even senior developers are frequently unaware that one cannot build well-scaling, high-throughput distributed systems using typical Java synchronous multithreading idioms.

So what is the problem with synchronous processing ?

Think of a web server handling user requests. In this example, to process a client request, multiple requests to another remote service have to be done.


Now, the service can process 10,000 requests per second, but request latency is 5 ms (network time, de/encoding). So a simple request/response takes 5+5 ms (back and forth) = 10 ms. A single-threaded client implemented in a blocking fashion can send/receive at most 100 requests per second and is therefore unable to saturate the service.
If the webserver is expected to emit on average one service request per client request, its thread pool has to be sized to 100 worker threads in order to maximize throughput and saturate the service (10,000 requests/s × 10 ms = 100 requests in flight).

Now consider that the remote service makes use of another service with same throughput and latency characteristics.

This increases latency (as seen from the webserver) by another 10 ms, so the overall latency of a service request/response is now 20 ms. This in turn requires a worker pool of 200 threads in the webserver and 100 service threads in service (A).
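The sizing arithmetic above is an instance of Little's law (requests in flight = throughput × latency). A minimal sketch of that calculation follows; the class and method names are made up for illustration, and the numbers are the ones from the example.

```java
class PoolSizing {
    /** Blocking threads needed to keep enough requests in flight to reach targetRps. */
    static long requiredThreads(long targetRps, long roundTripMillis, double callsPerRequest) {
        // Little's law: concurrency = arrival rate * time spent per request
        return (long) Math.ceil(targetRps * callsPerRequest * roundTripMillis / 1000.0);
    }

    /** Upper bound on requests per second for a single blocking thread. */
    static long maxRpsPerThread(long roundTripMillis) {
        return 1000 / roundTripMillis;
    }

    public static void main(String[] args) {
        System.out.println(maxRpsPerThread(10));            // 100
        System.out.println(requiredThreads(10_000, 10, 1)); // 100
        System.out.println(requiredThreads(10_000, 20, 1)); // 200
    }
}
```

The pool size is a function of latency, which is exactly why an infrastructure change forces a reconfiguration.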

It is clear that in a complex distributed system this causes a lot of problems when using thread-based synchronous processing. A change in the infrastructure (=latency changes) might require reconfiguration of many servers/services.

A common solution to this dilemma is to simply configure far too many worker threads, which in turn leads to needless multithreading-related issues. Additionally, a thread is quite a heavyweight object. It's impossible to scale up to, say, 100,000 or more threads should the need arise. The higher the latency, the more threads must be used to saturate a remote service (e.g. think of inter-datacenter connections), so if you need to process e.g. 50k requests per second and need to query a remote service for each incoming request, you simply cannot use a synchronous, blocking style. Additionally, having more threads than CPU cores hampers overall performance (context switches), so having hundreds of threads per CPU will actually reduce throughput significantly.

If you think of higher numbers (e.g. 100k requests per second), it's clear that one cannot compensate for synchronous, blocking programming by adding more threads.

With synchronous IO programming, latency has direct impact on throughput


No problem, we can go asynchronous, we have threads ..
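A minimal sketch of what this naive approach tends to look like. All names here (NaiveAsync, fetchAsync, handle) are hypothetical; the point is only that the callback runs on the worker thread and touches state the caller also uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

class NaiveAsync {
    // Shared between the calling thread and every worker thread: NOT thread-safe.
    final Map<String, String> cache = new HashMap<>();
    int someInt = 0;
    volatile String callbackThread; // only here to show where the callback ran

    /** Runs the "blocking" work on a fresh thread and calls back on that same thread. */
    Thread fetchAsync(String key, Consumer<String> callback) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10); // placeholder for blocking IO
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            callback.accept("value-for-" + key);
        }, "worker-" + key);
        worker.start();
        return worker;
    }

    Thread handle(String key) {
        return fetchAsync(key, value -> {
            cache.put(key, value); // unsynchronized HashMap write from a foreign thread
            someInt++;             // unsynchronized read-modify-write
            callbackThread = Thread.currentThread().getName();
        });
    }

    /** Demo helper: join() is only used to make a single run deterministic. */
    static NaiveAsync runOnce(String key) {
        NaiveAsync app = new NaiveAsync();
        Thread worker = app.handle(key);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return app;
    }

    public static void main(String[] args) {
        System.out.println("callback ran on " + runOnce("a").callbackThread);
    }
}
```

With a single request and a join() nothing goes wrong; with many concurrent requests and no join(), the map and the counter are accessed from several threads at once.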

(note: sleep is a placeholder for some blocking IO or blocking processing)
Bang ! because:
  • HashMap must be changed to ConcurrentHashMap, else you might run into an endless loop when calling "get" concurrently with a "put" operation.
  • You might or might not see the result of "someInt++" (JMM). It needs to be declared 'volatile' for visibility, and since '++' is not atomic it needs an AtomicInteger as soon as more than one thread increments it.
Congrats ! Now most of your code is multithreaded, which means you'll clutter your code with ConcurrentHashMaps and Atomics, synchronized blocks and deadlocks ... the next level up from "callback hell" is "multithreaded callback hell".

With plain java, asynchronous programming provokes unnecessary multithreading.

A thread-safe version of this code would manage to deliver the callback "inside" the calling thread. But for that we need to enqueue the callback, and we need some kind of threading framework which ensures the queue is read and executed.

Well, one can do this in plain java like this:
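A minimal sketch under stated assumptions: a single-thread executor stands in for the mailbox, a cached pool runs the blocking part, and the names (MiniActor, handle, describe, runDemo) are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class MiniActor {
    // The "mailbox": one thread draining an unbounded queue of Runnables in order.
    private final ExecutorService mailbox =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "actor"));
    // Helper threads for blocking work, so the mailbox thread never blocks.
    private final ExecutorService blockingPool = Executors.newCachedThreadPool();

    // Actor-private state, only touched from the "actor" thread:
    // no ConcurrentHashMap, volatile or locks required.
    private final Map<String, String> cache = new HashMap<>();
    private int someInt = 0;
    private String lastCallbackThread = "none";

    /** Callable from any thread; returns immediately. */
    void handle(String key, Runnable done) {
        blockingPool.execute(() -> {
            sleep(10); // placeholder for blocking IO
            String value = "value-for-" + key;
            // Enqueue the callback so it runs "inside" the actor thread.
            mailbox.execute(() -> {
                cache.put(key, value);
                someInt++;
                lastCallbackThread = Thread.currentThread().getName();
                done.run();
            });
        });
    }

    /** Queries the state via the mailbox; get() blocks the caller only (demo use). */
    String describe() {
        try {
            return mailbox.submit(
                    () -> someInt + " update(s) on thread " + lastCallbackThread + ", cache=" + cache).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        blockingPool.shutdown();
        mailbox.shutdown();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs one request end to end and returns the actor's view of its own state. */
    static String runDemo() {
        MiniActor actor = new MiniActor();
        CountDownLatch done = new CountDownLatch(1);
        actor.handle("a", done::countDown);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        String state = actor.describe();
        actor.shutdown();
        return state;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // 1 update(s) on thread actor, cache={a=value-for-a}
    }
}
```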


Hm, some boilerplate code here, basically a handcrafted simple implementation of an Actor. Actually it's a very simple case, as the async call does not return values. Supporting return values would require more handcrafting (e.g. a callback interface to deliver the result of blocking code to the calling thread; additionally, you won't want to issue the result callback from your one-and-only, unblockable mailbox processor thread). Tricky stuff ahead.

With an actor library, this stuff is solved inside the actor framework. Same code with kontraktor:


(I left some comments in as well, to avoid diluting the fact that only 2-3 lines of extra code are required compared to a synchronous implementation).

Reminder: Bad Threads

Using multi threaded asynchronous processing easily leads to massive multi threading, which raises a bunch of problems:
  1. Out-of-control threading: it's hard to tell which parts of your application data/resources are accessed concurrently as software grows (an innocent call to your application framework from within a foreign callback thread can make literally everything multithreaded without anyone noticing).
  2. Because of (1), developers start synchronizing defensively.
  3. Because of (2), you get deadlocks.
  4. Because of (2), you run 500 threads all waiting on contended locks, so your service goes down despite having just 60% CPU usage.
  5. Because of (1), you get sporadic, unreproducible errors caused by random cache coherency/data visibility issues.
Anyway, let's have a look at what newer languages bring to the table regarding concurrency ..



Concurrency Idioms/Concepts in Go, Dart, Node.js, Scala

Despite the hype and discussions surrounding those emerging languages, they technically present more or less the same basic concept to deal with concurrency (of course there are lots of differences in detail; Go in particular is different):
Single-threaded entities (with associated thread-local state) sequentially process incoming events from bounded or unbounded queues.
This technical foundation can be presented in different ways to a programmer. With actors, functionality and data are grouped 'around' the queue ('mailbox'). In the channel model, larger entities ('Process', 'Isolate') explicitly open/close 'channels' (=queues=mailboxes) to exchange messages with other processing entities.


node.js/javascript

A node.js process is a single actor instance. Instead of a 'mailbox', the queue in node.js is called the "event queue". Blocking operations are handed off to the runtime underneath the JavaScript code (Node.js's native I/O layer and its helper threads); the result of such an asynchronous operation is then put onto the event queue. The programming model is callback/future style; however, there are extensions providing a fiber-like (software threads, "green" threads) programming model.


Multicore hardware is saturated by running several instances of node.js.


Dart 

(on server side) features a more sophisticated, but similar model compared to JS. In contrast to JavaScript, Dart provides a richer set of utilities to deal with concurrency.
It's possible to run several event loops called 'Isolates' from within a single process or in separate processes. Conceptually, each isolate is run by a separate thread. Isolates communicate by putting messages onto each other's event loop. Very actor'ish, in fact this can be seen as an implementation of the Actor pattern.
As a special feature, a Dart isolate reads from 2 event queues, of which one has higher priority and (simplified) processes self-induced asynchronous code, while the other one gets events from the 'outside' world.

Api-wise, Dart favours 'Futures' over 'Callbacks':

Ordering in Dart

Ordered execution of asynchronous pieces of code can then be chained using 'then'.

For messaging amongst Isolates, Dart provides generic deep-copy of arbitrary object graphs in-process, and simple datatypes for communication amongst isolates running in different processes.
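For Java readers, the closest analogue is chaining on a Java 8 CompletableFuture. This is a sketch of the idea in Java, not Dart code; loadUser and loadGreeting are hypothetical asynchronous steps.

```java
import java.util.concurrent.CompletableFuture;

class ThenChain {
    // Hypothetical asynchronous steps; each returns immediately with a future.
    static CompletableFuture<String> loadUser(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static CompletableFuture<String> loadGreeting(String user) {
        return CompletableFuture.supplyAsync(() -> "hello " + user);
    }

    static CompletableFuture<String> greet(int id) {
        return loadUser(id)                           // step 1
                .thenCompose(ThenChain::loadGreeting) // step 2 starts only after step 1 completed
                .thenApply(String::toUpperCase);      // step 3 transforms the result
    }

    public static void main(String[] args) {
        // join() blocks and is only used here to end the demo.
        System.out.println(greet(7).join()); // HELLO USER-7
    }
}
```

One difference worth keeping in mind: in Dart the continuations run on the isolate's own event loop, whereas CompletableFuture stages may run on whichever pool thread completes the previous stage.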

Both Dart and javascript actually use a flavour of the Actor pattern, though they use a different terminology.


Scala/Akka

Although Scala has access to the same concurrency primitives as Java (it runs on the JVM), Scala has a tradition of asynchronism, which is reflected by the existence of many async tools, idioms and libraries. It's a very creative, adventurous and innovative platform, which can be both good and bad in industrial-grade software projects.
Ordering in Akka
As is to be expected, Scala's Akka+async libraries are very feature-rich. Akka proposes a somewhat different system-design philosophy compared to Dart/Node.js, as it encourages a more fine-grained actor design.
Akka promotes an untyped actor model (though typed actors are available) and has features such as actor-induced message reordering/dropping, re-dispatching, scanning the mailbox, etc. As always: great flexibility also paves the way for abusive and hard-to-maintain programming patterns. Splitting up an application into many actors (fine-grained actor design) is advantageous in that a sophisticated scheduler may parallelize execution better. On the other hand, this comes at a high cost regarding maintainability and controllability. Additionally, enqueuing a message instead of doing a plain method call has a high fixed overhead, so it's questionable whether doing fine-grained actor design pays off.

As Akka was originally designed for Scala, it requires quite some boilerplate when used from Java. One either has to write immutable 'message classes' or (with typed actors) has to define an interface along with each actor (because typed actors use java.lang.reflect.Proxy internally).

As I showed in a previous post, it's not optimized to the bone; however, this should not be significant for real-world asynchronous IO/messaging-centric applications.

Recent versions improved remoting of actors and also added support for multicast-based messaging (zeroMQ).

Go

If you look at typical processing sequences in an actor-based asynchronous program, you'll probably notice it's kind of a handcrafted multiplexing of concurrently executed statement sequences onto a single thread. I'll call such a sequence a 'threadlet' (Edit: rest of the world calls them 'Monad').


Basically myFunc models an ordered sequence of actions ('threadlet'):

  • getUrl
  • process contents
  • return result

Since getting the URL needs some time, it is executed async, so other events can be read from the inbox.
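A minimal Java sketch of such a 'threadlet' in future style. getUrl here is a hypothetical stand-in that fabricates a string instead of touching the network, and "process contents" is reduced to taking the length.

```java
import java.util.concurrent.CompletableFuture;

class Threadlet {
    // Hypothetical stand-in for a non-blocking HTTP fetch; no network access happens.
    static CompletableFuture<String> getUrl(String url) {
        return CompletableFuture.supplyAsync(() -> "<html>contents of " + url + "</html>");
    }

    // The 'threadlet': ordered steps expressed as a chain instead of a call stack.
    static CompletableFuture<Integer> myFunc() {
        return getUrl("http://google.com")  // 1. getUrl (async, caller is free meanwhile)
                .thenApply(String::length); // 2. process contents, 3. return result
    }

    public static void main(String[] args) {
        // join() blocks and is only used here to end the demo.
        System.out.println(myFunc().join());
    }
}
```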

If the runtime were aware of "blocking calls" automatically, it could assist by providing the illusion of single-threaded execution by automatically splitting a "threadlet" into separate events. This is what Go does:
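go func() {
  text := getUrl("http://google.com") // looks blocking, but only this goroutine waits
  result := process(text)             // [process text]; process is a placeholder
  results <- result                   // a goroutine cannot return a value; results is a channel declared elsewhere
}()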
While one builds up a "virtual stack" of a "threadlet" in an actor-based system by chaining futures and callbacks, Go manages to transform a "normal" looking piece of code behind the scenes. For that it builds up a (dynamically sized) stack which replaces the Future chaining of the actor-based approaches. It's a non-native lightweight thread.

So although this might look very different, it's actually a similar concept of multiplexing small snippets of ordered statement sequences onto a single thread. It's similar to "Green Threads" (low-overhead software-emulated threads). If such a 'goroutine'/'threadlet' needs to wait or is logically blocked, the processing thread does not stop but processes another goroutine (in case of actors: a message from the mailbox).

As the Java, JavaScript and Dart virtual machines have no support for "coroutines" (=a runnable piece of code with associated stack and state = software thread = green thread), a more explicit syntax to emulate green threads is required for asynchronous programming in those languages.
The rationale behind go-routines/actors/future based asynchronism is that the programmer should specify concurrency of execution. The parallelism/scheduling is handled by the runtime system.
The Go runtime schedules go-routines onto one or more OS threads. Blocking operations (e.g. OS calls) are detected automatically. Operations such as waiting on messages from a channel/queue do not actually block, as the current thread just continues with executing another go-routine.
So from an abstract point of view, go-routines are similar to lightweight threads with built-in awareness of non-blocking message-queues (channels).

Channels replace mailboxes,futures,callbacks

An actor encapsulates a queue (mailbox), a set of messages 'understood' and private (=thread local) data. In Go, mailboxes are replaced by queue objects called 'Channels' which can be created and deleted dynamically. A go routine might choose to open one or more queues (channels) it is listening and sending to. The size of a queue can be set to 0, which equals synchronous data transfer (actually still non blocking as a go-routine != a thread).
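The buffered versus zero-size distinction can be imitated in Java with BlockingQueue implementations. This is only an analogy: in Java, put/take block a real OS thread, which is exactly what Go avoids. The class and method names are made up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

class ChannelSketch {
    /** Sends 1..n through the channel from a producer thread and sums them on the caller. */
    static int sumThrough(BlockingQueue<Integer> channel, int n) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    channel.put(i); // waits while the channel has no free capacity
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += channel.take(); // waits while the channel is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThrough(new ArrayBlockingQueue<>(4), 10)); // buffered, capacity 4: 55
        System.out.println(sumThrough(new SynchronousQueue<>(), 10));    // capacity 0, rendezvous: 55
    }
}
```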

Shared State

As go-routines might run in parallel on different cores if the runtime scheduler decides so, there might be multithreaded access to shared data structures (if two go-routines are scheduled on different threads).
As far as I understand, manual synchronization is required in those cases. I see this as a disadvantage compared to the actor model, where it is very clear whether private or shared data gets accessed. Additionally, the concept of actor-local data matches current CPU designs, as each core has a non-shared cache.
The actor pattern enforces thread-data ownership by design. Go does specify a memory model ("The Go Memory Model"), comparable in role to the JMM, which is required when access to shared data from multiple threads is possible.

See also Concurrency is not parallelism



Dart, Node.js have a shared nothing architecture

No data can be shared amongst processes/isolates. In order to share data, messaging must be used (=data has to be encoded and copied). That's probably a contributing factor to the rise of NoSQL storage like Redis or MongoDB. They are used both for persistence and to mimic a shared 'heap'. In order to share complex data structures, some kind of serialization has to be used. Frequently simple solutions, such as transmitting primitive types only or JSON, are used, which adds quite some overhead to data sharing.

Other advantages of Shared Nothing
  • single threaded garbage collection
  • single threaded VM + JIT
  • No memory model like JMM required


Performance constraints

While single-threaded concurrency has many advantages in IO-centric, middleware-layer applications, it is a disadvantage in situations where several cores are used to speed up processing. E.g. it is not possible to implement something like the Disruptor without shared memory and access to native threading/volatile/barriers.

Why single threaded concurrency is easier than multithreaded concurrency (true parallelism)

Actor/CSP based concurrency is still challenging at times; however, one does not have to deal with the issues arising from memory visibility rules resulting from modern CPU architectures (see JMM). So a synchronous sequence of statements inside an actor can do a simple HashMap put or increment a counter without even thinking of synchronization or concurrent modification.


Scheduling

As stated, actor (and asynchronous programming) splits up execution into smaller pieces of ordered statement sequences. Go does a similar thing with its go-routines.
A challenge to runtime-system implementations is the scheduling of ordered statement sequences ("threadlets") onto "real" OS threads.

Node.js (built on the V8 VM) runs a single event loop (=mailbox) plus some helper threads to "outsource" blocking operations. So the user application runs on a single thread.

The Dart VM provides a more refined system, having a "micro event loop" and an external event loop.

Akka has a pluggable scheduling strategy: thread pool based, fork join (applies work-stealing), and pinned (each actor has its own thread). The queue implementation of the mailbox can be chosen; however, there is no concept of several queues. Given that the "dual queues" are just an implementation of a "message priority" concept, it is clear that similar concepts can be realized with Akka.

In my pet library 'kontraktor', I use a dual queue approach similar to Dart (by chance). The programmer defines the mapping of actors to native (Dispatcher-) threads. This gives the best results regarding cache locality if done right. A downside is that there is no dynamic adaptation in case an application has varying load patterns.

A pure work-stealing algorithm might perform well in micro benchmarks, but in real applications the penalty of cache misses when scheduling an actor on a different thread could be much higher than micro benchmark results suggest.

In order to implement a self-optimizing dispatcher, a sophisticated runtime analysis is required (e.g. count cache misses of 'threadlets', track communication patterns amongst different actors) as outlined here. AFAIK no VM has actually implemented such sophisticated algorithms; there is ongoing research in the Go community.

Further reading on GO's scheduler.


Do we all have to learn Scala, Go or Dart now ?

As Java has all the low-level concurrency tools at hand, what is mostly required is a change in habits and mindset. A lot of Java APIs and components feature synchronous, blocking interfaces. Avoid them; they will become problematic in the emerging world of globally distributed systems.

In fact, JVM-based languages are at an advantage, because they have the freedom to choose or even mix different concurrency models.
One can implement Actors and Futures (nonblocking ones, not the blocking version like java.util.concurrent.Future) using the JVM's concurrency primitives. With the introduction of the shorthand syntax for anonymous classes (some call them lambdas), asynchronous code does not look as messy as with prior versions of Java.
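To illustrate how little is needed, here is a minimal sketch of a callback-based (non-blocking) future built from plain JVM primitives. Promise, then and complete are names chosen for this example, not an existing library API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class Promise<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T value;
    private boolean done;

    /** Registers a callback; never blocks. Runs it right away if the result already exists. */
    void then(Consumer<T> callback) {
        synchronized (this) {
            if (!done) {
                listeners.add(callback);
                return;
            }
        }
        callback.accept(value); // value is final once done == true
    }

    /** Supplies the result and notifies all registered callbacks on the completing thread. */
    void complete(T result) {
        List<Consumer<T>> toNotify;
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("already completed");
            }
            value = result;
            done = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Consumer<T> listener : toNotify) {
            listener.accept(result); // called outside the lock to avoid holding it in user code
        }
    }

    public static void main(String[] args) {
        Promise<String> p = new Promise<>();
        p.then(v -> System.out.println("got " + v));  // registered before completion
        p.complete("result");                         // prints "got result"
        p.then(v -> System.out.println("late " + v)); // prints "late result" immediately
    }
}
```

In an actor setting one would additionally enqueue the callback onto the registering actor's mailbox instead of running it on the completing thread.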

If I were a student, I'd go with Scala. This will also improve your general programming skills a lot and bring in a lot of interesting ideas and design patterns.
If you are in the middle of a Java career and already heavily invested, it's probably better to port/copy successful design patterns found in other, newer languages.

A groundbreaking improvement to Java's concurrency toolset would require VM and language improvements:

  • There has to be a way for the programmer to express concurrent/asynchronous execution, that differs from expressing parallel execution, so the JVM gets a chance to optimize execution of concurrent tasks. Currently, concurrency and parallelism are expressed using threads. 
  • Native support for continuations would be required (probably a very hard requirement, especially regarding HotSpot). There are some implementations using pure bytecode weaving out there (e.g. WebFlow) which unfortunately have pretty high performance costs.

Anyway, with recent language improvements (java 8) callback/futures style is much more manageable than before.
Another solution might come from hardware/OS. If threads could be realized in a much more lightweight way (+ faster context switch), software-level concurrency by multiplexing might not be required anymore.


Real world experience

We have transformed two processes of a very IO-intensive clustered application from traditional multithreading to an actor-based approach. Both of them performed an order of magnitude faster thereafter. Additionally, the code base became much more maintainable, as the amount of shared data (data being accessed from multiple threads) was reduced dramatically. The number of threads declined from >300 to 2 (+ some pooled helper threads).

What we changed:
  • We did not use threads to model concurrency. Concurrency != parallelism
  • We used non-blocking IO
  • We used an actor approach (kontraktor). This way an ownership relation is created between an execution entity (actor) and its data structures.
  • We respected the short version of the "Reactive Manifesto":


                                                     Never block