Thursday, June 5, 2014

Java Concurrency: Learning from Node.js, Scala, Dart and Go


Preface

The observations below regarding asynchronism aren't exactly new; however, in the Java world you'll still see a lot of software using synchronous, blocking multi-threading patterns, frequently built deep into the architecture of apps and frameworks. Because of the Java tradition of unifying concurrency and parallelism, even senior developers are frequently not aware that one cannot build well-scaling, high-throughput distributed systems using typical Java synchronous multi-threading idioms.

So what is the problem with synchronous processing?

Think of a web server handling user requests. In this example, to process a client request, multiple requests to another remote service have to be made.


Now, the service can process 10,000 requests per second, but request latency is 5 ms (network time, de/encoding). So a simple request/response takes 5+5 ms (back and forth) = 10 ms. A single-threaded client implemented in a blocking fashion can send/receive at most 100 requests per second and is therefore unable to saturate the service.
If the webserver is expected to emit on average one service request per client request, its thread pool has to be sized to 100 worker threads in order to maximize throughput and saturate the service.

Now consider that the remote service makes use of another service with same throughput and latency characteristics.

This increases latency (as seen from the webserver) by another 10 ms, so the overall latency of a service request/response is now 20 ms, in turn requiring a worker pool of 200 threads in the webserver and 100 service threads in service (A).

Clearly, in a complex distributed system this creates a lot of problems when using thread-based synchronous processing. A change in the infrastructure (=latency changes) might require reconfiguration of many servers/services.

A common solution to this dilemma is to simply configure far too many worker threads, in turn running into multithreading-related issues without need. Additionally, a thread is quite a heavyweight object; it's impossible to scale up to, say, 100,000 or more threads. The higher the latency, the more threads must be used to saturate a remote service (e.g. think of inter-datacenter connections), so if you need to process e.g. 50k requests per second and need to query a remote service for each incoming request, you simply cannot use a synchronous, blocking style. Additionally, having more threads than CPU cores hampers overall performance (context switches), so having hundreds of threads per CPU will actually reduce throughput significantly.

If you think of higher numbers (e.g. 100k requests per second), it's clear that one cannot compensate for synchronous, blocking programming by adding more threads.

With synchronous IO programming, latency has a direct impact on throughput.


No problem, we can go asynchronous, we have threads ..
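A sketch of the naive approach (hypothetical names; the HashMap and the counter stand in for whatever state the callback touches):

import java.util.HashMap;

public class NaiveAsync {
    private final HashMap<String,String> cache = new HashMap<>();
    private int someInt;

    public void processRequest(final String key) {
        // "going asynchronous" by spawning a thread per blocking operation
        new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(100); // placeholder for blocking IO
                } catch (InterruptedException ie) {}
                cache.put(key, "result"); // now races with concurrent get/put calls
                someInt++;                // update may never become visible to other threads (JMM)
            }
        }).start();
    }
}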

(note: sleep is a placeholder for some blocking IO or blocking processing)
Bang ! because:
  • HashMap must be changed to ConcurrentHashMap, else you might run into an endless loop when calling "get" concurrently to a "put" operation.
  • You might or might not see the result of "someInt++" (JMM); it needs to be declared 'volatile'.
Congrats! Now most of your code is multithreaded, which means you'll clutter your code with ConcurrentHashMaps and Atomics, synchronized blocks and deadlocks ... the step-up of "callback hell" is "multithreaded callback hell".

With plain Java, asynchronous programming provokes unnecessary multithreading.

A thread-safe version of this code would manage to deliver the callback "inside" the calling thread. But for that we need to enqueue the callback, and we need some kind of threading framework which ensures the queue is read and executed.

Well, one can do this in plain Java like this:
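Roughly along these lines (a minimal sketch, not the original listing):

import java.util.concurrent.LinkedBlockingQueue;

public class MiniActor {
    private final LinkedBlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();

    public MiniActor() {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true)
                        mailbox.take().run(); // events execute strictly sequentially, single threaded
                } catch (InterruptedException ie) { /* shutdown */ }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // may be called from arbitrary threads; the runnable executes on the actor's thread
    public void receive(Runnable event) {
        mailbox.add(event);
    }
}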


Hm, some boilerplate code here, basically a handcrafted, simple implementation of an Actor. It's actually a very simple case, as the async call does not return values. Supporting return values would require more handcrafting (e.g. a callback interface to deliver the result of the blocking code to the calling thread; additionally, you won't do the result-callback from your one-and-only, unblockable mailbox processor thread). Tricky stuff ahead.

With an actor library, this stuff is solved inside the actor framework. The same code with kontraktor:


(I left some comments in as well, to avoid obscuring the fact that only 2-3 lines of extra code are required compared to a synchronous implementation.)

Reminder: Bad Threads

Using multithreaded asynchronous processing easily leads to massive multithreading, which raises a bunch of problems:
  1. Out-of-control threading: it's hard to tell which parts of your application's data/resources are accessed concurrently as the software grows (an innocent call into your application framework from within a foreign callback thread can literally make everything multithreaded unnoticed).
  2. Because of (1), developers start synchronizing defensively.
  3. Because of (2), you get deadlocks.
  4. Because of (2), you run 500 threads all waiting on contended locks, so your service goes down despite having just 60% CPU usage.
  5. Because of (1), you get spurious, unreproducible errors caused by random cache coherency/data visibility issues.
Anyway, let's have a look at what newer languages bring to the table regarding concurrency ..



Concurrency Idioms/Concepts in Go, Dart, Node.js, Scala

Despite the hype and discussions surrounding these emerging languages, they technically present more or less the same basic concept to deal with concurrency (of course there are lots of differences in detail; especially Go is different):
Single-threaded entities (with associated thread-local state) sequentially process incoming events from bounded or unbounded queues.
This technical foundation can be presented to the programmer in different ways. With actors, functionality and data are grouped 'around' the queue ('mailbox'). In the channel model, larger entities ('Process', 'Isolate') explicitly open/close 'channels' (=queues=mailboxes) to exchange messages with other processing entities.


node.js/javascript

A node.js process is a single actor instance. Instead of a 'mailbox', the queue in node.js is called the "event queue". Blocking operations are externalized to the JavaScript V8 VM; the results of such an asynchronous operation are then put onto the event queue. The programming model is callback/future style; however, there are extensions providing a fiber-like (software threads, "green" threads) programming model.


Multicore hardware is saturated by running several instances of node.js.


Dart 

(on the server side) features a more sophisticated, but similar model compared to JS. In contrast to JavaScript, Dart provides a richer set of utilities to deal with concurrency.
It's possible to run several event loops, called 'Isolates', within a single process or in separate processes. Conceptually, each isolate is run by a separate thread. Isolates communicate by putting messages onto each other's event loop. Very actor'ish; in fact, this can be seen as an implementation of the Actor pattern.
As a specialty, a Dart Isolate reads from 2 event queues, of which one has higher priority and (simplified) processes self-induced asynchronous code, while the other one gets events from the 'outside' world.

API-wise, Dart favours 'Futures' over 'Callbacks':

Ordering in Dart

Ordered execution of asynchronous pieces of code can then be chained using 'then'.
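The closest Java 8 analogue to this chaining style is CompletableFuture (a sketch to illustrate the idea, not Dart code):

import java.util.concurrent.CompletableFuture;

public class ThenChain {
    public static void main(String[] args) {
        CompletableFuture
            .supplyAsync(() -> "payload")           // placeholder for an asynchronous fetch
            .thenApply(text -> text.toUpperCase())  // chained step, runs after the fetch completed
            .thenAccept(System.out::println)        // consume the final result
            .join();                                // demo only: wait for the chain to finish
    }
}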

For messaging amongst Isolates, Dart provides generic deep-copy of arbitrary object graphs in-process, and simple datatypes for communication amongst isolates running in different processes.

Both Dart and JavaScript actually use a flavour of the Actor pattern, though with different terminology.


Scala/Akka

Although Scala has access to the same concurrency primitives as Java (it runs on the JVM), Scala has a tradition of asynchronism, which is reflected by the existence of many async tools, idioms and libraries. It's a very creative, adventurous and innovative platform, which can be both good and bad in industrial-grade software projects.
Ordering in Akka
As is to be expected, Scala's Akka+async libraries are very feature-rich. Akka proposes a somewhat different system-design philosophy compared to Dart/Node.js, as it encourages a more fine-grained actor design.
Akka promotes an untyped actor model (though typed actors are available) and has features such as actor-induced message reordering/dropping, re-dispatching, scanning the mailbox, etc. As always: great flexibility also paves the way for abusive and hard-to-maintain programming patterns. Splitting up an application into many actors (fine-grained actor design) is advantageous in that a sophisticated scheduler may parallelize execution better. On the other hand, this comes at a high cost regarding maintainability and controllability. Additionally, enqueuing a message instead of doing a plain method call has a high fixed overhead, so it's questionable whether fine-grained actor design pays off.

As Akka was originally designed for Scala, it requires quite some boilerplate when used from Java. One either has to write immutable 'message classes' or (with typed actors) has to define an interface along with each actor (because typed actors use java.*.Proxy internally).

As I showed in a previous post, it's not optimized to the bone; however, this should not be significant for real-world asynchronous IO/messaging-centric applications.

Recent versions improved remoting of actors and also added support for multicast-based messaging (zeroMQ).

Go

If you look at typical processing sequences in an actor-based asynchronous program, you'll probably notice it's kind of a handcrafted multiplexing of concurrently executed statement sequences onto a single thread. I'll call such a sequence a 'threadlet'.


Basically, myFunc models an ordered sequence of actions (a 'threadlet'):

  • getUrl
  • process contents
  • return result

Since getting the URL takes some time, it is executed asynchronously, so other events can be read from the inbox.

If the runtime were automatically aware of "blocking calls", it could assist by providing the illusion of single-threaded execution, automatically splitting a "threadlet" into separate events. This is what Go does:
go func() {
    text := getUrl("http://google.com")
    // [process text]
    resultChannel <- result // a goroutine cannot return a value; results travel over channels
}()
While in an actor-based system one builds up a "virtual stack" of a "threadlet" by chaining futures and callbacks, Go manages to transform a "normal"-looking piece of code behind the scenes. For that it builds up a (dynamically sized) stack which replaces the future chaining of the actor-based approaches. It's a non-native, lightweight thread.

Though this might look very different, it's actually a similar concept: multiplexing small snippets of ordered statement sequences onto a single thread. It's similar to "green threads" (low-overhead, software-emulated threads). If such a 'goroutine'/'threadlet' needs to wait or is logically blocked, the processing thread does not stop but processes another goroutine (in the case of actors: another message from the mailbox).

As the Java, JavaScript and Dart virtual machines have no support for "coroutines" (=a runnable piece of code with associated stack and state = software thread = green thread), a more explicit syntax to emulate green threads is required for asynchronous programming in those languages.
The rationale behind goroutines/actors/future-based asynchronism is that the programmer should specify the concurrency of execution, while parallelism/scheduling is handled by the runtime system.
The Go runtime schedules goroutines onto one or more OS threads. Blocking operations are automatically detected (e.g. OS calls); blocking operations such as waiting on messages from a channel/queue do not actually block, as the current thread just continues executing another goroutine.
So from an abstract point of view, goroutines are similar to lightweight threads with built-in awareness of non-blocking message queues (channels).

Channels replace mailboxes, futures and callbacks

An actor encapsulates a queue (mailbox), a set of messages 'understood' and private (=thread-local) data. In Go, mailboxes are replaced by queue objects called 'channels', which can be created and deleted dynamically. A goroutine might choose to open one or more queues (channels) it is listening and sending to. The size of a queue can be set to 0, which equals synchronous data transfer (actually still non-blocking, as a goroutine != a thread).
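In Java terms, a channel of size 0 behaves roughly like a SynchronousQueue and a buffered channel like an ArrayBlockingQueue. It's a loose analogy only, since Java threads really block while goroutines just get descheduled; a sketch:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class ChannelAnalogy {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> rendezvous = new SynchronousQueue<>();   // ~ channel of size 0
        BlockingQueue<String> buffered = new ArrayBlockingQueue<>(16); // ~ buffered channel

        Thread consumer = new Thread(() -> {
            try {
                System.out.println(rendezvous.take()); // blocks until a producer hands over an item
            } catch (InterruptedException ignored) {}
        });
        consumer.start();
        rendezvous.put("hello"); // blocks until the consumer takes the item
        buffered.put("queued");  // returns immediately while capacity remains
        consumer.join();
    }
}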

Shared State

As goroutines might run in parallel on different cores if the runtime scheduler decides so, there might be multithreaded access to shared data structures (if two goroutines are scheduled on different threads).
As far as I understand, in those cases manual synchronization is required. I'd see this as a disadvantage compared to the actor model, where it is very clear whether private or shared data gets accessed. Additionally, the concept of actor-local data matches current CPU designs, as each core has a non-shared cache.
The actor pattern enforces thread-data ownership by design. Astonishingly, I did not find something along the lines of a Go memory model comparable to the JMM, which is required when access to shared data from multiple threads is possible.

See also Concurrency is not parallelism



Dart and Node.js have a shared-nothing architecture

No data can be shared amongst processes/isolates. In order to share data, messaging must be used (=data has to be encoded and copied). That's probably a contributing factor to the rise of NoSQL storage like Redis or MongoDB: they are used both for persistence and to mimic a shared 'heap'. In order to share complex data structures, some kind of serialization has to be used. Frequently simple solutions, such as transmitting primitive types only or JSON, are used, which adds quite some overhead to data sharing.

Other advantages of Shared Nothing
  • single threaded garbage collection
  • single threaded VM + JIT
  • No memory model like JMM required


Performance constraints

While single-threaded concurrency has many advantages in IO-centric, middleware-layer applications, it is a disadvantage when it comes to situations where several cores are needed to speed up processing. E.g. it is not possible to implement something like the Disruptor without shared memory and access to native threading/volatile/barriers.

Why single threaded concurrency is easier than multithreaded concurrency (true parallelism)

Actor/CSP-based concurrency is still challenging at times; however, one does not have to deal with the issues arising from the memory visibility rules of modern CPU architectures (see JMM). So a synchronous sequence of statements inside an actor can do a simple HashMap put or increment a counter without even thinking about synchronization or concurrent modification.
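A minimal sketch: state owned by an actor needs no synchronization at all, because only the actor's mailbox thread ever touches it.

import java.util.HashMap;

public class CounterActorBody {
    // only ever accessed by the single mailbox thread of this actor
    private final HashMap<String,Integer> counters = new HashMap<>();

    public void onEvent(String key) {
        Integer count = counters.get(key);
        counters.put(key, count == null ? 1 : count + 1); // plain HashMap, no volatile, no locks
    }
}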


Scheduling

As stated, actor-based (and asynchronous) programming splits up execution into smaller pieces of ordered statement sequences. Go does a similar thing with its goroutines.
A challenge for runtime-system implementations is the scheduling of these ordered statement sequences ("threadlets") onto "real" OS threads.

The V8 VM (Node.js) schedules a single event loop (=mailbox) + some helper threads to "outsource" blocking operations, so the user application runs on a single thread.

The Dart VM provides a more refined system, having a "micro event loop" and an external event loop. Click here for more details.

Akka has a pluggable scheduling strategy: thread-pool based, fork-join (applies work-stealing), and pinned (each actor has its own thread). The queue implementation of the mailbox can be chosen; however, there is no concept of several queues. Given that the "dual queues" are just an implementation of a "message priority" concept, it is clear that similar concepts can be realized with Akka.

In my pet library 'kontraktor', I use a dual-queue approach similar to Dart's (by chance). The programmer defines the mapping of actors to native (dispatcher) threads. This gives the best results regarding cache locality if done right. A downside is that there is no dynamic adaptation in case an application has varying load patterns.

Pure work-stealing algorithms might perform well in micro benchmarks; in real applications the penalty of cache misses when scheduling an actor on a different thread could be much higher than micro benchmark results suggest.

In order to implement a self-optimizing dispatcher, sophisticated runtime analysis is required (e.g. counting cache misses of 'threadlets', tracking communication patterns amongst different actors), as outlined here. AFAIK no VM has actually implemented such sophisticated algorithms; there is ongoing research in the Go community.

Further reading on Go's scheduler.


Do we all have to learn Scala, Go or Dart now?

As Java has all the low-level concurrency tools at hand, mostly a change in habits and mind-culture is required. A lot of Java APIs and components feature synchronous, blocking interfaces. Avoid them; they will become problematic in the uprising world of globally distributed systems.

In fact, JVM-based languages are at an advantage, because they have the freedom to choose or even mix different concurrency models.
One can implement Actors and Futures (non-blocking ones, not the blocking version like java.util.concurrent.Future) using the JVM's concurrency primitives. With the introduction of the shorthand syntax for anonymous classes (some call them lambdas), asynchronous code does not look as messy as with prior versions of Java.

If I were a student, I'd go with Scala. This will also improve your general programming skills a lot and bring in a lot of interesting ideas and design patterns.
If you are in the middle of a Java career and already heavily invested, it's probably better to port/copy successful design patterns found in other, newer languages.

A groundbreaking improvement to Java's concurrency toolset would require VM and language improvements:

  • There has to be a way for the programmer to express concurrent/asynchronous execution that differs from expressing parallel execution, so the JVM gets a chance to optimize the execution of concurrent tasks. Currently, both concurrency and parallelism are expressed using threads.
  • Native support for continuations would be required (probably a very hard requirement, especially regarding HotSpot). There are some implementations using pure bytecode weaving out there (e.g. WebFlow) which unfortunately have pretty high performance costs.

Anyway, with recent language improvements (Java 8), callback/future style is much more manageable than before.
Another solution might come from hardware/OS: if threads could be realized in a much more lightweight way (+ faster context switches), software-level concurrency by multiplexing might not be required anymore.


Real world experience

We have transformed two processes of a very IO-intensive clustered application from traditional multithreading to an actor-based approach. Both of them performed an order of magnitude faster thereafter. Additionally, the code base got much more maintainable, as the amount of shared data (data being accessed multithreaded) was reduced dramatically. The number of threads declined from >300 to 2 (+ some pooled helper threads).

What we changed:
  • We did not use threads to model concurrency. Concurrency != parallelism.
  • We used non-blocking IO.
  • We used an actor approach (kontraktor). This way an ownership relation is created between an execution entity (actor) and its data structures.
  • We respected the short version of the "reactive manifesto":


                                                     Never block





Tuesday, January 28, 2014

Comparison of different concurrency models: Actors, CSP, Disruptor and Threads

In order to make use of modern multi-core/multi-socket hardware, Java offers threads and locks for concurrent programming. It is well known that this model suffers from a number of problems:
  • Deadlocks
  • Bad scaling due to "over-synchronization" or contention on common resources
  • Errors caused by invalid synchronization are timing-dependent and hard to reproduce. It is common for them to appear only under special conditions (load, hardware) in production.
  • As a software project grows, it gets hard to safely modify the application. A programmer needs global awareness of an application's control flow and data access patterns in order to change the software without introducing errors.
  • The subtleties of the Java Memory Model are not well known to most Java programmers. Even if the JMM is known, incorrect programming isn't obvious and will fail only rarely. It can be a nightmare to spot such errors from the production logs of large applications.
So I am looking for alternative models to express concurrency. Therefore, a simple benchmark of the most popular "alternative" concurrency models is done: Actors, CSP and the Disruptor.



Actors


Real Life Analogy: People cooperating by sending mails to each other.

An Actor is like an object instance executed by a single thread. Instead of direct calls to methods, messages are put into the Actor's "mailbox" (~queue). The actor reads and processes messages from the queue sequentially, single-threaded (with exceptions).
Internal state is exposed/shared by passing messages (with copied state) to other Actors.







CSP (Communicating Sequential Processes)


Real Life Analogy: People phoning each other.

Though a different terminology is used, CSP systems can be seen as a special actor system having bounded mailboxes (queues) of size 0.

So if one process (~Actor) wants to pass a message to another process, the caller is blocked until the receiver accepts the message. Instead of being blocked, a CSP process can choose to do other things, e.g. check for incoming messages (this introduces non-determinism to the order of outgoing messages). A receiver cannot accept incoming messages while it is occupied with processing a prior one.





Disruptor


The Disruptor data structure came up some years ago; it was invented and pioneered by Martin Thompson, Mike Barker, and Dave Farley at LMAX Exchange.

Real Life Analogy: Assembly line




The Disruptor is a bounded queue (implemented as a ring buffer) where producers add to the head of the queue (if slots are available, else the producer is blocked).
Consumers access the queue at different points, so each consumer has its own read position (cursor) inside the queue. Sequencing is used to manage queue access and processing order.
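For illustration, a minimal sketch of producing and consuming with the Disruptor DSL (assuming the 3.2-era API; the event and handler are made up):

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;

public class DisruptorSketch {
    static class ValueEvent { long value; }

    public static void main(String[] args) {
        Disruptor<ValueEvent> disruptor = new Disruptor<>(
                ValueEvent::new,                  // event factory: all slots are pre-allocated
                1024,                             // ring buffer size (power of 2)
                Executors.newCachedThreadPool());
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                System.out.println("consumed " + event.value));
        RingBuffer<ValueEvent> ring = disruptor.start();

        long seq = ring.next();                   // claim the next slot
        try {
            ring.get(seq).value = 42;             // write data in place, no allocation
        } finally {
            ring.publish(seq);                    // make the slot visible to consumers
        }
    }
}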



Thread + locking of shared data


contention
Ok, everybody knows this one. It's Java's default way to model concurrent execution. Actually, these are the primitives (+CAS) used to build the higher-level concurrency models described above.

Conclusion


While the CSP/Actor model realizes inter-thread communication by passing/copying thread-local data to some queue or process, the Disruptor keeps data in place and ensures only one thread (consumer or producer) is the owner of a data item (=queue slot) at a time.
This model seems to fit existing CPU, memory and VM architectures better, resulting in high throughput and superior performance. It avoids high allocation rates and reduces the probability of cache misses. Additionally, it implicitly balances the processing speed of interdependent consumers without queues growing somewhere.
There is no silver bullet for concurrent programming; one still has to plan carefully and needs to know what's going on.

While I am writing this, I realize each of these patterns has its own set of best-fit problem domains, so my initial intention of using a single benchmark to compare the performance of different concurrency models might be somewhat off :-).


 

The Benchmark Application


 

The benchmark approximates the value of PI concurrently. N jobs are created, and the results of all jobs must be added up to finally get an approximation of PI. For maximum performance one would create one large job per core of the CPU used. With this setup all solutions would perform roughly the same, as there is no significant concurrent access and scheduling overhead.

However, if we compute PI by creating 1 million jobs, each computing a 100-iteration slice of PI, we get an impression of:
  • cost of accessing shared data when the result of each job is added to one single result value
  • overhead created by the concurrency control mechanics (e.g. sending messages/scheduling Runnables to a thread pool, CAS, Locks, volatile r/w ..).
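For reference, this is roughly what each job computes (modeled on Akka's Pi tutorial; a sketch, not the actual benchmark source):

// computes one slice of the Leibniz series for PI;
// the partial results of all jobs are summed up to approximate PI
static double calculatePiSlice(int sliceNr, int iterationsPerSlice) {
    double acc = 0;
    for (int i = sliceNr * iterationsPerSlice; i < (sliceNr + 1) * iterationsPerSlice; i++) {
        acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
    }
    return acc;
}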
The test is run with 2 configurations:
  • 100 iterations per pi-slice-job, 1,000,000 jobs
  • 1,000 iterations per pi-slice-job, 100,000 jobs
As hardware I use
  • AMD Opteron 6274, dual socket, 2.2 GHz. Each socket has 8 cores + 8 hardware threads (so overall 16 cores + 16 HW threads)
  • Intel Xeon @ 3 GHz, dual socket, six cores each (12 cores, Hyper-Threading turned off). (2011 release)
I never use more threads than there are "real" cores.



Stop the blabber, gimme results!


See bottom of this post for the benchmark source.
  • Threads - a somewhat naive thread-based implementation of the Pi computation. With enough effort invested, it is possible to match any of the other results, of course: at the core of the VM, threads and locks (+ atomics, volatile r/w and CAS) are the only concurrency primitives. However, there is no point in creating an ad-hoc copy of the Disruptor or an Actor system in order to compare concurrency approaches.
  • Akka - a popular Actor implementation on the JVM. The benchmark has been reviewed and revised (especially the ActorSystem configuration can make a big difference) by the Akka crew. Threads are scheduled using Java 7's fork-join pool. The Pi computation is actually one of Akka's tutorial examples.
  • Abstraktor - my experimental Actor/CSP implementation. It uses short bounded queues (so it leans more to the CSP side) and avoids deadlocks by maintaining 2 queues per Actor (in and out). If the out-queue is blocked, it just reads from the in-queue.
    I am using Nitsan Wakart's excellent MPSC queue implementation (check his blog or github jaq-in-a-box), and that's the major reason it shows kind of competitive performance+scaling.
    I use this to get a rough baseline for comparison and to experiment with different flavours of Actors/CSP. Probably the only thing one can do with it is run the Pi bench ;-).
    Update: The experimental version benchmarked here has been consolidated + improved. You can find it on https://github.com/RuedigerMoeller/kontraktor
  • Disruptor - my naive approach implementing the benchmark based on the Disruptor 3.2. It turned out that I used a not-yet-polished utility class; however, I keep this benchmark just to illustrate how smallish differences in implementation may have big consequences.
  • Disruptor2 - as Michael Barker would have implemented it (Thanks :-) ). It's actually more than twice as fast in the 1 million jobs test as the closest runner-up.
Intel (Xeon, 2 sockets, 6 cores each, Hyper-Threading off)

Well, it seems that with 100k jobs of 1,000 iterations each, the benchmark is dominated by computation, not concurrency control. Therefore I retry with 1 million jobs, each computing a 100-iteration slice.

Ok, we probably can see differences here :-)



AMD Opteron 6274, dual socket, 2.2 GHz (=16 real cores, 16 HW threads)


Again, the 1-million-tiny-jobs variant spreads the differences amongst the approaches (and their implementations):


Note there is something like 5% run-to-run jitter (GC and stuff); however, that does not change the big picture.

Last but not least: Best results per CPU architecture per lib:




Discussion of Results


We see that scaling behaviour can be significantly different depending on the hardware platform used.

Akka loses a lot of CPU time doing GC and allocation. If I modify Abstraktor to use unbounded concurrent linked queues (Akka uses those), it performs similarly to Akka and builds up temporary queues of >100k elements. This is an inherent issue of the Actor model. By leaning towards CSP (using very short bounded blocking queues with size <100), performance also suffers, as threads are blocked/spinning for input too often. However, with a queue size of 5000 elements, things work out pretty well (introducing other problems, like deadlocks in the case of cyclic actor graphs).
The Disruptor library is very well implemented, so a good part of its performance advantage can be attributed to the excellent quality of the implementation.

Cite from the Disruptor documentation regarding queuing:
3.5 The Problems of Queues
[...]  If an in-memory queue is allowed to be unbounded then for many classes of problem it can grow unchecked until it reaches the point of catastrophic failure by exhausting memory. This happens when producers outpace the consumers. Unbounded queues can be useful in systems where the producers are guaranteed not to outpace the consumers and memory is a precious resource, but there is always a risk if this assumption doesn’t hold and queue grows without limit. [...]
When in use, queues are typically always close to full or close to empty due to the differences in pace between consumers and producers. They very rarely operate in a balanced middle ground where the rate of production and consumption is evenly matched. [...]
Couldn't have said it better myself =).



Conclusion


Although the Disruptor worked best for this example, I think looking for "the one concurrency model to go for" is wrong. If we look at the real world, we see all 4 patterns used, depending on the use case.
So a broad concurrency library would ideally integrate the assembly-line pattern (~Disruptor), queued messaging (~Actors) and unqueued communication (~CSP).



Benchmark source


AKKA




Multi Threading




Abstraktor




Disruptor naive




Disruptor optimized


Saturday, December 21, 2013

Big Data the 'reactive' way

This has been posted (by me, of course) on the Java Advent Calendar originally. Check it out for more interesting articles:

http://www.javaadvent.com/2013/12/big-data-reactive-way.html


A metatrend going on in the IT industry is the shift from query-based, batch-oriented systems to (soft) real-time updated systems. While this is usually associated with financial trading only, there are many other examples, such as "just-in-time" logistics systems, airlines doing real-time pricing of passenger seats based on demand and load, C2C auction systems like eBay, real-time traffic control and many more.

It is likely this trend will continue, as the (commercial) value of information is time-dependent: value decreases with the age of information.

Automated trading in the finance sector is just a forerunner in this area, because a time advantage of some microseconds can be worth millions of dollars. It's natural that real-time processing systems evolve faster in this domain.

However, big parts of traditional IT infrastructure are not designed for reactive, event-based systems. From query-based databases to the request-response based HTTP protocol, the common paradigm is to store and query data "when needed".

Current Databases are static and query-oriented


Current approaches to data management such as SQL and NoSQL databases focus on data transactions and static queries of data. Databases provide convenience in slicing and dicing data, but they do not support updating complex queries in real time. Emerging NoSQL databases still focus on computing a static result.
Databases are clearly not "reactive".

Current Messaging Products provide poor query/filtering options


Current messaging products are weak at filtering. Messages are separated into different streams (or topics), so clients can do a raw preselection on the data received. However, this frequently means a client application receives something like 10 times more data than needed, doing fine-grained filtering 'on top'.
A big disadvantage is that the topic approach builds filter capabilities "into" the system's data design.
E.g. if a stock exchange system splits streams on a per-stock basis, a client application still needs to subscribe to all streams in order to provide a dynamically updated list of "most active" stocks. Querying usually means "replay + search the complete message history".

A scalable, "continuous query" distributed data grid


I had the enjoyment of doing the conceptual & technical design for a large-scale real-time system, so I'd like to share a generic, scalable solution for continuous query processing at high volume and large scale.

It is common for real-time processing systems to be designed "event sourced": persistence is replaced by journaling transactions. System state is kept in memory; the transaction journal is required for historic analysis and crash recovery only.
Client applications do not query, but listen to event streams instead. A common issue with event-sourced systems is the problem of the "late joining client". A late client would have to replay the whole system event journal in order to get an up-to-date snapshot of the system state.
In order to support late joining clients, a kind of "Last Value Cache" (LVC) component is required. The LVC holds the current system state and allows late joiners to bootstrap by querying.
In a high-performance, large-data system, the LVC component becomes a bottleneck as the number of clients rises.

Generalizing the Last Value Cache: Continuous Queries


In a continuous query data cache, a query result is kept up to date automatically. Queries are replaced by subscriptions.
subscribe * from Orders where
   symbol in ['ALV', 'BMW'] and
   volume > 1000 and
   owner='MyCompany'
creates a message stream which initially performs a query operation and after that updates the result set whenever a data change affects the query result (transparent to the client application). The system ensures each subscriber receives exactly the change notifications necessary to keep its "live" query results up to date.


A distributed continuous query system: The LVC nodes hold the data. Transactions are sent to them on a message bus (red). The LVC nodes compute the actual difference caused by a transaction and send change notifications on a message bus (blue). This enables "processing nodes" to keep a mirror of their relevant data partition up to date. External clients connected via TCP/HTTP do not listen to the message bus (because multicast is not an option in a WAN). "Subscription processors" keep the clients' continuous queries up to date by listening to the (blue) message bus and dispatching only the required change notifications to each client's point-to-point connection.

Differences in data access patterns compared to static data management:


  • High write volume.
    Real-time systems create a high volume of write accesses/changes in data.
  • Fewer full table scans.
    Only late-joining clients or changes of a query's condition require a full data scan. Because continuous queries make "refreshing" a query result obsolete, the read/write ratio is ~1:1 (if one counts the change notification resulting from a transaction as "read access").
  • The majority of load is generated when evaluating the queries of active continuous subscriptions with each change of data. Consider a transaction load of 100,000 changes per second with 10,000 active continuous queries: this requires 100,000*10,000 = 1 billion evaluations of query conditions per second. And that's still an underestimation: when a record gets updated, it must be tested whether the record matched a query condition before the update and whether it matches after the update. A record's update may result in an add (because it matches after the change) or a remove transaction (because the record does not match anymore after the change) to a query subscription.

Data Cluster Nodes ("LastValueCache Nodes")

Data is organized in tables, column-oriented. Each table's data is evenly partitioned amongst all data grid nodes (=last value cache nodes="LVC nodes"). By adding data nodes to the cluster, capacity is increased and snapshot queries (initializing a subscription) are sped up by increased concurrency.

There are three basic transactions/messages processed by the data grid nodes:

  • AddRow(table,newRow), 
  • RemoveRow(table,rowId), 
  • UpdateRow(table, rowId, diff). 

The data grid nodes provide a lambda-alike (row iterator) interface supporting the iteration of a table's rows using plain Java code. This can be used to perform map-reduce jobs and, as a specialization, the initial query required by newly subscribing clients. Since the ongoing computation of continuous queries is done in the "gateway" nodes, the load of the data nodes and the number of clients correlate only weakly.
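A hypothetical Java shape of such a row-iterator interface (illustrative only, not the actual system's API):

// a query condition is plain java (and therefore JIT-compiled) instead of a parsed query language
interface Row {
    long getLong(String column);
    String getString(String column);
}

interface RowVisitor {
    void visit(Row row); // called for each row of this node's partition
}

interface TablePartition {
    void forEachRow(String table, RowVisitor visitor);
}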

All transactions processed by a data grid node are (re-)broadcasted using multicast "Change Notification" messages.

Gateway Nodes


Gateway nodes track subscriptions/connections to client applications. They listen to the global stream of change notifications and check whether a change influences the result of a continuous query (=subscription). This is very CPU-intensive.

Two things make this work:

  1. By using plain Java to define a query, query conditions profit from JIT compilation; there is no need to parse and interpret a query language. HotSpot is one of the best optimizing JIT compilers on the planet.
  2. Since multicast is used for the stream of global changes, one can add additional gateway nodes with ~no impact on the throughput of the cluster.

Processor (or Mutator) Nodes


These nodes implement logic on top of the cluster data. E.g. a statistics processor does a continuous query for each table, incrementally counts the number of rows of each table and writes the results back to a "statistics" table, so a monitoring client application can subscribe to real-time data of current table sizes. Another example would be a "matcher processor" in a stock exchange, listening to the orders for a stock; if orders match, it removes them and adds a trade to the "trades" table.
If one sees the whole cluster as kind of a "giant spreadsheet", processors implement the formulas of this spreadsheet.

Scaling Up


  • With data size:
    increase the number of LVC nodes.
  • With the number of clients:
    increase subscription processor nodes.
  • With TP/S:
    scale up processor nodes and LVC nodes.
Of course the system relies heavily on the availability of a "real" multicast messaging bus. Any point-to-point oriented or broker-oriented networking/messaging will be a massive bottleneck.


Conclusion


Building real time processing software backed by a continuous query system simplifies application development a lot.
  • It's model-view-controller at large scale.
    Astonishing: patterns used in GUI applications for decades have not been extended regularly to the backing data storage systems.
  • Any server-side processing can be partitioned in a natural way. A processor node creates an in-memory mirror of its data partition using continuous queries. Processing results are streamed back to the data grid. Computing-intensive jobs, e.g. risk computation of derivatives, can be scaled by adding processor instances subscribing to distinct partitions of the data ("sharding").
  • The size of the code base is reduced significantly (both business logic and front-end).
    A lot of code in handcrafted systems deals with keeping data up to date.

About me

I am a technical architect/senior developer consultant at a European company involved heavily in stock & derivative trading systems.


This post is part of the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!

Sunday, December 15, 2013

Dart - a possible solution to the high.cost@lowquality.webapp syndrome

I am professionally involved in the development of high-quality real-time middleware/GUI front-end applications. Up to now, we were forced to use traditional fat client/server designs, as web applications fail to fulfil basic usability requirements (e.g. key shortcuts) and performance requirements (high-frequency real-time updates). Basic UI features often require nasty and unmaintainable JavaScript hacks (the awkward, un-fun kind of hack) which blow up development+testing cost.

Regarding user experience/performance, check out this example of a (good!) server-centric framework:

(No offence, Apache Wicket is one of the best web frameworks out there IMO.)
  • it's sluggish
  • you can't navigate with arrow keys
  • no standard multi-selection (with Ctrl, Shift etc.)
If we go down the JavaScript route, we can do better:
http://www.jqwidgets.com/jquery-widgets-demo/demos/jqxgrid/index.htm?(arctic)#demos/jqxgrid/filtering.htm

However (having fiddled with JS myself), something is suspicious: one can't resize any of the demo tables. Googling results in the following finding:

The Grid widget works with fixed width only which is specified by its ‘width’ property. It is not possible to set its width in percentages in this version. We’ll implement the requested feature for a future version. 
Best Regards, XXXX, DevTeam

So by the end of the year 2013, it is still a problem to achieve basic UI functionality with current web standards and browsers. Even worse, the stack fails to respect basic software engineering principles such as DRY (don't repeat yourself), encapsulation, ..
(Check out the "source" tab of the link above to get an impression of the mess required to create a simple table demo component.)

This is not the fault of the libraries mentioned above; the root cause of this mess is the underlying half-baked W3C "standards" (+ browser implementations). They are apparently defined by people never involved in real-world application development.

So we sit there with an intact hype, but without a productive development stack to fulfil the vision. Since management tends to share the hype but does not validate technical feasibility, this has led to some prominent mis-decisions, such as Steve Jobs claiming web apps could be used on the iPhone instead of native apps (the first iPhone release had no native APIs), or facebook relying on HTML5 too much and now hastily moving back to native phone apps.

In the following I'll explain why I think we urgently need a paradigm shift in web applications. Without a fundamental change, industry will keep burning money trying to build web applications based on expensive, crappy, half-baked technology.

In the beginning there was REST

and it sucked right away ..

The use of the REST (Representational State Transfer) design pattern results in

  • the absence of state in a server application.
    Any durable client state gets passed with each request of a client.
  • each user interaction (e.g. clicking a button) requiring a complete HTML document to be served, transmitted and rendered.
  • no concept of a "connection" at the HTTP protocol level,
    so authentication and security-related stuff (e.g. the SSL handshake) has to be repeated with each request/response (=change of GUI state).

This allowed WebGUIs to max out on bandwidth inefficiency and latency. It also leads to usability nuggets such as losing subtle GUI state (the scrollbar position, or that longish forum post you started to write).


JavaScript, Ajax and "Look ma, a menu in the browser"

Innovation (& browser wars) never sleeps. Industry-leading companies and consortiums figured out that the best way to address fundamental design issues is to provide many many many workarounds on top of them.

Examples:
  • HTTP keep-alive to reduce latency
  • Standardize on an untyped, weirdish scripting language,
    which can be cluttered all across the HTML page in tiny snippets. Ah .. there is nothing like mixing program, content and styling into one big code soup ..
  • Content caching (man, we had fun doing software updates ..)
    to avoid superfluous reloads of static content
  • Define new HTML tags. A lot of them.
    The advantage of having many HTML tags is that browsers have more room to implement them differently, allowing for the popular "if ( ie6 ) {...........} else if (mozilla) .. else if (safari) .." coding style. This way bandwidth requirements could be extended even more.
  • CSS was embraced,
    so when done with the definition of new tags, they could continue with the definition of new CSS attributes. Also, JavaScript could mess things up even better by modifying the HTML DOM and CSS definitions at runtime.
  • Async HTTP requests (AJAX)
    avoid the need to reload the whole HTML page.
These are kludges (some of them with notable negative side effects) addressing fundamental issues of an overly simplistic foundation technology. So we got what I call the MESS technology stack:

The MESS principles also infiltrated server-side frameworks, as they had to deal with the MESS somehow. I'd speculate some of the frameworks are 90% "MESS handlers", designed to virtualize a solidly engineered foundation in order to increase productivity when messing with the MESS.

Google to the rescue !!

One can safely assume Apple and Microsoft are not *that* interested in providing a fluid web application platform. If all applications were web applications, the importance of the underlying operating system would dwindle. All a user needs is a compliant browser (hello Chrome OS).

Google, as a major web application implementor/provider, suffers itself from the high.cost@low-quality.webapp syndrome.

Some clever JavaScript hackers (<= meant positively) started to figure out a simplified, more flexible web application design. They basically implemented a "fat client" in JavaScript. Instead of doing page-reloading HTTP requests, they used async HTTP to retrieve data from the server and rendered this data by modifying parts of the displayed HTML document (via the DOM). Whenever you see a responsive 'cool' web application, you can safely assume it relies on this approach.

However, JavaScript is not designed to work well for large business applications with many lines of code and the fluctuation of development staff typical for enterprise software development. Such a project becomes a maintenance nightmare for sure (if it succeeds at all) or ends up a minimalistic solution hated by end users.

The first widely known step in backing up the hype with a structured technology was the Google Web Toolkit (GWT), which relied on compiling ("transpiling") Java to browser-compatible JavaScript.
For various reasons, this project has been stalled (but not dropped) by Google in favour of their new Dart language before the technology reached a 'mature' state. It has been reported that turn-around times get unacceptable with larger applications, and it's unlikely this will change (because resources have been moved to Dart). Remembering Oracle's lawsuit over the use of Java in Android, this move by Google is not too surprising.

Dart has the potential to reach a level of convenience which has been there for native apps for years:

  • extensive API to the browser. Anything achievable with JavaScript can be done with Dart.
  • a modern, structured language with (optional) compile-time type checking, a package concept and support for many well-known principles of object-oriented programming like classes, interfaces (mixins), inheritance and lambdas. The concurrency model of Dart (Isolates, similar to Actors) could be interesting on the server side as well.
  • compilable to JavaScript
  • support for fully encapsulated tag-based UI components (via polymer lib and upcoming WebComponents w3c standard)
  • backed by a company providing the world's leading browser and believably depending on the technology itself.

Conclusion

Mankind is not only capable of building chips with 800 million transistors, it also might be capable of creating standards defined by 800 million words .. scary ;-) .

What about Java ?

While the Scala guys join the party late (http://www.scala-lang.org/news/2013/11/29/announcing-scala-js-v0.1.html), Oracle misses the train and seems to favour JavaFX as a GUI platform (my prediction: it will fall asleep prior to reaching maturity).
GWT has been open-sourced, so maybe the community will continue its development. Given that there are more open-source projects than developers willing to contribute, I doubt this will be able to keep pace with Google's money-backed Dart. Also, I could imagine the (aging) Java community shies away from (partially) deprecating their habitual server-side framework landscape.

There is also http://qooxdoo.org/contrib/project/qwt/about/java, backed by a bigger German hosting company. I have not tested this, but I doubt that purely community-driven development will keep pace. Being compliant to ever-changing draft specs and dealing with browser implementation variance is not fun ...

What's required is a bytecode-to-JavaScript transpiler.
The browser API (DOM access) could be modeled by a set of interfaces at compile time and replaced by the transpiler with the appropriate JavaScript calls.
Using static bytecode analysis, or by tracking required classes at runtime, the set of required transpiled classes/methods could be determined. Additionally, one would need a fall-back transpiler at runtime: if a client-side script instantiates an "un-transpiled" Java class or method, it could be transpiled on-the-fly at server side and loaded to the client.
The nice thing about such a bytecode-level transpiler would be that all VM-based languages (Groovy, Scala, JRuby, Jython, ..) could profit from such a system.
An advantage of a possible Java2JavaScript transpiling solution: since Java is a widespread server platform, a java2js transpiler could work dynamically, which is a major strategic advantage over Dart, which needs static analysis & compilation because most server systems are not running native Dart.

Real money needs to be put behind such a Java solution; with the slow-moving JCP we can probably expect something like this in 2016 or later ...
Would be fun building such a transpiler system .. but no time - gotta mess with the MESS.

Update: The Java EE / Java 8 crowd seems to adopt the new 'thin' webapp architecture with project "Avatar". I still haven't figured out the details. At first glance it looks like they bring JS to the server instead of bringing Java transpilation to the client .. which is a mad idea imho :-).

Update on GWT: A Google engineer answered back regarding GWT, telling me the project is alive and in active development. It turns out Dart and GWT are handled by distinct (competing?) divisions of Google. There is still evidence that Dart receives the real money while GWT is handed over to the community. Google plans to bring a native DartVM to Chrome within ~1 year (rumours) and also plans on delivering the DartVM to Android.


Saturday, October 12, 2013

Still using Externalizable to get faster Serialization with Java ?


Update: When turning off detection of cycles, performance can be improved even further (FSTConfiguration.setUnshared(true)). However in this mode an object referenced twice is written twice.

According to popular belief, the only way to go is handcrafted implementation of the Externalizable interface in order to get fast Java Object serialization.

Manual coding of the "readExternal" and "writeExternal" methods is an error-prone and boring task. Additionally, with each change of a class's fields, the Externalizable methods need to be adapted.

Contrary to popular belief, a good implementation of generic serialization can be faster than a handcrafted implementation of the Externalizable interface 99% if not 100% of the time.

Fortunately I managed to save the world with the fast-serialization library by addressing the disadvantages of Serialization vs Externalizable.

The Benchmark

The following class will be benchmarked.
public class BlogBench implements Serializable {
    public BlogBench(int index) {
        // avoid benchmarking identity references instead of StringPerf
        str = "Some Value "+index;
        str1 = "Very Other Value "+index;
        switch (index%3) {
            case 0: str2 = "Default Value"; break;
            case 1: str2 = "Other Default Value"; break;
            case 2: str2 = "Non-Default Value "+index; break;
        }
    }
    private String str;
    private String str1;
    private String str2;
    private boolean b0 = true;
    private boolean b1 = false;
    private boolean b2 = true;
    private int test1 = 123456;
    private int test2 = 234234;
    private int test3 = 456456;
    private int test4 = -234234344;
    private int test5 = -1;
    private int test6 = 0;
    private long l1 = -38457359987788345l;
    private long l2 = 0l;
    private double d = 122.33;
}
To implement Externalizable, a copy of the class above is made, but with an Externalizable implementation
(source is here).
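For illustration, the Externalizable variant boils down to hand-written field IO like this (an abbreviated sketch, not the actual benchmarked source):

public class BlogBenchExt implements java.io.Externalizable {
    private String str;
    private int test1;
    // ... remaining fields as in BlogBench ...

    public BlogBenchExt() {} // Externalizable requires a public no-arg constructor

    public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException {
        out.writeUTF(str);
        out.writeInt(test1);
        // ... one call per field; must be kept in sync with the class ...
    }

    public void readExternal(java.io.ObjectInput in)
            throws java.io.IOException, ClassNotFoundException {
        str = in.readUTF();
        test1 = in.readInt();
        // ... must read fields in exactly the order they were written ...
    }
}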
The main loop for fast-serialization is identical; I just replace "ObjectOutputStream" with "FSTObjectOutput" and "ObjectInputStream" with "FSTObjectInput".
Result:

  • Externalizable performance with JDK serialization is much better compared to plain Serializable
  • FST manages to serialize faster than the manually written "read/writeExternal" implementation

Size is 319 bytes for JDK Serializable, 205 for JDK Externalizable, and 160 for FST serialization. A pretty big gain for a search/replace operation vs. handcrafted coding ;-). BTW, if the "Externalizable" class is serialized with FST, it is still slightly slower than letting FST do generic serialization.

There is still room for improvement ..

The test class is rather small, so setup + allocation of the input and output streams take a significant part of the times measured. Fortunately, FST provides mechanisms to reuse both FSTObjectInput and FSTObjectOutput. This yields ~200ns better read and write times.

So "new FSTObjectOutput(outputStream)" is replaced with
FSTConfiguration fstConf = FSTConfiguration.getDefaultConfiguration();
...
fstConf.getObjectOutput(bout)
There is even more improvement ..

Since Externalizable does not need to track references, and this is also not required for the test class, we turn off reference tracking for our sample by using the @Flat annotation. We can also make use of the fact that "str2" is most likely to contain a default value ..

@Flat
public class BlogBenchAnnotated  implements Serializable {
    public BlogBenchAnnotated(int index) {
        // avoid benchmarking identity references instead of StringPerf
        str = "Some Value "+index;
        str1 = "Very Other Value "+index;
        switch (index%3) {
            case 0: str2 = "Default Value"; break;
            case 1: str2 = "Other Default Value"; break;
            case 2: str2 = "Non-Default Value "+index; break;
        }
    }
    @Flat private String str;
    @Flat private String str1;
    @OneOf({"Default Value","Other Default Value"})
    @Flat private String str2;
    // ... remaining primitive fields unchanged ...
}


and another one ..

To be able to instantiate the correct class at read time, the classname must be transmitted. However, in many cases both reader and writer know (at least most of) the serialized classes at compile time. FST provides the possibility to register classes in advance, so only a number instead of a full classname is transmitted.
FSTConfiguration.getDefaultConfiguration().registerClass(BlogBenchAnnotated.class);



What's "Bulk"

Setup/reuse of streams actually requires some nanoseconds, so when benchmarking just the read/write of a tiny object, a good part of the per-object time is stream initialization. If an array of 10 benchmark objects is written, per-object time goes below 300ns per object read/write.
Frequently an application will write more than one object into a single stream. For RPC-encoding applications, a kind of "batching", or just writing into the same stream and calling "flush" after each object, makes it possible to actually get <300ns times in the real world. Of course, object reference sharing must be turned off then (FSTConfiguration.setShared(false)).

For completeness: JDK (with manual Externalizable) bulk yields 1197 nanos read and 378 nanos write, so it also profits from less initialization. Unfortunately, reuse of ObjectInput/OutputStream is not that easy to achieve, mainly because ObjectOutputStream already writes some bytes into the underlying stream as it is instantiated.

Note that if (constant) initialization time is taken out of the benchmarks, the relative performance gains of FST are even higher (see benchmarks on fast serialization site).

Links:

Source of this Benchmark
FST Serialization Library (moved to github from gcode recently)



Wednesday, July 31, 2013

Impact of large primitive arrays (BLOBS) on Garbage Collection

While OldGen GC duration ("FullGC") depends on the number of objects and the locality of their references to each other, young generation GC duration depends on the size of OldSpace memory. Alexey Ragozin has made extensive tests which give a good impression of young GC duration vs. heap size here.

In order to avoid heavy impact of long lived data on OldGen GC, there are several workarounds/techniques.

  1. Put large parts of mostly static data Off-Heap using ByteBuffer.allocateDirect() or Unsafe.allocateMemory(). This memory is then used to store data (e.g. by using a fast serialization like http://code.google.com/p/fast-serialization/ [oops, I did it again] or specialized solutions like http://code.google.com/p/vanilla-java/wiki/HugeCollections ).
    The downside is that one frequently has to implement manual memory management on top.
  2. "Instance saving" on heap by serializing into byte arrays or by transformation of data structures. This usually involves using open-addressed hashmaps without "Entry" objects, and large primitive arrays instead of small objects like
    class ReferenceDataArray {
        int x[];
        double y[];
        long z[];
        public ReferenceDataArray(int size) {
             x = new int[size];
             y = new double[size];
             z = new long[size];
        }
        public long getZ(int index) { return z[index]; }
    },
    as well as the replacement of generic collections with <Integer>, <Long> by specialized implementations using primitive int, long, .. directly (a small open-addressing sketch follows below the list).
    Whether it is worth crippling your code this way is questionable, however the option exists.
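
To make point (2) concrete, here is a hedged sketch of such an open-addressed, entry-object-free map (int keys to long values; illustrative only: no resizing or removal, key 0 is reserved as the empty marker, and capacity must be a power of two):

class IntLongMap {
    final int[] keys;   // 0 marks an empty slot, so key 0 cannot be stored
    final long[] vals;
    final int mask;
    IntLongMap(int capacityPowerOfTwo) {
        keys = new int[capacityPowerOfTwo];
        vals = new long[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }
    void put(int key, long val) {
        int i = key & mask;
        while (keys[i] != 0 && keys[i] != key) i = (i + 1) & mask; // linear probing
        keys[i] = key;
        vals[i] = val;
    }
    long get(int key) {
        int i = key & mask;
        while (keys[i] != 0) {
            if (keys[i] == key) return vals[i];
            i = (i + 1) & mask;
        }
        return 0; // not found
    }
}

GC-wise, this map consists of exactly two arrays plus the map object itself, no matter how many entries it holds.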

Going the route outlined in (2) improves the effectiveness of OldGen GC a lot. FullGC duration can be in the range of 2s even with heap sizes in the 8 GB area. CMS performs significantly better, as it can scan OldSpace faster and therefore needs less headroom in order to avoid a Full GC.

However there is still the fact, that YoungGen GC scales with OldSpace size.

The scaling effect is usually associated with "card marking". Young GC has to remember which areas of OldSpace have been modified (in such a way that they might reference objects in YoungGen). This is done with a kind of bit field, where each bit (or byte) denotes the state (modified / reference created or similar) of a chunk ("card") of OldSpace.
Primitive arrays basically are BLOBs for the VM; they cannot contain references to other Java objects, so theoretically there is no need to scan or card-mark areas containing them when doing GC. One could think e.g. of allocating large primitive arrays from the top of OldSpace and other objects from the bottom, this way reducing the amount of scanned cards.
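
To make the card-marking idea concrete: conceptually, every reference store runs through a write barrier like the following. This is an illustrative sketch, not actual HotSpot code; the 512-byte card size and the "dirty = 0" convention match HotSpot's defaults, and the real VM biases the table by the heap base address.

static final int CARD_SHIFT = 9;  // 2^9 = 512 bytes per card
static byte[] cardTable;          // one byte per 512-byte chunk of OldSpace
static final byte DIRTY = 0;

// executed (conceptually) on every reference store into the heap; young GC
// then only scans dirty OldSpace cards for old->young references
static void referenceStoreBarrier(long fieldAddress) {
    cardTable[(int) (fieldAddress >>> CARD_SHIFT)] = DIRTY;
}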

Theory: BLOBs (primitive arrays) result in shorter young GC pauses than an equal amount of heap allocated in smallish objects.

Therefore I'd like to do a small test, measuring the effects of allocating large primitive arrays (such as byte[], int[], long[], double[]) on NewGen GC duration.

The Test

import java.awt.Rectangle;
import java.util.ArrayList;

public class BlobTest {
    static ArrayList<byte[]> blobs = new ArrayList<>();
    static Object randomStuff[] = new Object[300000];
    public static void main( String arg[] ) {
        if ( Runtime.getRuntime().maxMemory() > 2*1024*1024*1024l) { // 'autodetect' available blob space from mem settings
            int blobGB = (int) (Runtime.getRuntime().maxMemory()/(1024*1024*1024l));
            System.out.println("Allocating "+blobGB*32+" 32Mb blobs ... (="+blobGB+"Gb) ");
            for (int i = 0; i < blobGB*32; i++) {
                blobs.add(new byte[32*1024*1024]);
            }
            System.gc(); // force VM to adapt ..
        }
        // create eden-collected temporaries with a medium promotion rate
        // (promotion rate can be adjusted via the size of randomStuff[])
        while( true ) {
            randomStuff[((int) (Math.random() * randomStuff.length))] = new Rectangle();
        }
    }
}


The while loop at the bottom simulates the allocating application. Because random indices of the randomStuff array are overwritten, a lot of temporary objects are created: the same index is rewritten with another object instance over and over. However, due to randomness, some indices will not be hit in time and live longer, so those objects get promoted. The larger the array, the less likely index overwriting gets, and the higher the promotion rate to OldSpace.

In order to avoid bias by VM auto-adjusting, I pin NewGen sizes, so the only variation is the allocation of large byte[] on top of the allocation loop. (Note these settings are designed to encourage promotion; they are in no way optimal.)


commandline:

java -Xms1g -Xmx1g -verbose:gc -XX:-UseAdaptiveSizePolicy -XX:SurvivorRatio=12 -XX:NewSize=100m -XX:MaxNewSize=100m -XX:MaxTenuringThreshold=2
By adding more GB, the upper part of the test will use any heap above 1 GB to allocate byte[] arrays:

java -Xms3g -Xmx3g -verbose:gc -XX:-UseAdaptiveSizePolicy -XX:SurvivorRatio=12 -XX:NewSize=100m -XX:MaxNewSize=100m -XX:MaxTenuringThreshold=2
...
...
java -Xms11g -Xmx11g -verbose:gc -XX:-UseAdaptiveSizePolicy -XX:SurvivorRatio=12 -XX:NewSize=100m -XX:MaxNewSize=100m -XX:MaxTenuringThreshold=2

I am using byte[] arrays in the test; I verified that int[] and long[] behave exactly the same (one must then apply a divisor to adjust for the larger element size).


Results

(jdk 1.7_u21)

[results charts not reproduced here: young generation GC pause durations per collector for increasing heap/BLOB sizes]

The 'Objects' test was done by replacing the static byte[] allocation loop in the benchmark with the following (nonblobs is an ArrayList just like blobs above):

            for ( int i = 0; i < blobGB*2700000; i++ )
                nonblobs.add(new Object[] {
                         new Rectangle(),new Rectangle(),new Rectangle(),
                         new Rectangle(),new Rectangle(),new Rectangle(),
                         new Rectangle(),new Rectangle(),new Rectangle(),
                         new Rectangle()});


Conclusion

Flattening data structures using on-heap allocated primitive arrays ('BLOBs') reduces OldGen GC overhead very effectively.
Young generation pauses are slightly reduced for CMS, so the scaling with OldGen size is damped but not gone. For the default GC (PSYoung), minor pauses are actually slightly higher when the heap is filled with BLOBs.
I am not sure whether the observed young generation duration variance has anything to do with "card marking", however I am satisfied with quantifying the effects of different allocation types and sizes :-)

Further Improvement incoming ..


With an ingenious little optimization coming up in JDK 7_u40, card scanning of unmarked cards speeds up by a factor of 8.

Additionally notice that at least CMS young generation pause scaling is not too bad (for the test, -XX:+UnlockDiagnosticVMOptions -XX:ParGCCardsPerStrideChunk=512 gave the best results).

And G1 ?

G1 fails to execute the test. Even if one only allocates 6GB of byte[] with 11GB of heap, it is still much more disruptive than CMS. It works if I use small byte[] chunks of 1MB size and set the region size to 32MB; even then, pauses are longer compared to CMS. G1 seems to have problems with large arrays, which will be problematic for IO-intensive applications requiring many big byte buffers.
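
For reference, assuming the region size mentioned above maps to G1's region-size flag, the setting would be along these lines (a hedged example, following the command lines used earlier):

java -Xms11g -Xmx11g -verbose:gc -XX:+UseG1GC -XX:G1HeapRegionSize=32m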

Sunday, July 28, 2013

What drives Full GC duration ?

It's obvious that the more memory is allocated, the longer a Full GC pause will be. However, a GC has to track references, so I want to find out whether primitive fields (like int, long) also increase Full GC duration. Additionally, I'd like to find out whether the structure of an object graph has an impact on GC duration.

The Test

I am creating a number of instances of the following classes:
    static class GCWrapper {
        GCWrapper(Object referenced) {
            this.referenced = referenced;
        }
        Object referenced;
    }
    static class GCWrapperMiddle {
        int a,b,c,d,e,f,g,h,i,j,k,l;
        GCWrapperMiddle(Object referenced) {
            this.referenced = referenced;
        }
        Object referenced;
    } 
    static class GCWrapperFat {
        int a,b,c,d,e,f,g,h,i,j,k,l;
        long aa,ab,ac,ad,ae,af,ag,ah,ai,jj,ak,al;
        GCWrapperFat(Object referenced) {
            this.referenced = referenced;
        }
        Object referenced;
    }
They all hold exactly one reference, but "Middle" and "Fat" additionally contain primitive fields, which increase their memory consumption a lot.
After creation, I chain those objects into a linked list. For the locality tests, the reference instead points to a random other "GCWrapper" object, to create "far" references.
Additionally I am using the class below to test effects of "many references", a highly interlinked Object graph:
    static class GCWrapperLinked {
        GCWrapperLinked(Object referenced) {
            this.r1 = referenced;
        }
        Object r1,r2,r3,r4,r5;
    }
After instantiation, the test does 5 GCs using System.gc() and takes the last value (~3 calls are required until the time stabilizes); a minimal sketch of the timing harness follows below.
Tests are done on an Intel i7 4-core, 8-thread mobile processor with a somewhat outdated JDK 7 u21, as I am on vacation currently with a pretty bad internet connection ...
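
The timing harness, sketched minimally (names are made up for this example; the test simply times explicit System.gc() calls and keeps the last value):

static long measureFullGCMillis() {
    long last = 0;
    for (int i = 0; i < 5; i++) {            // ~3 calls needed until times stabilize
        long start = System.currentTimeMillis();
        System.gc();                          // triggers a full collection
        last = System.currentTimeMillis() - start;
    }
    return last;                              // keep the last (stabilized) value
}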

What has more impact, memory size or the number of objects (-references) ?


The test was done with all collectors and different numbers of objects on the heap. Objects were allocated in a linked list like
        GCWrapperMiddle res = new GCWrapperMiddle(null);
        for ( int i = 0; i < len; i++ ) {
            res = new GCWrapperMiddle(res);
        }
        return res;

The result shows a clear linear dependency between the number of objects and Full GC duration.
Please don't conclude from these results that the default GC (ParOldGC) is slowest, because
a) subsequent calls to System.gc() without mutation may enable collectors to cut corners
b) they might choose to use fewer/more threads during collection

I test all Collectors just to ensure that observed interdependencies are present for all collectors.

Now let's compare the effects of object size on Full GC duration. As described above, I pumped up object size by adding int and long fields. Those fields cannot contain references to other objects (no need to scan them) but still consume space. The test creates 6 million objects of each of the classes shown above. Heap consumption is:

  • 163 Mb for 'small'
  • 427 Mb for 'middle'
  • 1003 Mb for 'large'
Now that's interesting:
Object size has only a minor impact on Full GC duration (the differences observed can mostly be attributed to CPU cache misses, I assume). The main driver of Full GC duration is the number of objects (-references).


Does GC duration depend on number of objects or the number of object-references ?


To test this, I create an object graph where each GCWrapper object points to another randomly chosen GCWrapper object:
        GCWrapper res[] = new GCWrapper[len];
        for ( int i = 0; i < len; i++ ) {
            res[i] = new GCWrapper(null);
        }
        for ( int i = 0; i < len; i++ ) {
            res[i].referenced = res[((int) (Math.random() * len))];
        }
        return res;
and ('5 refs')
        GCWrapperLinked res[] = new GCWrapperLinked[len];
        for (int i = 0; i < res.length; i++) {
            res[i] = new GCWrapperLinked(null);
        }
        for (int i = 0; i < res.length; i++) {
            res[i].r1 = res[((int) (Math.random() * len))];
            res[i].r2 = res[((int) (Math.random() * len))];
            res[i].r3 = res[((int) (Math.random() * len))];
            res[i].r4 = res[((int) (Math.random() * len))];
            res[i].r5 = res[((int) (Math.random() * len))];
        }
        return res;


Oops, now that's surprising. It's strange that DefaultGC (ParOldGC) actually gets faster for objects containing *more* references to (random) other objects !! This is not a test error and is reproducible.
You probably know that today's CPUs have a multi-stage cache and that access to main memory is extremely expensive compared to cache hits, so locality (memory-wise) is pretty important. Obviously this also hits Full GC when iterating the live set of objects.
(See http://www.akkadia.org/drepper/cpumemory.pdf for more information you'd probably want to read about caches ;-) )

In contrast to the CMS collector, ParOldGC ('DefaultGC') is able to compact the heap during Full GC. It seems it does this in a way that optimizes the randomly linked object graph, by moving objects close to the objects referencing them (that's my speculative explanation, others are welcome ..).

This observation leads to the following test:
A comparison of Full GC on 6 million GCWrapper object graphs: one set up as a linked list (linked in allocation order) and one with randomly chosen referenced objects.

        GCWrapper res = new GCWrapper(null);
        for ( int i = 0; i < len; i++ ) {
            res = new GCWrapper(res);
        }
        return res;
vs
        GCWrapper res[] = new GCWrapper[len];
        for ( int i = 0; i < len; i++ ) {
            res[i] = new GCWrapper(null);
        }
        for ( int i = 0; i < len; i++ ) {
            res[i].referenced = res[((int) (Math.random() * len))];
        }
        return res;
Boom ! The impact of locality is massive. This will also have a strong impact on the performance of a Java application accessing those data structures.

Conclusion:

Full GC duration depends on the number of objects allocated and the locality of their references. It does not depend that much on actual heap size.


Questionable design patterns regarding (OldGen) GC and overall locality


I think one can safely assume that objects allocated at the same time have a high chance of being "near" each other (note: CMS may behave differently if the application fragments the heap). So whenever statically allocated data is updated to refer to a newly allocated object, chances are the new object is far away from the referencing structure, which will cause cache misses. Especially the massive use of immutables, as favoured by many designs (and newer VM-based languages), is actually deadly regarding locality and OldGen GC.
  • Lazy initialization/instantiation
  • Value classes. E.g. GC-wise it's better to use a primitive long instead of a Date.
  • Immutables (except preallocated Flyweights)
  • Fine grained class design 
  • Preferring Composition over Subclassing (many wrapper, delegate Objects)
  • JDK's HashMap uses entry objects. Use open addressed Hashmaps instead
  • Use of Long, Integer, Short instead of primitives in static data (unfortunately Java's generics encourage this). Even if the VM does some optimization here (e.g. internal caching of small Integers), one can verify easily that these objects still have a severe impact on GC duration (see the sketch below).
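
To illustrate the last point, a hedged sketch of the boxed-vs-primitive difference for long-lived data (the byte numbers in the comments are typical for a 64-bit VM with compressed oops, not measurements from this post):

import java.util.ArrayList;
import java.util.List;

public class BoxedVsPrimitive {
    // one million boxed values: ~1 million extra objects the Full GC has to
    // traverse (each Integer: ~16 byte object plus a reference in the array)
    static List<Integer> boxed = new ArrayList<>();

    // the same data as one flat primitive array: a single BLOB for the GC
    static int[] primitive = new int[1000000];

    public static void main(String[] args) {
        for (int i = 0; i < 1000000; i++) {
            boxed.add(i);     // autoboxing allocates an Integer for most values
            primitive[i] = i; // no allocation at all
        }
    }
}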