Tuesday, October 28, 2014

The Internet is running in debug mode

With the rise of the Web, textual encodings like XML/JSON have become very popular. Of course, textual message encoding has many advantages from a developer's perspective. What most developers are not aware of is how expensive encoding and decoding of textual messages really is compared to a well-defined binary protocol.

It's common to define a system's behaviour by its protocol. Actually, a protocol mixes up two distinct aspects of communication, namely:

  • Encoding of messages
  • Semantics and behavior (request/response, signals, state transitions of the communicating parties ..)

Frequently (not always), these two very distinct aspects are mixed up without need. So we are forced to run the whole Internet in "debug mode", as 99% of web service and web app communication is done using textual protocols.

The overhead in CPU consumption compared to a well-defined binary encoding is a factor of ~3-5 (JSON) up to >10-20 (XML). The unnecessary waste of bandwidth adds to that greatly (yes, you can zip, but that in turn wastes even more CPU).
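To make the size part of the argument tangible, here is a purely illustrative sketch (the message layout and field names are made up for this example, not taken from any real protocol) comparing a small JSON-ish message with a handwritten fixed binary layout of the same data:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EncodingSizeDemo {
    public static void main(String[] args) throws IOException {
        long timestamp = 1414489123456L;
        int accountId = 421337;
        double amount = 99.95;

        // textual encoding (JSON-ish string built by hand to keep the sketch dependency-free)
        String json = "{\"timestamp\":" + timestamp + ",\"accountId\":" + accountId + ",\"amount\":" + amount + "}";
        int textualBytes = json.getBytes(StandardCharsets.UTF_8).length;

        // fixed binary layout: 8 byte long + 4 byte int + 8 byte double
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bout);
        out.writeLong(timestamp);
        out.writeInt(accountId);
        out.writeDouble(amount);
        out.flush();
        int binaryBytes = bout.size();

        System.out.println("textual: " + textualBytes + " bytes, binary: " + binaryBytes + " bytes");
        // decoding cost differs even more: the binary variant is a few fixed-offset reads,
        // the textual variant requires charset decoding, tokenizing and number parsing.
    }
}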

I haven't calculated the numbers, but this is environmental pollution at a big scale. Unnecessary CPU consumption to this extent wastes a lot of energy (global warming, anyone?).

The solution is easy:
  • Standardize on a few simple encodings (pure binary, self-describing binary ("binary JSON"), textual)
  • Define the behavioral part of a protocol separately (exclude encoding)
  • Use textual encoding during development, binary in production.
Man, we could save a lot of web server hardware if only HTTP headers could be binary encoded ..

Friday, October 17, 2014

Follow up: Executors and Cache Locality Experiment

Thanks to Jean Philippe Bempel, who challenged my results (for a reason), I discovered an issue in the last post: code completion let me accidentally choose Executors.newSingleThreadScheduledExecutor() instead of Executors.newSingleThreadExecutor(), so the pinned-to-thread-actor results are actually even better than reported previously. The big picture has not changed that much, but it's still worth reporting.
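For clarity, a minimal sketch of the two calls involved (the surrounding benchmark code is omitted here, see the linked source for the real setup):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class DedicatedSetup {
    public static void main(String[] args) {
        // what code completion accidentally picked: a scheduled executor
        // (different internal queue, intended for delayed/periodic tasks)
        ScheduledExecutorService wrong = Executors.newSingleThreadScheduledExecutor();

        // what the "Dedicated" (actor-pinned-to-thread) case was meant to use:
        ExecutorService dedicated = Executors.newSingleThreadExecutor();

        wrong.shutdown();
        dedicated.shutdown();
    }
}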

On a second note: there are many other aspects to concurrent scheduling, such as queue implementations etc. Especially if there is no "beef" inside the message processing, these differences become more dominant compared to cache misses, but this is a problem that has been covered extensively and in depth by other people (e.g. Nitsan Wakart).

The focus of this experiment is locality/cache misses; keep in mind that the different queuing implementations of the executors certainly add noise/bias.

As requested, I add results from the Linux "perf" tool to prove there are significant differences in cache misses caused by the random thread-to-actor assignment done by ThreadPoolExecutor and WorkStealingExecutor.

Check out my recent post for a description of the test case.

Results with adjusted SingleThreadExecutor (XEON 2 socket, each 6 cores, no HT)


As in the previous post, the "dedicated" actor-pinned-to-thread variant performs best. For very small local state there are only few cache misses, so differences are small, but they widen once a bigger chunk of memory is accessed by each actor. Note that ThreadPool is hampered by its internal scheduling/queuing mechanics; regardless of locality, it performs poorly.


When increasing the number of actors to 8000 (so 1000 actors per thread), "WorkStealing" and "Dedicated" perform similarly. Reason: executing 8000 actors round robin creates cache misses for both executors. Note that in a real-world server it's likely that there are active and inactive actors, so I'd expect "Dedicated" to perform slightly better than in this synthetic test.

"perf stat -e" and "perf stat -cs" results

(only the 2000, 4000, 8000 local size tests were run)

Dedicated/actor-pinned-to-thread:
333,669,424 cache-misses                                                
19.996366007 seconds time elapsed
185,440 context-switches                                            
20.230098005 seconds time elapsed
=> 9,300 context switches per second

workstealing:
2,524,777,488 cache-misses                                                
39.610565607 seconds time elapsed
381,385 context-switches                                            
39.831169694 seconds time elapsed
=> 9,500 context switches per second

fixedthreadpool:
3,213,889,492 cache-misses                                                
92.141264115 seconds time elapsed
25,387,972 context-switches                                            
87.547306379 seconds time elapsed
=> 290,000 context switches per second

A quick test with a more realistic test method

In order to get a more realistic impression, I replaced the synthetic int iteration with some dirty "real world" dummy stuff (do some allocation and HashMap put/get). Instead of increasing the size of the "localstate" int array, I increase the HashMap size (which should also have a negative impact on locality).


Note that this is rather short processing, so queue implementations and executor internals might dominate locality here. This test is run on an Opteron 8c16t * 2 sockets, a processor with only 8kb of L1 cache. (BTW: the impl is extra dirty, so no performance optimization comments pls, thx)
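Roughly, the per-message dummy work looks like the following sketch (illustrative only, not the benchmark's actual code; see the linked source for that):

import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the "dirty" per-message work: some allocation plus HashMap put/get on actor-local state.
public class DummyWork {
    static int processMessage(HashMap<Integer, Integer> localState, int mapSize) {
        int key = ThreadLocalRandom.current().nextInt(mapSize);
        localState.put(key, key + 1);                              // allocation (autoboxing) + map write
        Integer val = localState.get(ThreadLocalRandom.current().nextInt(mapSize));
        return val == null ? 0 : val;                              // map read, likely a cache miss for big maps
    }
}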


As ThreadPoolExecutor is abnormally bad in this test/processor combination, here are the plain numbers:

              64 HMap entries  256 HMap entries  2000 HMap entries  4000 HMap entries  32k HMap entries  320k HMap entries
WorkStealing             1070              1071               1097               1129              1238               1284
Dedicated                 656               646                661                649               721                798
ThreadPool               8314              8751               9412               9492             10269              10602

The conclusions basically stay the same as in the original post. Remember cache misses are only one factor of overall runtime performance, so there are workloads where results might look different. The quality/specialization of the queue implementation will have a huge impact in case processing consists of only a few lines of code.

Finally, my result:
Pinning actors to threads produced the lowest cache miss rates in every case tested.




Tuesday, October 14, 2014

Experiment: Cache effects when scheduling Actors with F/J, Threadpool, Dedicated Threads

Update: I accidentally used newSingleThreadScheduledExecutor instead of newFixedThreadPool(1) for the "Dedicated" test case [IDE code completion ..]. With this corrected, "Dedicated" outperforms even more. See the follow-up post for updated results + "perf" tool cache miss measurements (they do not really change the big picture).

The experiment in my last post had a serious flaw: in an actor system, operations on a single actor are executed one after the other. However, by naively adding message-processing jobs to executors, private actor state was accessed concurrently, leading to "false sharing" and cache-coherency-related costs, especially for small local state sizes.

Therefore I modified the test. For each actor, the next message-processing job is scheduled only once the previous one has finished, so the experiment correctly resembles the behaviour of typical actors (or lightweight processes/tasks/fibers) without concurrent access to a memory region.
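A minimal sketch of that scheduling rule (illustrative, not the benchmark code itself): each simulated actor resubmits itself to the executor only after the current message has been processed, so two messages of the same actor never run concurrently.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SimActor {
    final int[] localState;                 // private, actor-local state
    final AtomicInteger remaining;          // messages left for this actor

    SimActor(int stateSize, int messages) {
        localState = new int[stateSize];
        remaining = new AtomicInteger(messages);
    }

    // process one message, then schedule the next one - never two at a time
    void processNext(ExecutorService ex) {
        for (int i = 0; i < 100; i++) {     // touch local state in a scattered pattern
            int idx = (i * 31) % localState.length;
            localState[idx]++;
        }
        if (remaining.decrementAndGet() > 0) {
            ex.execute(() -> processNext(ex));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newWorkStealingPool();
        SimActor a = new SimActor(1000, 100_000);
        pool.execute(() -> a.processNext(pool));
        Thread.sleep(2000);                 // crude: give the daemon worker threads time to finish
    }
}

For the "Dedicated" case the executor passed in is a single-thread executor owned by the actor; for the pool/work-stealing cases it is the shared executor.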

Experiment roundup:

Several million messages are scheduled onto several "Actor"-simulating classes. Message processing is simulated by reading and writing the private, actor-local state in random order. There are more actors (24-8000) than threads (6-8). Note that any results established will also hold true for other lightweight concurrency schemes like goroutines, fibers, tasks ...

The test is done with

  • ThreadPoolExecutor
  • WorkStealingExecutor
  • Dedicated Thread (Each Actor has a fixed assignment to a worker thread)

Simulating an Actor accessing local state:


Full Source of Benchmark

Suspicion:
As ThreadPoolExecutor and WorkStealingExecutor schedule each message on a random thread, they will produce more cache misses compared to pinning each actor onto a fixed thread. The speculation is that work stealing cannot make up for the costs provoked by cache misses.


(Some) Variables:
  • Number of worker threads
  • Number of actors
  • Amount of work per message
  • Locality / Size of private unshared actor state


8 Threads 24 actors 100 memory accesses (per msg)


Interpretation:

For this particular load, fixed-assigned threads outperform the executors. Note: the larger the local state of an actor, the higher the probability of a prefetch failure => cache miss. In this scenario my suspicion holds true: work stealing cannot make up for the amount of cache misses. Fixed-assigned threads profit, because it's likely that some state of a previously processed message still resides in cache once a new message is processed on an actor.
It's remarkable how badly ThreadPoolExecutor performs in this experiment.

This is a scenario typical for a backend-type service: there are few actors with high load. When running a front-end server with many clients, there are probably more actors, as typically there is one actor per client session. Therefore let's push the number of actors up to 8000:

8 Threads 8000 actors 100 memory accesses (per msg)



Interpretation:

With this number of actors, all execution schemes suffer from cache misses, as the accumulated state of 8000 actors is too big to fit into the L1 cache. Therefore the cache advantage of fixed-assigned threads ('Dedicated') does not make up for the lack of work stealing. The work-stealing executor outperforms any other execution scheme if a large amount of state is involved.
This is a somewhat unrealistic scenario, as in a real server application client requests probably do not arrive "round robin"; some clients are more active than others. So in practice I'd expect "Dedicated" to at least have some advantage from higher cache hit rates. Anyway: when serving many (stateful) clients, WorkStealing can be expected to outperform.

Just to get a third variant: same test with 240 actors:


These results complete the picture: with fewer actors, cache effects supersede work stealing. The higher the number of actors, the higher the number of cache misses, so work stealing starts outperforming dedicated threads.


Modifying other variables

Number of memory accesses

If message processing does few memory accesses, work stealing improves relative to the other two. Reason: fewer memory accesses mean fewer cache misses, so the work-stealing advantage gets more significant in the overall result.

 ************** Worker Threads:8 actors:24 #mem accesses: 20
local state bytes: 64 WorkStealing avg:505
local state bytes: 64 ThreadPool avg:2001
local state bytes: 64 Dedicated avg:557
local state bytes: 256 WorkStealing avg:471
local state bytes: 256 ThreadPool avg:1996
local state bytes: 256 Dedicated avg:561
local state bytes: 2000 WorkStealing avg:589
local state bytes: 2000 ThreadPool avg:2109
local state bytes: 2000 Dedicated avg:600
local state bytes: 4000 WorkStealing avg:625
local state bytes: 4000 ThreadPool avg:2096
local state bytes: 4000 Dedicated avg:600
local state bytes: 32000 WorkStealing avg:687
local state bytes: 32000 ThreadPool avg:2328
local state bytes: 32000 Dedicated avg:640
local state bytes: 320000 WorkStealing avg:667
local state bytes: 320000 ThreadPool avg:3070
local state bytes: 320000 Dedicated avg:738
local state bytes: 3200000 WorkStealing avg:1341
local state bytes: 3200000 ThreadPool avg:3997
local state bytes: 3200000 Dedicated avg:1428


Fewer worker threads

Fewer worker threads (e.g. 6) increase the probability of an actor's message being scheduled to the "right" thread "by accident", so the cache miss penalty is lower, which lets work stealing perform better relative to "Dedicated" (the fewer threads used, the lower the cache advantage of fixed-assigned "Dedicated" threads). Vice versa: as the number of cores involved increases, fixed thread assignment gets ahead.

Worker Threads:6 actors:18 #mem accesses: 100
local state bytes: 64 WorkStealing avg:2073
local state bytes: 64 ThreadPool avg:2498
local state bytes: 64 Dedicated avg:2045
local state bytes: 256 WorkStealing avg:1735
local state bytes: 256 ThreadPool avg:2272
local state bytes: 256 Dedicated avg:1815
local state bytes: 2000 WorkStealing avg:2052
local state bytes: 2000 ThreadPool avg:2412
local state bytes: 2000 Dedicated avg:2048
local state bytes: 4000 WorkStealing avg:2183
local state bytes: 4000 ThreadPool avg:2373
local state bytes: 4000 Dedicated avg:2130
local state bytes: 32000 WorkStealing avg:3501
local state bytes: 32000 ThreadPool avg:3204
local state bytes: 32000 Dedicated avg:2822
local state bytes: 320000 WorkStealing avg:3089
local state bytes: 320000 ThreadPool avg:2999
local state bytes: 320000 Dedicated avg:2543
local state bytes: 3200000 WorkStealing avg:6579
local state bytes: 3200000 ThreadPool avg:6047
local state bytes: 3200000 Dedicated avg:6907

Machine tested:

(real cores no HT)
$ lscpu 
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                12
On-line CPU(s) list:   0-11
Thread(s) per core:    1
Core(s) per socket:    6
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 44
Stepping:              2
CPU MHz:               3067.058
BogoMIPS:              6133.20
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              12288K
NUMA node0 CPU(s):     1,3,5,7,9,11
NUMA node1 CPU(s):     0,2,4,6,8,10


Conclusion
  • Performance of executors depends heavily on the use case. There are workloads where cache locality dominates, giving an advantage of up to 30% over the work-stealing executor.
  • Performance of executors varies amongst different CPU types and models (L1 cache size + cost of a cache miss matter here).
  • WorkStealing can be viewed as the better overall solution, especially if a lot of L1 cache misses are to be expected anyway.
  • The ideal executor would be work stealing with a soft actor-to-thread affinity. This would combine the strengths of both execution schemes and would yield significant performance improvements for many workloads.
  • Vanilla thread pools without work stealing and without actor-to-thread affinity perform significantly worse and should not be used to execute lightweight processes.
Source of Benchmark

Sunday, October 12, 2014

Alternatives to Executors when scheduling Tasks/Actors



Executors work well when fed with short units of stateless work, e.g. dividing a computation onto many CPUs. However, they are suboptimal for scheduling jobs which are part of an ongoing, larger unit of work, e.g. scheduling the messages of an actor or lightweight process.

Many actor frameworks (or similar message-passing concurrency frameworks) schedule batches of messages using an executor service. As the executor service is not context aware, this spreads the processing of a single actor/task's messages across several threads/CPUs.

This can lead to many cache misses when accessing the state of an actor/process/task, as the processing thread changes frequently.
Even worse, this way CPU caches cannot stabilize, as each new "Runnable" washes out the cached memory of the previously processed task.

A second issue arises when using busy-spin polling. If a framework reads its queues using busy-spin, it generates 100% CPU load for each processing thread. So one would like to add a second thread/core only if absolutely required; a one-thread-per-actor policy is not feasible.

With Kontraktor 2.0 I implemented a different scheduling mechanism, which achieves horizontal scaling using a very simple metric to measure actual application CPU requirements, without randomly spreading the processing of an actor onto different cores.


An actor has a fixed assignment to a worker thread ("DispatcherThread"). The scheduler periodically reschedules actors by moving them to another worker if necessary.

Since overly sophisticated algorithms tend to introduce high runtime costs, the actual scheduling is done in a very simple manner (sketched in code after the list):

1) A thread is considered overloaded if the consume loop (pulling messages from the actor queue) has not been idle for N messages (currently N=1000).
2) If a thread has been marked "overloaded", the actors with the largest mailbox sizes are migrated to a newly started thread (until #Threads == ThreadMax), as long as SUM_QUEUED_MSG(actors scheduled on thread A) > SUM_QUEUED_MSG(actors scheduled on newly created thread B).
3) In case #Threads == ThreadMax, actors are rebalanced based on the summation of queued messages and "overload" observations.
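In pseudo-Java, the scale-up rule could look roughly like this. This is an illustrative sketch of the rules above, not Kontraktor's actual code; all class and method names here are made up.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SchedulerSketch {
    static final int THREAD_MAX = 8;

    static class ActorRef { int queued; ActorRef(int q) { queued = q; } }

    static class Worker {
        final List<ActorRef> actors = new ArrayList<>();
        boolean overloaded;                       // set by the consume loop (not idle for N=1000 messages)
        int queuedSum() { return actors.stream().mapToInt(a -> a.queued).sum(); }
    }

    static void maybeScaleUp(List<Worker> workers) {
        for (Worker w : new ArrayList<>(workers)) {
            if (!w.overloaded || workers.size() >= THREAD_MAX) continue;
            Worker fresh = new Worker();          // stands for starting a new dispatcher thread
            workers.add(fresh);
            // migrate the actors with the largest mailboxes while the old worker still queues more
            w.actors.sort(Comparator.comparingInt((ActorRef a) -> a.queued).reversed());
            while (!w.actors.isEmpty() && w.queuedSum() > fresh.queuedSum()) {
                fresh.actors.add(w.actors.remove(0));
            }
            w.overloaded = false;
        }
        // once THREAD_MAX is reached, actors would instead be rebalanced among existing
        // workers based on queuedSum() and overload observations (omitted here)
    }
}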

Problem areas: 
  • If the processing time of messages varies a lot, the summation of queued messages is misleading. An improvement would be to add a weight to each message type by profiling periodically. A simpler option would be to give an actor an additional weight to be multiplied with its queue size.
  • For load bursts, there is a latency until all of the available CPUs are actually used.
  • The delay until the JIT kicks in produces bad profiling data and leads to false scale-ups (this heals over time, so it's not that bad).

An experiment

Update: Woke up this morning and it came to my mind that this experiment has a flaw, as the jobs per work item are scheduled in parallel for the executor tests, so I am probably measuring the effects of false sharing. Results below removed, check the follow-up post.



Performance cost of adaptive Scaling


To measure the cost of auto-scheduling vs. explicit Actor=>Thread assignment, I run the Computing-Pi test (see previous posts).
These numbers do not show effects of locality, as the test compares explicit scheduling with automatic scheduling.

Test 1 manually assigns a Thread to each Pi computing Actor,
Test 2 always starts with one worker and needs to scale up automatically once it detects actual load.


(note example requires >= kontraktor2.0-beta-2, if the 'parkNanos' stuff is uncommented it scales up to 2-3 threads only)

The test is run 8 times with increasing thread_max by one with each run.

results:

Kontraktor Autoscale (always start with 1 thread, then scale up to N threads)

1 threads : 1527
2 threads : 1273
3 threads : 718
4 threads : 630
5 threads : 521
6 threads : 576
7 threads : 619
8 threads : 668

Kontraktor with dedicated assignment of threads to Actors (see commented line in source above)

1 threads : 1520
2 threads : 804
3 threads : 571
4 threads : 459
5 threads : 457
6 threads : 534
7 threads : 615
8 threads : 659


Conclusion #2

Differences in runtimes can be attributed mostly to the delay in scaling up. For deterministic load problems, prearranged actor scheduling is more efficient, of course. However, thinking of a server receiving varying request loads over time, automatic scaling is a valid option.

Saturday, September 27, 2014

Breaking the habit: Solving the Dining Philosopher's problem with Actors

Concurrent programming using actors is different from traditional mutex-based approaches, therefore I'd like to show how a classical concurrency problem can be solved using actors. It requires some time to get used to, as future/actor-based concurrency looks different and uses different patterns.

Additionally, I'll show how little effort is required to make this a distributed application, as asynchronous actor-based concurrency is much more resilient regarding network-induced latency.

I'll use my Kontraktor Actor library version 2.0 beta to also illustrate how the "message == asynchronous method call" approach of Kontraktor works out in "practice".

Resources:

Kontraktor lib: https://github.com/RuedigerMoeller/kontraktor.
Full source of sample is here.

Description of the philosophers dining problem:
http://en.wikipedia.org/wiki/Dining_philosophers_problem
(I am using a non-fair solution to keep things simple.)


The Table

One actor represents the table. For each fork, a list of philosophers waiting for "fork is free"-notification is kept.

Short explanation: in Kontraktor, all public methods are translated behind the scenes into asynchronous messages put onto an actor's queue ("mailbox"). The worker thread of an actor takes the messages from the mailbox and calls the "real" implementation. So it's guaranteed that an actor executes single threaded. To identify asynchronous calls easily, I always put a $ in front of public actor methods.
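As a stand-in for the screenshot, here is a self-contained sketch of the fork logic described below, using a JDK CompletableFuture in place of Kontraktor's promise type and a single-thread executor playing the role of the actor's worker thread. All names are illustrative; this is not the original source.

import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TableSketch {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final boolean[] taken = new boolean[5];
    @SuppressWarnings("unchecked")
    private final ArrayDeque<CompletableFuture<Void>>[] waiters = new ArrayDeque[5];

    public TableSketch() {
        for (int i = 0; i < waiters.length; i++) waiters[i] = new ArrayDeque<>();
    }

    // "message": returns a future that completes once the fork is available
    public CompletableFuture<Void> getFork(int num) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            if (!taken[num]) {                 // fork free: fulfilled immediately
                taken[num] = true;
                result.complete(null);
            } else {                           // fork busy: park the unfulfilled promise
                waiters[num].add(result);
            }
        });
        return result;
    }

    // "message": hand the fork to the next waiter, if any
    public void returnFork(int num) {
        mailbox.execute(() -> {
            CompletableFuture<Void> next = waiters[num].poll();
            if (next != null) next.complete(null);   // fork stays taken, handed over
            else taken[num] = false;
        });
    }
}

Because all bookkeeping runs on the single mailbox thread, plain arrays and ArrayDeques are safe without any synchronization, which is exactly the point made below.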


Explanation: if a fork is requested and the queue of philosophers waiting for it is empty, a fulfilled promise is returned; otherwise an unfulfilled promise is added to the queue. The caller code then looks like:


Once a fork is returned, the next promise in the ArrayList (if any) is notified. signal() is equivalent to receive("dummy", null).

Note that the "result" object is unused; I just need a signal from the returned future. That's why the fulfilled promise gets a "void" dummy result (which will then trigger immediate execution of the then-closure on the caller side).

Note how I can use simple single threaded collections and data structures as the actor is guaranteed to be executed single threaded.

The Philosopher

A philosopher has a wonderful life which can be divided into 3 states:

10 think randomTime
20 be_hungry_and_try_to_grab_2_forks
30 eat randomTime
40 goto 10

[rumors are of lost lines 15 and 35 related to indecorous social activities introduced after Epicurus happened to join the table]


The meat is in the $live method:
After thinking, the philosopher tries to grab one fork; the closure given to the "then" method gets executed once the fork becomes available. Once both forks have been grabbed, another "delayed" call keeps the "eat" state for a while, then both forks are returned and a $live message is put onto the actor's mailbox (=message queue) to start thinking again.

Remarks:

  • "delayed" just schedules the given closure onto the actors mailbox with a delay.
  • "self().$live()" puts a $live message onto the actors mailbox. If "this.$live()" would be called, the $live() message would be executed directly [synchronous] like a normal method. In this example this would not hurt, however in many cases this makes a difference.
  • Wait-Free. The executing thread is never blocked. One could easily simulate thousands of philosophers onto a single thread
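To make the control flow concrete without the original screenshot, here is an illustrative philosopher loop written against the TableSketch stand-in from above (not the original Kontraktor code: a ScheduledExecutorService plays the role of "delayed", and a plain recursive call stands in for self().$live()):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class PhilosopherSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final TableSketch table;
    private final int leftFork, rightFork;

    public PhilosopherSketch(TableSketch table, int leftFork, int rightFork) {
        this.table = table; this.leftFork = leftFork; this.rightFork = rightFork;
    }

    public void live() {
        long thinkMillis = ThreadLocalRandom.current().nextLong(100);
        timer.schedule(() ->                                   // think for a while
            table.getFork(leftFork).thenRun(() ->              // then wait for the left fork ...
                table.getFork(rightFork).thenRun(() ->         // ... then for the right fork
                    timer.schedule(() -> {                     // eat for a while
                        table.returnFork(leftFork);
                        table.returnFork(rightFork);
                        live();                                // back to thinking - no thread ever blocks
                    }, ThreadLocalRandom.current().nextLong(100), TimeUnit.MILLISECONDS)
                )
            ), thinkMillis, TimeUnit.MILLISECONDS);
    }
}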

Setup and run

The main method to start the life of 5 philosophers looks like this:

and

Note Hoarde is a utility to create and manage groups of Actors. "startReportingThread" just starts a thread dumping the state of all philosophers:


"Is this still Java?" .. probably weird looking stuff for the untrained eye ..
(note Philosopher.$getState is async and yield() is used to wait for all futures to complete before executing the then() closure printing the state)

BTW: spacing is somewhat inconsistent as I started to reformat code partially during edit .. but won't do screenshots again ..

Yay, done:



Let's make it a distributed application

The killer feature of actor-based, asynchronous, lock/wait-free programming is simple distributability. A solution using blocking mutexes would have to be rewritten or rely on synchronous remote calls, so the bigger the network latency, the lower the throughput. Actors do not suffer from that.
We can just change the startup to make the table run remotely:


Is this cool or what ? I now can feed hoardes of philosophers with a single table server:





Saturday, August 30, 2014

Backpressure in async communication systems

I recently saw this excellent presentation video by Mr. Kuhn .. and later found out about the community standardization effort of reactive streams.

As I have done many experiments dealing with backpressure issues in a high-volume async distributed system, I'd like to point out some possible pitfalls of the "flow control" mechanism proposed in the reactive streams spec and describe how I dealt with the problem (after having tried similar solutions).

The problem of flow control arises in asynchronous communication when the sender sends messages at a higher pace than (one of) the receivers can process them.
This problem is well known from networking, and TCP actually implements a scheme similar to the one proposed in the RStreams spec. Another - less known - flow control scheme is NAK. NAK is typically used in low-latency, brokerless (multicast) messaging products.

The "Reactive Streams" spec proposes an acknowledge based protocol (although its not presented as such).

Receiver sends "requestMore(numberOfMsg)" to Sender
Sender sends "numberOfMsg" messages to Receiver

by emitting "requestMore", a Receiver signals it has received+processed (or will have finished processing soon) previously received messages. Its an acknowledgement message (ACK).

Problematic areas:

  1. It works for point-to-point communication only. If you have a multicast situation (e.g. one sender multicasts to N receivers), this does not work well.
    Reason:
    In order to make this work, the sender needs to be aware of all receivers, then wait for every receiver to have sent a "requestMore", then compute the minimum of all "requestMore" messages and finally send this number of messages. Once a receiver dies, no "requestMore" is sent/received, so the sender can't send anything for an extended amount of time (receiver timeout). If one receiver pauses (e.g. GC), a "requestMore" is missing, so the sender needs to stall all communication.
    You can see these effects in action with JGroups using a configuration with a credit-based flow control protocol. Once a cluster node terminates, throughput falls to zero until the node-leave timeout is reached (similar issues are observable with the ACK-based NACKACK reliable UDP transport of JGroups). ACK ftl :-)
  2. The actual throughput depends heavily on connection latency. As the ACK signal ("requestMore") takes longer to reach the sender, the receiver buffer must be enlarged depending on latency; additionally, the size of the chunks requested by "requestMore" must be enlarged. If one does not do this, throughput drops depending on latency. In short:
    sizeOfBuffer needs to be >= eventsProcessable per roundTripLatency interval. This can be solved by runtime introspection, however in practice this can be tricky, especially with bursty traffic.
  3. As long as reliable communication channels (in-memory, TCP) are used: those transports have already implemented a flow control / backpressure mechanism, so "Reactive Streams" effectively duplicates functionality present at a lower and more efficient level. It's not too hard to make use of the backpressure signals of e.g. TCP (one solution is an "outbound" queue on the sender side, then observe its size). One can achieve the same flow control results without any need for explicit application-level ACK/pull messages like "requestMore". For in-memory message passing, one can simply inspect the queue size of the receiver. [you need a queue implementation with a fast size() implementation then, ofc]

Alternative: NAK instead of ACK, adaptive rate limits

The idea of NAK (=negative acknowledgement) is to raise backpressure signals from the receiver side only in case a component detects overload. If receivers are not in an overload situation, no backchannel message traffic is present at all.

The algorithm outlined below worked for me in a high throughput distributed cluster:

Each sender has a send rate limit which is increased stepwise over time. Usually the starting default is a high send rate (so it's actually not limiting). Receivers observe the size of their inbound queue. Once the queue size grows past certain limits, a receiver emits a NAK( QFillPercentage ) message, e.g. at 50%, 75%, 87%, 100%. The sender then lowers the send rate limit depending on the "strength" of the NAK message. To avoid permanent degradation, the sender increases the send rate stepwise again (the step size may depend on the time no NAK was received).
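A minimal sketch of the sender-side bookkeeping (illustrative only: thresholds, step sizes and names are placeholders, and a real system additionally needs the receiver-side queue watching and the actual rate limiting of the send loop):

import java.util.concurrent.atomic.AtomicLong;

public class NakRateControl {
    private volatile long maxMsgPerSecond = 1_000_000;    // start high = effectively unlimited
    private final AtomicLong lastNakNanos = new AtomicLong(System.nanoTime());

    // called when a NAK(queueFillPercent) arrives from some receiver
    public void onNak(int queueFillPercent) {
        lastNakNanos.set(System.nanoTime());
        // the "stronger" the NAK, the harder the limit is cut
        if (queueFillPercent >= 100)     maxMsgPerSecond /= 4;
        else if (queueFillPercent >= 87) maxMsgPerSecond /= 3;
        else if (queueFillPercent >= 75) maxMsgPerSecond /= 2;
        else                             maxMsgPerSecond = maxMsgPerSecond * 3 / 4;
        maxMsgPerSecond = Math.max(maxMsgPerSecond, 1_000);
    }

    // called periodically by the sender: recover stepwise while no NAK has been seen
    public void onTick() {
        long quietSeconds = (System.nanoTime() - lastNakNanos.get()) / 1_000_000_000L;
        if (quietSeconds >= 1) {
            maxMsgPerSecond += 10_000 * quietSeconds;      // speed up; step may grow with quiet time
        }
    }

    public long currentLimit() { return maxMsgPerSecond; }
}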

This works for 1:1 async communication patterns as well as for 1:N. In a 1:N situation the sender reacts to the highest-strength NAK message within a timing window (e.g. 50ms). The 0-traffic-on-dying-receiver problem is not present. Additionally, the sender does not need to track the number of receivers/subscribers, which is a very important property in 1:N multicast messaging.

Finding the right numbers for the NAK triggers and the sender speed-up steps can be tricky, as they depend on the volatility of the send rate and the latency of the transport link. However, it should be possible to adaptively find appropriate values at runtime. In practice I used empirical values found by experimenting with the given system on sample loads, which is not a viable solution for a general implementation, ofc.

The backpressure signal raised by the NAK messages can be applied to the sender either blocking or non-blocking. In the concrete system I just blocked the sending thread, which effectively puts a dynamically computed "cost" (the current rate limit) on each async message send operation. Though this is blocking, it did not hurt performance, as there was no "other work" to do in case a receiver can't keep up. However, backpressure signals can be applied in a non-blocking fashion as well ("if (actualSendRate < sendRateLimit) { send } else { do other stuff }").








Thursday, June 5, 2014

Java Concurrency: Learning from Node.js, Scala, Dart and Go


Preface

The stuff below regarding asynchronism isn't exactly new; however, in the Java world you'll still see a lot of software using synchronous, blocking multi-threading patterns, frequently built deep into the architecture of apps and frameworks. Because of the Java tradition of unifying concurrency and parallelism, frequently even senior developers are not aware of the fact that one cannot build well-scaling, high-throughput distributed systems using typical Java synchronous multi-threading idioms.

So what is the problem with synchronous processing ?

Think of a web server handling user requests. In this example, to process a client request, multiple requests to another remote service have to be done.


Now, the service can process 10,000 requests per second, but request latency is 5 ms (network time, de/encoding). So a simple request/response takes 5+5 ms (back and forth) = 10 ms. A single-threaded client implemented in a blocking fashion can send/receive at most 100 requests per second and is therefore unable to saturate the service.
If the web server is expected to emit on average 1 service request per client request, its thread pool has to be sized to 100 worker threads in order to maximize throughput and saturate the service.
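The arithmetic behind that sizing is just Little's law (threads ≈ request rate * round-trip latency); a tiny sketch with the numbers from above:

public class ThreadSizing {
    public static void main(String[] args) {
        double roundTripMs = 5 + 5;                           // 5 ms there + 5 ms back
        double perThreadPerSecond = 1000.0 / roundTripMs;     // 100 blocking calls per thread per second
        double serviceCapacityPerSecond = 10_000;
        double threadsNeeded = serviceCapacityPerSecond / perThreadPerSecond;
        System.out.println(threadsNeeded + " worker threads needed");   // prints 100.0
    }
}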

Now consider that the remote service makes use of another service with same throughput and latency characteristics.

This increases latency (as seen from the web server) by another 10 ms, so the overall latency of a service request/response is now 20 ms, in turn requiring a worker pool of 200 threads in the web server and 100 service threads in service (A).

It is clear that in a complex distributed system this imposes a lot of problems when using thread-based synchronous processing. A change in the infrastructure (=latency changes) might require the reconfiguration of many servers/services.

A common solution to this dilemma is to just configure way too many worker threads, in turn running into multithreading-related issues without need. Additionally, a thread is quite a heavyweight object. It's impossible to scale up to, say, 100,000 or more threads. The higher the latency, the more threads must be used to saturate a remote service (e.g. think of inter-datacenter connections), so if you need to process e.g. 50k requests per second and need to query a remote service for each incoming request, you simply cannot use a synchronous, blocking style. Additionally, having more threads than CPU cores hampers overall performance (context switches), so having hundreds of threads per CPU will actually reduce throughput significantly.

If you think of higher numbers (e.g. 100k requests per second), it's clear that one cannot compensate for synchronous, blocking programming by adding more threads.

With synchronous IO programming, latency has direct impact on throughput


No problem, we can go asynchronous, we have threads ..

(note: sleep is a placeholder for some blocking IO or blocking processing)
Bang! Because:
  • HashMap must be changed to ConcurrentHashMap, else you might run into an endless loop when calling "get" concurrently to a "put" operation.
  • You might or might not see the result of "someInt++" (JMM). It needs to be declared 'volatile'.
Congrats! Now most of your code is multithreaded, which means you'll clutter your code with ConcurrentHashMaps, Atomics, synchronized and deadlocks ... the step-up of "callback hell" is "multithreaded callback hell".

With plain java, asynchronous programming provokes unnecessary multithreading.

A thread-safe version of this code would manage to deliver the callback "inside" the calling thread. But for that we need to enqueue the callback, and we need some kind of threading framework which ensures the queue is read and executed.

Well, one can do this in plain java like this:
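(The following is a stand-in sketch of that idea, not the post's original snippet: a single worker thread drains a queue of Runnables, so both state access and callbacks happen on that one thread.)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private int someInt;                         // "actor local" state, touched only by the mailbox thread

    public MailboxSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) mailbox.take().run();   // process messages one after the other
            } catch (InterruptedException e) { /* shutdown */ }
        }, "mailbox-thread");
        worker.setDaemon(true);
        worker.start();
    }

    public void enqueue(Runnable msg) { mailbox.add(msg); }

    // kick off blocking work elsewhere, deliver the callback back onto the mailbox thread
    public void doAsync() {
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}   // placeholder for blocking IO
            enqueue(() -> someInt++);            // callback runs on the mailbox thread - no races
        }).start();
    }
}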


Hm, some boilerplate code here, basically a handcrafted, simple implementation of an actor. Actually it's a very simple case, as the async call does not return values. In order to handle results, more handcrafting would be required (e.g. a callback interface to deliver the result of blocking code to the calling thread; additionally, you won't do the result callback from your one-and-only-unblockable mailbox processor thread). Tricky stuff ahead.

With an actor library, this stuff is solved inside the actor framework. Same code with kontraktor:


(I left some comments in as well; note that only 2-3 lines of extra code are required compared to a synchronous implementation).

Reminder: Bad Threads

Using multi-threaded asynchronous processing easily leads to massive multithreading, which raises a bunch of problems:
  1. Out-of-control threading: it's hard to tell which parts of your application's data/resources are accessed concurrently as the software grows (an innocent call to your application framework from within a foreign callback thread can literally make everything multithreaded unnoticed).
  2. Because of (1), developers start synchronizing defensively.
  3. Because of (2), you get deadlocks.
  4. Because of (2), you run 500 threads all waiting on contended locks, so your service goes down despite having just 60% CPU usage.
  5. Because of (1), you get spurious, unreproducible errors caused by random cache coherency/data visibility issues.
Anyway, let's have a look at what newer languages bring to the table regarding concurrency ..



Concurrency Idioms/Concepts in Go, Dart, Node.js, Scala

Despite the hype and discussions surrounding those emerging languages, they technically present more or less the same basic concept for dealing with concurrency (ofc there are lots of differences in detail; especially Go is different):
Single-threaded entities (with associated thread-local state) sequentially process incoming events from bounded or unbounded queues.
This technical foundation can be presented in different ways to a programmer. With actors, functionality and data are grouped 'around' the queue ('mailbox'). In the channel model, larger entities ('Process', 'Isolate') explicitly open/close 'channels' (=queues=mailboxes) to exchange messages with other processing entities.


node.js/javascript

A node.js process is a single actor instance. Instead of a 'mailbox', the queue in node.js is called the "event queue". Blocking operations are externalized to the JavaScript V8 VM; the results of such an asynchronous operation are then put onto the event queue. The programming model is callback/future style, however there are extensions providing a fiber-like (software threads, "green" threads) programming model.


Multicore hardware is saturated by running several instances of node.js.


Dart 

(on the server side) features a more sophisticated but similar model compared to JS. In contrast to JavaScript, Dart provides a richer set of utilities to deal with concurrency.
It's possible to run several event loops, called 'Isolates', within a single process or in separate processes. Conceptually, each isolate is run by a separate thread. Isolates communicate by putting messages onto each other's event loop. Very actor'ish; in fact this can be seen as an implementation of the actor pattern.
As a special feature, a Dart actor (isolate) reads from 2 event queues, of which one has higher priority and (simplified) processes self-induced asynchronous code, while the other one gets events from the 'outside' world.

Api-wise, Dart favours 'Futures' over 'Callbacks':

Ordering in Dart

Ordered execution of asynchronous pieces of code can then be chained using 'then'.

For messaging amongst isolates, Dart provides a generic deep copy of arbitrary object graphs in-process, and simple data types for communication amongst isolates running in different processes.

Both Dart and javascript actually use a flavour of the Actor pattern, though they use a different terminology.


Scala/Akka

Although Scala has access to the same concurrency primitives as Java (it runs on the JVM), Scala has a tradition of asynchronism, which is reflected by the existence of many async tools, idioms and libraries. It's a very creative, adventurous and innovative platform, which can be both good and bad in industrial-grade software projects.
Ordering in Akka
As is to be expected, Scala's Akka+async libraries are very richly featured. Akka proposes a somewhat different system-design philosophy compared to Dart/Node.js, as it encourages a more fine-grained actor design.
Akka promotes an untyped actor model (though typed actors are available) and has features such as actor-induced message reordering/dropping, re-dispatching, scanning the mailbox, etc. As always: great flexibility also paves the way for abusive and hard-to-maintain programming patterns. Splitting up an application into many actors (fine-grained actor design) is advantageous in that a sophisticated scheduler may parallelize execution better. On the other hand, this comes at a high cost regarding maintainability and controllability. Additionally, enqueuing a message instead of doing a plain method call has a high fixed overhead, so it's questionable whether fine-grained actor design pays off.

As Akka was originally designed for Scala, it requires quite some boilerplating when used from Java. One either has to write immutable 'message classes' or (with typed actors) has to define an interface along with each actor (because typed actors use java.*.Proxy internally).

As I showed in a previous post, it's not optimized to the bone; however, this should not be significant for real-world asynchronous IO/messaging-centric applications.

Recent versions improved remoting of actors and also added support for multicast-based messaging (zeroMQ).

Go

If you look at typical processing sequences in an actor-based asynchronous program, you'll probably notice it's kind of a handcrafted multiplexing of concurrently executed statement sequences onto a single thread. I'll call such a sequence a 'threadlet'.


Basically myFunc models an ordered sequence of actions ('threadlet'):

  • getUrl
  • process contents
  • return result

Since getting the URL needs some time, it is executed async, so other events can be read from the inbox.

If the runtime were aware of "blocking calls" automatically, it could assist by providing the illusion of single-threaded execution by automatically splitting a "threadlet" into separate events. This is what Go does (pseudocode):
go func() {
  text := getUrl("http://google.com")
  // [process text]
  // [publish result, e.g. via a channel]
}()
While in an actor-based system one builds up a "virtual stack" of a "threadlet" by chaining futures and callbacks, Go manages to transform a "normal"-looking piece of code behind the scenes. For that it builds up a (dynamically sized) stack which replaces the future chaining of the actor-based approaches. It's a non-native, lightweight thread.

So while this might look very different, it's actually a similar concept of multiplexing small snippets of ordered statement sequences onto a single thread. It's similar to "green threads" (low-overhead, software-emulated threads). If such a 'goroutine'/'threadlet' needs to wait or is logically blocked, the processing thread does not stop but processes another goroutine (in case of actors: another message from a mailbox).

As the Java, JavaScript and Dart virtual machines have no concept of "coroutines" (=a runnable piece of code with an associated stack and state = software thread = green thread), a more explicit syntax to emulate green threads is required for asynchronous programming in those languages.
The rationale behind goroutines/actors/future-based asynchronism is that the programmer should specify the concurrency of execution. Parallelism/scheduling is handled by the runtime system.
The Go runtime schedules goroutines onto one or more OS threads. Blocking operations are automatically detected (e.g. OS calls); blocking operations such as waiting on messages from a channel/queue do not actually block, as the current thread just continues executing another goroutine.
So from an abstract point of view, goroutines are similar to lightweight threads with built-in awareness of non-blocking message queues (channels).

Channels replace mailboxes,futures,callbacks

An actor encapsulates a queue (mailbox), a set of messages 'understood' and private (=thread-local) data. In Go, mailboxes are replaced by queue objects called 'channels', which can be created and deleted dynamically. A goroutine might choose to open one or more queues (channels) it is listening and sending to. The size of a queue can be set to 0, which equals synchronous data transfer (actually still non-blocking, as a goroutine != a thread).

Shared State

As goroutines might run in parallel on different cores if the runtime scheduler decides so, there might be multithreaded access to shared data structures (if two goroutines are scheduled on different threads).
As far as I understand, in those cases manual synchronization is required. I'd see this as a disadvantage compared to the actor model, where it is very clear whether private or shared data gets accessed. Additionally, the concept of actor-local data matches current CPU designs, as each core has a non-shared cache.
The actor pattern enforces thread-data ownership by design. Astonishingly, I did not find something along the lines of a Go memory model comparable to the JMM, which is required when access to shared data from multiple threads is possible.

See also Concurrency is not parallelism



Dart and Node.js have a shared-nothing architecture

No data can be shared amongst processes/isolates. In order to share data, messaging must be used (=data has to be encoded and copied). That's probably a contributing factor to the rise of NoSQL storage like Redis or MongoDB. They are used both for persistence and to mimic a shared 'heap'. In order to share complex data structures, some kind of serialization has to be used. Frequently simple solutions, such as transmitting primitive types only or JSON, are used, which adds quite some overhead to data sharing.

Other advantages of Shared Nothing
  • single threaded garbage collection
  • single threaded VM + JIT
  • No memory model like JMM required


Performance constraints

While single-threaded concurrency has many advantages in IO-centric, middleware-layer applications, it is a disadvantage when it comes to situations where several cores are used to speed up processing. E.g. it is not possible to implement something like the Disruptor without shared memory and access to native threading/volatile/barriers.

Why single threaded concurrency is easier than multithreaded concurrency (true parallelism)

Actor/CSP-based concurrency is still challenging at times; however, one does not have to deal with the issues arising from the memory visibility rules of modern CPU architectures (see JMM). So a synchronous sequence of statements inside an actor can do a simple HashMap put or increment a counter without even thinking about synchronization or concurrent modification.


Scheduling

As stated, actor-based (and asynchronous) programming splits up execution into smaller pieces of ordered statement sequences. Go does a similar thing with its goroutines.
A challenge for runtime-system implementations is the scheduling of these ordered statement sequences ("threadlets") onto "real" OS threads.

The V8 VM (Node.js) schedules a single event loop (=mailbox) + some helper threads to "outsource" blocking operations. So the user application is run on a single thread.

The Dart VM provides a more refined system, having a "micro event loop" and an external event loop. Click here for more details .

Akka has a pluggable scheduling strategy: thread-pool based, fork-join (applies work stealing), and pinned (each actor has its own thread). The queue implementation of the mailbox can be chosen, however there is no concept of several queues. Given that the "dual queues" are just an implementation of a "message priority" concept, it is clear that similar concepts can be realized with Akka.

In my pet library 'kontraktor', I use a dual-queue approach similar to Dart (by chance). The programmer defines the mapping of actors to native (dispatcher) threads. This gives the best results regarding cache locality if done right. A downside is that there is no dynamic adaptation in case an application has varying load patterns.

A pure work-stealing algorithm might perform well in micro benchmarks; in real applications the penalty of cache misses when scheduling an actor on a different thread could be much higher than micro benchmark results suggest.

In order to implement a self-optimizing dispatcher, sophisticated runtime analysis is required (e.g. counting cache misses of 'threadlets', tracking communication patterns amongst different actors), as outlined here. AFAIK no VM has actually implemented such sophisticated algorithms; there is ongoing research in the Go community.

Further reading on GO's scheduler.


Do we all have to learn Scala, Go or Dart now ?

As Java has all the low-level concurrency tools at hand, mostly a change in habit and mindset is required. A lot of Java APIs and components feature synchronous, blocking interfaces. Avoid them; they will become problematic in the uprising world of globally distributed systems.

In fact, JVM-based languages are at an advantage, because they have the freedom to choose or even mix different concurrency models.
One can implement actors and futures (non-blocking ones, not the blocking version like java.util.concurrent.Future) using the JVM's concurrency primitives. With the introduction of the shorthand syntax for anonymous classes (some call them lambdas), asynchronous code does not look as messy as with prior versions of Java.

If I were a student, I'd go with Scala. This will also improve your general programming skills a lot and bring in a lot of interesting ideas and design patterns.
If you are in the middle of a Java career and already heavily invested, it's probably better to port/copy successful design patterns found in other, newer languages.

A groundbreaking improvement to Java's concurrency toolset would require VM and language improvements:

  • There has to be a way for the programmer to express concurrent/asynchronous execution that differs from expressing parallel execution, so the JVM gets a chance to optimize the execution of concurrent tasks. Currently, both concurrency and parallelism are expressed using threads.
  • Native support for continuations would be required (probably a very hard requirement, especially regarding HotSpot). There are some implementations using pure bytecode weaving out there (e.g. WebFlow) which unfortunately have pretty high performance costs.

Anyway, with recent language improvements (Java 8), the callback/future style is much more manageable than before.
Another solution might come from hardware/OS: if threads could be realized in a much more lightweight way (+ faster context switches), software-level concurrency by multiplexing might not be required anymore.


Real world experience

We have transformed two processes of a very IO-intensive clustered application from traditional multithreading to an actor-based approach. Both of them performed an order of magnitude faster thereafter. Additionally, the code base got much more maintainable, as the amount of shared data (data being accessed multithreaded) was reduced dramatically. The number of threads declined from >300 to 2 (+ some pooled helper threads).

What we changed:
  • We did not use threads to model concurrency. Concurrency != parallelism.
  • We used non-blocking IO.
  • We used an actor approach (kontraktor). This way an ownership relation is created between an execution entity (actor) and its data structures.
  • We respected the short version of the "reactive manifesto":


                                                     Never block





Tuesday, January 28, 2014

Comparison of different concurrency models: Actors, CSP, Disruptor and Threads

In order to make use of modern multi-core/multi-socket hardware, Java offers threads and locks for concurrent programming. It is well known that this model suffers from a number of problems:
  • Deadlocks
  • Bad scaling due to "over-synchronization" or contention on common resources
  • Errors caused by invalid synchronization are timing dependent and hard to reproduce. It is common for them to appear only under special conditions (load, hardware) in production.
  • As a software project grows, it gets hard to safely modify the application. A programmer needs global awareness of an application's control flow and data access patterns in order to change the software without introducing errors.
  • The subtleties of the Java Memory Model are not well known to most Java programmers. Even if the JMM is known, incorrect programming isn't obvious and fails only rarely. It can be a nightmare to spot such errors from the production logs of large applications.
So I am looking for alternative models to express concurrency. Therefore a simple benchmark of the most popular "alternative" concurrency models is done: Actors, CSP and Disruptor.



Actors


Real Life Analogy: People cooperating by sending mails to each other.

An actor is like an object instance executed by a single thread. Instead of direct calls to methods, messages are put into the actor's "mailbox" (~queue). The actor reads and processes the messages from the queue sequentially, single threaded (with exceptions).
Internal state is exposed/shared by passing messages (with copied state) to other actors.







CSP (Communicating Sequential Processes)


Real Life Analogy: People phoning each other.

Though a different terminology is used, CSP systems can be seen as a special actor system having bounded mailboxes (queues) of size 0.

So if one process (~actor) wants to pass a message to another process, the caller is blocked until the receiver accepts the message. As an alternative to being blocked, a CSP process can choose to do other things, e.g. check for incoming messages (this introduces non-determinism into the order of outgoing messages). A receiver cannot accept incoming messages while it is occupied with processing a prior one.





Disruptor


The Disruptor data structure came up some years ago; it was invented and pioneered by Martin Thompson, Mike Barker, and Dave Farley at the LMAX exchange.

Real Life Analogy: Assembly line




The disruptor is a bounded queue (implemented by a ring buffer) where producers add to the head of the queue (if slots are available, else the producer is blocked).
Consumers access the queue at different points, so each consumer has its own read position (cursor) inside the queue. Sequencing is used to manage queue access and processing order.
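A toy sketch of that structure, stripped of all the clever parts (no padding, no wait strategies, single producer and single consumer only; the real Disruptor is far more refined):

// Toy single-producer / single-consumer ring buffer illustrating the head + cursor idea.
public class ToyRingBuffer {
    private final long[] slots;
    private final int mask;
    private volatile long head = 0;      // next slot the producer will write
    private volatile long cursor = -1;   // last slot published and visible to the consumer
    private volatile long readPos = -1;  // consumer's own read position

    public ToyRingBuffer(int sizePowerOfTwo) {
        slots = new long[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    public void publish(long value) {
        long seq = head;                                  // single producer: no CAS needed
        while (seq - readPos > slots.length) { }          // buffer full -> producer waits (busy spin)
        slots[(int) (seq & mask)] = value;
        head = seq + 1;
        cursor = seq;                                     // make the slot visible to the consumer
    }

    public long consume() {
        long next = readPos + 1;
        while (cursor < next) { }                         // nothing published yet -> consumer waits
        long v = slots[(int) (next & mask)];
        readPos = next;                                   // hand the slot back to the producer
        return v;
    }
}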



Thread + locking of shared data


Ok, everybody knows this. It's Java's default way to model concurrent execution. Actually these are the primitives (+CAS) used to build the higher-level concurrency models described above.


Conclusion


While the CSP/actor model realizes inter-thread communication by passing/copying thread-local data to some queue or process, the Disruptor keeps data in place and ensures only one thread (consumer or producer) is the owner of a data item (=queue slot) at a time.
This model seems to fit existing CPU, memory and VM architectures better, resulting in high throughput and superior performance. It avoids high allocation rates and reduces the probability of cache misses. Additionally, it implicitly balances the processing speed of interdependent consumers without queues growing somewhere.
There is no silver bullet for concurrent programming; one still has to plan carefully and needs to know what's going on.

While I am writing this, I realize each of these patterns has its set of best-fit problem domains, so my initial intention of using a single benchmark to compare the performance of different concurrency models might be somewhat off :-).


 

The Benchmark Application


 

Update: Jason Koch implemented a custom executor performing significantly better than the stock JDK executors. See his blog post.

The benchmark approximates the value of PI concurrently. N jobs are created, and the result of each job must be added up to finally get an approximation of PI. For maximum performance one would create one large job per core of the CPU used. With that setup all solutions would perform roughly the same, as there is no significant concurrent access and scheduling overhead.

However, if we compute PI by creating 1 million jobs, each computing a 100-iteration slice of PI (see the sketch below the list), we get an impression of:
  • the cost of accessing shared data when the result of each job is added to one single result value
  • the overhead created by the concurrency control mechanics (e.g. sending messages / scheduling Runnables to a thread pool, CAS, locks, volatile r/w ..).
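For reference, a single "slice job" along those lines can be as small as the following (presumably a Leibniz-series slice, as in Akka's Pi tutorial; the sequential loop in main is just to show how the slices add up, names are mine):

public class PiSlice {
    // one nrOfElements-iteration slice of the Leibniz series for PI
    static double calculatePiFor(int start, int nrOfElements) {
        double acc = 0.0;
        for (int i = start; i < start + nrOfElements; i++) {
            acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
        }
        return acc;
    }

    public static void main(String[] args) {
        double pi = 0;
        for (int job = 0; job < 1_000_000; job++) {   // sequential reference run: 1 million jobs of 100 iterations
            pi += calculatePiFor(job * 100, 100);     // in the benchmark this add hits a shared result value
        }
        System.out.println(pi);
    }
}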
The test is run with 2 configurations
  • 100 iterations per pi-slice-job, 1,000,000 jobs
  • 1,000 iterations per pi-slice-job, 100,000 jobs
As hardware I use
  • AMD Opteron 6274, dual socket, 2.2 GHz. Each socket has 8 cores + 8 hardware threads (so overall 16 cores + 16 HW threads)
  • Intel XEON@3GHz, dual socket, six cores each (12 cores, hyper-threading turned off). (2011 release)
I never use more threads than there are "real" cores.



Stop the blabber, gimme results ! 


See bottom of this post for the benchmark source.
  • Threads - a somewhat naive thread-based implementation of the Pi computation. With enough effort invested it is possible to match any of the other results, of course. At the core of the VM, threads and locks (+ atomic, volatile r/w + CAS) are the only concurrency primitives. However, there is no point in creating an ad-hoc copy of the Disruptor or an actor system in order to compare concurrency approaches.
  • Akka - a popular actor implementation on the VM. The benchmark has been reviewed and revised (especially the ActorSystem configuration can make a big difference) by the Akka crew. Threads are scheduled using Java 7's fork-join pool. Actually the Pi computation is one of Akka's tutorial examples.
  • Abstraktor - my experimental Actor/CSP implementation. It uses short bounded queues (so it leans more to the CSP side) and avoids deadlocks by maintaining 2 queues per actor (in and out). If the out-queue is blocked, it just reads from the in-queue.
    I am using Nitsan Wakart's excellent MPSC queue implementation (check his blog or github jaq-in-a-box), and that's the major reason it shows kind of competitive performance+scaling.
    I use this to get a rough baseline for comparison and to experiment with different flavours of Actors/CSP. Probably the only thing one can do with it is to run the Pi bench ;-).
    Update: The experimental version benchmarked here has been consolidated + improved. You can find it at https://github.com/RuedigerMoeller/kontraktor
  • Disruptor - my naive approach to implementing the benchmark based on the Disruptor 3.2. It turned out that I used a not-yet-polished utility class; however, I keep this benchmark just to illustrate how smallish differences in implementation may have big consequences.
  • Disruptor2 - as Michael Barker would have implemented it (thanks :-) ). It's actually more than twice as fast in the 1 million test as the closest runner-up.
Intel (Xeon 2 socket each 6 cores, Hyperthreading off)

Well, it seems that with 100k jobs of 1,000 iterations each, the benchmark is dominated by computation, not concurrency control. Therefore I retry with 1 million jobs, each computing a 100-iteration slice.

Ok, we probably can see differences here :-)



AMD Opteron 6274 Dual Socket 2,2 Ghz (=16 real cores, 16 HW Threads)


Again the 1 million tiny-job variant spreads the difference amongst the approaches (and their implementation):


Note there is like 5% run to run jitter (GC and stuff), however that does not change the big picture.

Last but not least: Best results per CPU architecture per lib:




Discussion of Results


We see that scaling behaviour can be significantly different depending on the hardware platform used.

Akka loses a lot of CPU time doing GC and allocation. If I modify Abstraktor to use an unbounded ConcurrentLinkedQueue (Akka uses those), it performs similarly to Akka and builds up temporary queues of >100k elements. This is an inherent issue of the actor model. By leaning towards CSP (using very short bounded blocking queues with size <100), performance also suffers, as threads are blocked/spinning for input too often. However, with a queue size of 5000 elements, things work out pretty well (introducing other problems, like deadlocks in case of cyclic actor graphs).
The Disruptor library is very well implemented, so a good part of the performance advantage can be attributed to the excellent quality of the implementation.

Cite from the Disruptor documentation regarding queuing:
3.5 The Problems of Queues
[...]  If an in-memory queue is allowed to be unbounded then for many classes of problem it can grow unchecked until it reaches the point of catastrophic failure by exhausting memory. This happens when producers outpace the consumers. Unbounded queues can be useful in systems where the producers are guaranteed not to outpace the consumers and memory is a precious resource, but there is always a risk if this assumption doesn’t hold and queue grows without limit. [...]
When in use, queues are typically always close to full or close to empty due to the differences in pace between consumers and producers. They very rarely operate in a balanced middle ground where the rate of production and consumption is evenly matched. [...]
Couldn't have said it better myself =).



Conclusion


Although the Disruptor worked best for this example, I think looking for "the one concurrency model to go for" is wrong. If we look at the real world, we see all 4 patterns used, depending on the use case.
So a broad concurrency library would ideally integrate the assembly-line pattern (~Disruptor), queued messaging (~Actors) and unqueued communication (~CSP).



Benchmark source


AKKA




Multi Threading




Abstraktor




Disruptor naive




Disruptor optimized