Monday, December 22, 2014

A persistent KeyValue Server in 40 lines and a sad fact

This post originally was submitted to the Java Advent Calendar and is licensed under the Creative Commons 3.0 Attribution license. If you like it, please spread the word by sharing, tweeting, FB, G+ and so on!

It also has been published on voxxed.com .

Picking up on Peter's well-written overview of the uses of Unsafe, I'll take a short fly-by on how low-level techniques in Java can save development effort by enabling a higher level of abstraction, or allow for Java performance levels probably unknown to many.

My major point is to show that the conversion of objects to bytes and vice versa is an important fundamental, affecting virtually any modern Java application.

Hardware likes to process streams of bytes, not object graphs connected by pointers, because "all memory is tape" (M. Thompson, if I remember correctly ..).

Many basic technologies are therefore hard to use with vanilla Java heap objects:
  • Memory Mapped Files - a great and simple technology to persist application data safely, quickly and easily.
  • Network communication is based on sending packets of bytes
  • Interprocess communication (shared memory)
  • Large main memory of today's servers (64GB to 256GB). (GC issues)
  • CPU caches work best on data stored as a contiguous stream of bytes in memory
So the use of the Unsafe class in most cases boils down to helping transform a Java object graph into a contiguous memory region and vice versa, either using
  • [performance enhanced] object serialization or
  • wrapper classes to ease access to data stored in a continuous memory region.
(Code & examples of this post can be found here)

    Serialization based Off-Heap

    Consider a retail web application where there might be millions of registered users. We are actually not interested in representing the data in a relational database, as all that's needed is a quick retrieval of user-related data once a user logs in. Additionally, one would like to traverse the social graph quickly.

    Let's take a simple user class holding some attributes and a list of 'friends' making up a social graph.
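A minimal version of such a class might look like the following sketch (field names are illustrative; the post's actual source may differ):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// A simple user record; Serializable so it can be pushed off-heap or over the wire.
class User implements Serializable {
    String userName;
    String email;
    int age;
    // user names of friends => edges of the social graph
    List<String> friends = new ArrayList<>();

    User(String userName, String email, int age) {
        this.userName = userName;
        this.email = email;
        this.age = age;
    }
}
```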


    The easiest way to store this on-heap is a simple huge HashMap.
    Alternatively, one can use off-heap maps to store large amounts of data. An off-heap map stores its keys and values inside the native heap, so garbage collection does not need to track this memory. In addition, the native heap can be told to automagically get synchronized to disk (memory mapped files). This even works in case your application crashes, as the OS manages the write-back of changed memory regions.

    There are some open source off-heap map implementations out there with various feature sets (e.g. ChronicleMap); for this example I'll use a plain and simple implementation featuring fast iteration (optional full scan search) and ease of use.

    Serialization is used to store objects, deserialization is used to pull them back onto the Java heap. Pleasantly, I have written the (afaik) fastest fully JDK-compliant object serialization on the planet, so I'll make use of that.
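The underlying mechanism is just a round trip through a flat byte[]. Sketched here with plain JDK serialization for illustration (fast-serialization exposes a similar byte[]-oriented API, but the calls below are standard JDK, not FST):

```java
import java.io.*;
import java.util.Arrays;

class SerDemo {
    // Flatten any serializable object graph into a byte[] ...
    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream oout = new ObjectOutputStream(bout)) {
            oout.writeObject(o);
        }
        return bout.toByteArray();
    }

    // ... and reconstruct it back onto the Java heap.
    static Object fromBytes(byte[] b) throws IOException, ClassNotFoundException {
        try (ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(b))) {
            return oin.readObject();
        }
    }
}
```

The resulting byte[] is what actually lands in the memory-mapped region; the off-heap map then only has to manage offsets and lengths of such flat records.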


     Done:
    • persistence by memory mapping a file (map will reload upon creation). 
    • Java Heap still empty to serve real application processing with Full GC < 100ms. 
    • Significantly less overall memory consumption. A user record serialized is ~60 bytes, so in theory 300 million records fit into 180GB of server memory. No need to raise the big data flag and run 4096 hadoop nodes on AWS ;).

    Comparing a regular in-memory Java HashMap and a fast-serialization based persistent off-heap map holding 15 million user records shows the following results (on an older 3GHz 2x6-core XEON):

                                         consumed Java   Full GC   Native Heap   get/put      required VM
                                         Heap (MB)       (s)       (MB)          ops per s    size (MB)
    HashMap                              6,865           26.039    0             3,800,000    12,000
    OffheapMap (serialization based)     63              0.026     3,050         750,000      500

    [test source / blog project] Note: You'll need at least 16GB of RAM to execute them.

    As one can see, even with fast serialization there is a heavy penalty (~factor 5) in access performance. Anyway: compared to other persistence alternatives, it's still superior (1-3 microseconds per "get" operation, "put()" very similar).

    Use of JDK serialization would perform at least 5 to 10 times slower (direct comparison below) and therefore render this approach useless.


    Trading performance gains against higher level of abstraction: "Serverize me"


    A single server won't be able to serve (hundreds of) thousands of users, so we somehow need to share data amongst processes, even better: across machines.

    Using a fast implementation, it's possible to generously use (fast-) serialization for over-the-network messaging. Again: if this ran 5 to 10 times slower, it just wouldn't be viable. Alternative approaches require an order of magnitude more work to achieve similar results.

    By wrapping the persistent off-heap hash map with an Actor implementation (async ftw!), a few lines of code make up a persistent KeyValue server with a TCP-based and an HTTP interface (uses kontraktor actors). Of course the Actor can still be used in-process if one decides so later on.


    Now that's a micro service. Given that it lacks any attempt at optimization and is single-threaded, it's reasonably fast [same XEON machine as above]:
    • 280_000 successful remote lookups per second 
    • 800_000 in case of failed lookups (key not found)
    • serialization based TCP interface (1 liner)
    • a stringy webservice for the REST-of-us (1 liner).
    [source: KVServer, KVClient] Note: You'll need at least 16GB of RAM to execute the test.

    A real-world implementation might want to double performance by directly putting the received serialized object byte[] into the map instead of encoding it twice (encode/decode once for transmission over the wire, then decode/encode again for the off-heap map).

    "RestActorServer.Publish(..);" is a one-liner to also expose the KVActor as a webservice in addition to raw TCP:




    C-like performance using flyweight wrappers / structs

    With serialization, regular Java objects are transformed into a byte sequence. One can do the opposite: create wrapper classes which read data from fixed or computed positions of an underlying byte array or native memory address. (E.g. see this blog post).

    By moving the base pointer it's possible to access different records by just moving the wrapper's offset. Copying such a "packed object" boils down to a memory copy. In addition, it's pretty easy to write allocation-free code this way. One downside is that reading/writing single fields has a performance penalty compared to regular Java objects. This can be made up for by using the Unsafe class.
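A hand-written flyweight over a plain buffer might look like this sketch (using ByteBuffer for safety; Unsafe would remove bounds checks; the record layout here is invented for illustration):

```java
import java.nio.ByteBuffer;

// Flyweight view onto fixed-size records stored back-to-back in one buffer.
// Illustrative layout per record: int id at offset 0, double price at offset 4.
class PriceRecordView {
    static final int RECORD_SIZE = 12;
    private final ByteBuffer buf;
    private int base; // byte offset of the record currently viewed

    PriceRecordView(ByteBuffer buf) { this.buf = buf; }

    // re-point the same wrapper instance at record n - no allocation involved
    PriceRecordView moveTo(int n) { base = n * RECORD_SIZE; return this; }

    int getId()             { return buf.getInt(base); }
    void setId(int id)      { buf.putInt(base, id); }
    double getPrice()       { return buf.getDouble(base + 4); }
    void setPrice(double p) { buf.putDouble(base + 4, p); }
}
```

One wrapper instance can iterate millions of records allocation-free; copying a record is a plain memory copy of RECORD_SIZE bytes.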

    "Flyweight" wrapper classes can be implemented manually as shown in the blog post cited, however as code grows this starts getting unmaintainable.
    Fast-serialization provides a byproduct, "struct emulation", supporting the creation of flyweight wrapper classes from regular Java classes at runtime. Low-level byte fiddling in application code can be avoided for the most part this way.



    How a regular Java class can be mapped to flat memory (fst-structs):


    Of course there are simpler tools out there to help reduce manual programming of encoding (e.g. Slab) which might be more appropriate for many cases and use less "magic".

    What kind of performance can be expected using the different approaches (sad fact incoming)?

    Let's take the following struct-class consisting of a price update and an embedded struct denoting a tradable instrument (e.g. a stock) and encode it using various methods:

    a 'struct' in code

    Pure encoding performance:

                        Structs       fast-Ser (no shared refs)   fast-Ser     JDK Ser (no shared)   JDK Ser
    encodings per s     26,315,000    7,757,000                   5,102,000    649,000               644,000




    Real world test with messaging throughput:

    In order to get a basic estimation of differences in a real application, I ran an experiment on how the different encodings perform when used to send and receive messages at a high rate via reliable UDP messaging:

    The Test:
    A sender encodes messages as fast as possible and publishes them using reliable multicast, a subscriber receives and decodes them.

                        Structs      fast-Ser (no shared refs)   fast-Ser     JDK Ser (no shared)   JDK Ser
    messages per s      6,644,107    4,385,118                   3,615,584    81,582                79,073

    (Tests done on I7/Win8, XEON/Linux scores slightly higher, msg size ~70 bytes for structs, ~60 bytes serialization).

    Slowest compared to fastest: a factor of 82. The test highlights an issue not covered by micro-benchmarking: encoding and decoding should perform similarly, as actual throughput is determined by Min(encoding performance, decoding performance). For unknown reasons JDK serialization manages to encode the message tested some 500_000 times per second, but decoding performance is only 80_000 per second, so in the test the receiver gets dropped quickly:

    "
    ...
    ***** Stats for receive rate:   80351   per second *********
    ***** Stats for receive rate:   78769   per second *********
    SUB-ud4q has been dropped by PUB-9afs on service 1
    fatal, could not keep up. exiting
    "
    (Creating backpressure here probably isn't the right way to address the issue ;-)  )

    Conclusion:
    • fast serialization allows for a level of abstraction in distributed applications that is impossible if the serialization implementation is either
      - too slow,
      - incomplete (e.g. cannot handle any serializable object graph), or
      - in need of manual coding/adaptions (which would put many restrictions on actor message types, Futures, Spores; a maintenance nightmare)
    • Low Level utilities like Unsafe enable different representations of data resulting in extraordinary throughput or guaranteed latency boundaries (allocation free main path) for particular workloads. These are impossible to achieve by a large margin with JDK's public tool set.
    • In distributed systems, communication performance is of fundamental importance. Removing Unsafe is not the biggest fish to fry looking at the numbers above .. JSON or XML won't fix this ;-).
    • While the HotSpot VM has reached an extraordinary level of performance and reliability, CPU is wasted in some parts of the JDK like there's no tomorrow. Given we are living in the age of distributed applications and data, moving stuff over the wire should be easy to achieve (not manually coded) and as fast as possible. 

    Addendum: bounded latency

    A quick ping-pong RTT latency benchmark showing that Java can compete with C solutions easily, as long as the main path is allocation-free and techniques like those described above are employed:



    [credits: charts+measurement done with HdrHistogram]

    This is an "experiment" rather than a benchmark (so do not read: 'Proven: Java faster than C'); it shows that low-level Java can compete with C in at least this low-level domain.
    Of course it's not exactly idiomatic Java code, however it's still easier to handle, port and maintain compared to a JNI or pure C(++) solution. Low-latency C(++) code won't be that idiomatic either ;-)

    About me: I am a solution architect freelancing at an exchange company in the area of realtime GUIs, middleware, and low latency CEP (Complex Event Processing) nightly hacking at https://github.com/RuedigerMoeller.


    Tuesday, October 28, 2014

    The Internet is running in debug mode

    With the rise of the Web, textual encodings like xml/JSON have become very popular. Of course textual message encoding has many advantages from a developer's perspective. What most developers are not aware of, is how expensive encoding and decoding of textual messages really is compared to a well defined binary protocol.

    It's common to define a system's behaviour by its protocol. Actually, a protocol mixes up two distinct aspects of communication, namely:

    • Encoding of messages
    • Semantics and behavior (request/response, signals, state transition of communication parties ..)

    Frequently (not always), these two very distinct aspects are mixed up without need. So we are forced to run the whole internet in "debug mode", as 99% of webservice and webapp communication is done using textual protocols.

    The overhead in CPU consumption compared to a well defined binary encoding is a factor of ~3-5 (JSON) up to >10-20 (XML). The unnecessary waste of bandwidth also adds to that greatly (yes, you can zip, but this in turn will waste even more CPU).
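The size difference alone is easy to demonstrate with pure JDK means. Here a toy price-update message is encoded once as JSON text and once as raw binary fields (the message and its fields are invented for illustration; CPU cost of parsing the text is the even bigger factor):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class EncodingSize {
    // the message as JSON text
    static int jsonSize() {
        String json = "{\"symbol\":\"GOOG\",\"price\":523.12,\"qty\":1200}";
        return json.getBytes(StandardCharsets.UTF_8).length; // 43 bytes
    }

    // the same fields as a raw binary record
    static int binarySize() throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bout);
        out.writeUTF("GOOG");    // 2 length bytes + 4 chars
        out.writeDouble(523.12); // 8 bytes
        out.writeInt(1200);      // 4 bytes
        return bout.size();      // 18 bytes
    }
}
```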

    I haven't calculated the numbers, but this is environmental pollution at a big scale. Unnecessary CPU consumption to this extent wastes a lot of energy (global warming, anyone?).

    Solution is easy:
    • Standardize on some simple encodings (pure binary, self describing binary ("binary json"), textual)
    • Define the behavioral part of a protocol (exclude encoding)
    • use textual encoding during development, use binary in production.
    Man, we could save a lot of webserver hardware if only HTTP headers could be binary encoded ..

    Friday, October 17, 2014

    Follow up: Executors and Cache Locality Experiment

    Thanks to Jean Philippe Bempel, who challenged my results (for a reason), I discovered an issue in the last post: code completion let me accidentally choose Executors.newSingleThreadScheduledExecutor() instead of Executors.newSingleThreadExecutor(), so the pinned-to-thread-actor results are actually even better than reported previously. The big picture has not changed that much, but it's still worth reporting.

    On a second note: There are many other aspects to concurrent scheduling such as queue implementations etc.. Especially if there is no "beef" inside the message processing, these differences become more dominant compared to cache misses, but this is another problem that has been covered extensively by other people in depth (e.g. Nitsan Wakart).

    The focus of this experiment is locality/cache misses; keep in mind that the different queueing implementations of the executors for sure add dirt/bias.

    As requested, I add results from the Linux "perf" tool to prove there are significant differences in cache misses caused by the random thread-to-actor assignment done by ThreadPoolExecutor and WorkStealingExecutor.

    Check out my recent post for a description of the test case.

    Results with adjusted SingleThreadExecutor (XEON 2 socket, each 6 cores, no HT)


    As in the previous post, "Dedicated" (actor-pinned-to-thread) performs best. For very small local state there are only few cache misses, so differences are small, but they widen once a bigger chunk of memory is accessed by each actor. Note that ThreadPool is hampered by its internal scheduling/queuing mechanics; regardless of locality, it performs poorly.


    When increasing the number of actors to 8000 (so 1000 actors per thread), "WorkStealing" and "Dedicated" perform similarly. Reason: executing 8000 actors round-robin creates cache misses for both executors. Note that in a real-world server it's likely that there are active and inactive actors, so I'd expect "Dedicated" to perform slightly better than in this synthetic test.

    "perf stat -e" and "perf stat -cs" results

    (only 2000, 4000, 8000 local size tests where run)

    Dedicated/actor-pinned-to-thread:
    333,669,424 cache-misses                                                
    19.996366007 seconds time elapsed
    185,440 context-switches                                            
    20.230098005 seconds time elapsed
    => 9,300 context switches per second

    workstealing:
    2,524,777,488 cache-misses                                                
    39.610565607 seconds time elapsed
    381,385 context-switches                                            
    39.831169694 seconds time elapsed
    => 9,500 context switches per second

    fixedthreadpool:
    3,213,889,492 cache-misses                                                
    92.141264115 seconds time elapsed
    25,387,972 context-switches                                            
    87.547306379 seconds time elapsed
    =>290,000 context switches per second



    A quick test with a more realistic test method

    In order to get a more realistic impression, I replaced the synthetic int-iteration by some dirty "real world" dummy stuff (do some allocation and HashMap put/get). Instead of increasing the size of the "localstate" int array, I increase the HashMap size (which should also have a negative impact on locality).


    Note that this is rather short processing, so queue implementations and executor internal implementation might dominate locality here. This test is run on Opteron 8c16t * 2Sockets, a processor with 8kb L1 cache size only. (BTW: impl is extra dirty, so no performance optimization comments pls, thx)


    As ThreadPoolExecutor is abnormally bad in this test/processor combination, the plain numbers:

    (columns = HashMap entries)
                      64     256    2000    4000     32k    320k
    WorkStealing    1070    1071    1097    1129    1238    1284
    Dedicated        656     646     661     649     721     798
    ThreadPool      8314    8751    9412    9492   10269   10602

    Conclusions basically stay the same as in the original post. Remember cache misses are only one factor of overall runtime performance, so there are workloads where results might look different. Quality/specialization of the queue implementation will have a huge impact in case processing consists of only a few lines of code.

    Finally, my result: 
    Pinning actors to threads created lowest cache miss rates in any case tested.




    Tuesday, October 14, 2014

    Experiment: Cache effects when scheduling Actors with F/J, Threadpool, Dedicated Threads

    Update: I accidentally used newSingleThreadScheduledExecutor instead of newFixedThreadPool(1) for the "Dedicated" test case [ide code completion ..]. With this corrected, "Dedicated" outperforms even more. See follow up post for updated results + "perf" tool cache miss measurement results (do not really change the big picture).

    The experiment in my last post had a serious flaw: in an actor system, operations on a single actor are executed one after the other. However, by naively adding message-processing jobs to executors, private actor state was accessed concurrently, leading to "false sharing" and cache coherency related costs, especially for small local state sizes.

    Therefore I modified the test. For each Actor scheduled, the next message-processing is scheduled once the previous one finished, so the experiment resembles the behaviour of typical actors (or lightweight processes/tasks/fibers) correctly without concurrent access to a memory region.

    Experiment roundup:

    Several million messages are scheduled to several "Actor" simulating classes. Message processing is simulated by reading and writing the private, actor-local state in random order. There are more Actors (24-8000) than threads (6-8). Note that results established (if any) will also hold true for other light-weight concurrency schemes like go-routines, fibers, tasks ...

    The test is done with

    • ThreadPoolExecutor
    • WorkStealingExecutor
    • Dedicated Thread (Each Actor has a fixed assignment to a worker thread)

    Simulating an Actor accessing local state:
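A reconstruction of the idea (not the benchmark's exact code): message processing reads and writes random positions of an actor-private array, so the cache hit rate depends on where the actor last ran.

```java
import java.util.Random;

// One simulated actor: "processing a message" touches private local state
// at random offsets, mimicking cache effects of real actor workloads.
class SimActor {
    final int[] localState;
    final Random rnd = new Random(1);
    long sum; // consumed result, prevents the JIT from eliminating the reads

    SimActor(int stateBytes) { localState = new int[stateBytes / 4]; }

    void processMessage(int memAccesses) {
        for (int i = 0; i < memAccesses; i++) {
            int idx = rnd.nextInt(localState.length);
            localState[idx]++;        // write to local state
            sum += localState[idx];   // read it back
        }
    }
}
```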


    Full Source of Benchmark

     Suspicion:
    As ThreadPoolExecutor and WorkStealingExecutor schedule each message on a random thread, they will produce more cache misses compared to pinning each actor onto a fixed thread. The speculation is that work stealing cannot make up for the costs provoked by cache misses.


    (Some) Variables:
    • Number of worker threads
    • Number of actors
    • Amount of work per message
    • Locality / Size of private unshared actor state


    8 Threads 24 actors 100 memory accesses (per msg)


    Interpretation:

    For this particular load, fixed-assigned threads outperform executors. Note: the larger the local state of an actor, the higher the probability of a prefetch fail => cache miss. In this scenario my suspicion holds true: work stealing cannot make up for the amount of cache misses. Fixed-assigned threads profit because it's likely that some state of a previously processed message still resides in cache once a new message is processed on an actor.
    It's remarkable how badly ThreadPoolExecutor performs in this experiment.

    This is a scenario typical for a backend-type service: there are few actors with high load. When running a front-end server with many clients, there are probably more actors, as typically there is one actor per client session. Therefore let's push up the number of actors to 8000:

    8 Threads 8000 actors 100 memory accesses (per msg)



    Interpretation:

    With this number of actors, all execution schemes suffer from cache misses, as the accumulated state of 8000 actors is too big to fit into L1 cache. Therefore the cache advantage of fixed-assigned threads ('Dedicated') does not make up for the lack of work stealing. The Work-Stealing Executor outperforms any other execution scheme if a large amount of state is involved.
    This is a somewhat unrealistic scenario, as in a real server application client requests probably do not arrive "round robin"; some clients are more active than others. So in practice I'd expect "Dedicated" to at least have some advantage of higher cache hit rates. Anyway: when serving many (stateful) clients, WorkStealing could be expected to outperform.

    Just to get a third variant: same test with 240 actors:


    These results complete the picture: with fewer actors, cache effects supersede work stealing. The higher the number of actors, the higher the number of cache misses gets, so work stealing starts outperforming dedicated threads.


    Modifying other variables

    Number of memory accesses

    If message processing does few memory accesses, work stealing improves compared to the other two. Reason: fewer memory accesses mean fewer cache misses, which means work stealing gets more significant in the overall result.

     ************** Worker Threads:8 actors:24 #mem accesses: 20
    local state bytes: 64 WorkStealing avg:505
    local state bytes: 64 ThreadPool avg:2001
    local state bytes: 64 Dedicated avg:557
    local state bytes: 256 WorkStealing avg:471
    local state bytes: 256 ThreadPool avg:1996
    local state bytes: 256 Dedicated avg:561
    local state bytes: 2000 WorkStealing avg:589
    local state bytes: 2000 ThreadPool avg:2109
    local state bytes: 2000 Dedicated avg:600
    local state bytes: 4000 WorkStealing avg:625
    local state bytes: 4000 ThreadPool avg:2096
    local state bytes: 4000 Dedicated avg:600
    local state bytes: 32000 WorkStealing avg:687
    local state bytes: 32000 ThreadPool avg:2328
    local state bytes: 32000 Dedicated avg:640
    local state bytes: 320000 WorkStealing avg:667
    local state bytes: 320000 ThreadPool avg:3070
    local state bytes: 320000 Dedicated avg:738
    local state bytes: 3200000 WorkStealing avg:1341
    local state bytes: 3200000 ThreadPool avg:3997
    local state bytes: 3200000 Dedicated avg:1428


    Fewer worker threads

    Fewer worker threads (e.g. 6) increase the probability of an actor message being scheduled to the "right" thread "by accident", so the cache miss penalty is lower, which lets work stealing perform better than "Dedicated" (the fewer threads used, the lower the cache advantage of fixed-assigned "Dedicated" threads). Vice versa: if the number of cores involved increases, fixed thread assignment gets ahead.

    Worker Threads:6 actors:18 #mem accesses: 100
    local state bytes: 64 WorkStealing avg:2073
    local state bytes: 64 ThreadPool avg:2498
    local state bytes: 64 Dedicated avg:2045
    local state bytes: 256 WorkStealing avg:1735
    local state bytes: 256 ThreadPool avg:2272
    local state bytes: 256 Dedicated avg:1815
    local state bytes: 2000 WorkStealing avg:2052
    local state bytes: 2000 ThreadPool avg:2412
    local state bytes: 2000 Dedicated avg:2048
    local state bytes: 4000 WorkStealing avg:2183
    local state bytes: 4000 ThreadPool avg:2373
    local state bytes: 4000 Dedicated avg:2130
    local state bytes: 32000 WorkStealing avg:3501
    local state bytes: 32000 ThreadPool avg:3204
    local state bytes: 32000 Dedicated avg:2822
    local state bytes: 320000 WorkStealing avg:3089
    local state bytes: 320000 ThreadPool avg:2999
    local state bytes: 320000 Dedicated avg:2543
    local state bytes: 3200000 WorkStealing avg:6579
    local state bytes: 3200000 ThreadPool avg:6047
    local state bytes: 3200000 Dedicated avg:6907

    Machine tested:

    (real cores no HT)
    $ lscpu 
    Architecture:          x86_64
    CPU op-mode(s):        32-bit, 64-bit
    Byte Order:            Little Endian
    CPU(s):                12
    On-line CPU(s) list:   0-11
    Thread(s) per core:    1
    Core(s) per socket:    6
    Socket(s):             2
    NUMA node(s):          2
    Vendor ID:             GenuineIntel
    CPU family:            6
    Model:                 44
    Stepping:              2
    CPU MHz:               3067.058
    BogoMIPS:              6133.20
    Virtualization:        VT-x
    L1d cache:             32K
    L1i cache:             32K
    L2 cache:              256K
    L3 cache:              12288K
    NUMA node0 CPU(s):     1,3,5,7,9,11
    NUMA node1 CPU(s):     0,2,4,6,8,10


    Conclusion
    • Performance of executors depends heavily on the use case. There are workloads where cache locality dominates, giving an advantage of up to 30% over the Work-Stealing Executor
    • Performance of executors varies amongst different CPU types and models (L1 cache size + cost of a cache miss matter here)
    • WorkStealing could be viewed as the better overall solution, especially if a lot of L1 cache misses are to be expected anyway.
    • The ideal executor would be WorkStealing with a soft actor-to-thread affinity. This would combine the strengths of both execution schemes and would yield significant performance improvements for many workloads.
    • Vanilla thread pools without work stealing and actor-to-thread affinity perform significantly worse and should not be used to execute lightweight processes.
    Source of Benchmark

    Sunday, October 12, 2014

    Alternatives to Executors when scheduling Tasks/Actors



    Executors work well when fed with short units of stateless work, e.g. division of a computation onto many CPUs. However, they are sub-optimal for scheduling jobs which are part of an ongoing, larger unit of work, e.g. scheduling the messages of an actor or lightweight process.

    Many actor frameworks (or similar message passing concurrency frameworks) schedule batches of messages using an Executor service. As the Executor service is not context aware, this spreads out the processing of a single actor's/task's messages across several threads/CPUs.

    This can lead to many cache misses when accessing the state of an Actor/Process/Task, as the processing thread changes frequently.
    Even worse, this way CPU caches cannot stabilize, as each new "Runnable" washes out the cached memory of previously processed tasks.

    A second issue arises when using busy-spin polling. If a framework reads its queues using busy-spin, it generates 100% CPU load for each processing Thread. So one would like to add a second thread/core only if absolutely required, so a one-thread-per-actor policy is not feasible.

    With Kontraktor 2.0 I implemented a different scheduling mechanism, which achieves horizontal scaling using a very simple metric to measure actual application CPU requirements, without randomly spreading the processing of an actor onto different cores.


    An actor has a fixed assignment to a worker thread ("DispatcherThread"). The scheduler periodically reschedules actors by moving them to another worker if necessary.

    Since overly sophisticated algorithms tend to introduce high runtime costs, actual scheduling is done in a very simple manner:

    1) a Thread is observed as being overloaded, if the consume loop (pulling messages from the actor queue) has not been idle for N messages (currently N=1000).
    2) If a Thread has been marked "overloaded", Actors with the largest mailbox sizes are migrated to a newly started Thread (until #Threads == ThreadMax), as long as SUM_QUEUED_MSG(Actors scheduled on Thread A) > SUM_QUEUED_MSG(Actors scheduled on newly created Thread B).
    3) in case #Threads == ThreadMax, actors are rebalanced based on summation of queued messages and "overload" observations.
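Stripped of all detail, the overload metric of step 1 can be sketched like this (a deliberate simplification, not Kontraktor's actual code; class and method names are invented):

```java
// Simplified overload detection: a dispatcher counts how many consecutive
// poll() calls returned a message without the queue ever running empty.
class OverloadDetector {
    static final int N = 1000;  // "not idle for N messages" threshold from the post
    private int nonIdleStreak;

    // called once per consume-loop iteration
    void onPoll(boolean gotMessage) {
        if (gotMessage) nonIdleStreak++;
        else nonIdleStreak = 0;  // queue ran empty => thread had slack
    }

    boolean overloaded() { return nonIdleStreak >= N; }
}
```

The charm of this metric is that it costs almost nothing per message, which matters because overly sophisticated scheduling logic would itself burn the CPU it is trying to save.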

    Problem areas: 
    • If the processing time of messages varies a lot, summation of queued messages is misleading. An improvement would be to add a weight to each message type by profiling periodically. A simpler option would be to give an Actor an additional weight to be multiplied with its queue size.
    • For load bursts, there is a latency until all of the available CPU's are actually used.
    • The delay until JIT kicks in produces bad profiling data and leads to false scale ups (heals over time, so not that bad).

    An experiment

    Update: Woke up this morning and it came to my mind this experiment has a flaw, as jobs per workitem are scheduled in parallel for the executor tests, so I am probably measuring the effects of false sharing. Results below removed, check the follow up post.



    Performance cost of adaptive Scaling


    To measure cost of auto-scheduling vs. explicit Actor=>Thread assignment, I run the Computing-Pi Test (see previous posts).
    These numbers do not show effects of locality as it compares explicit scheduling with automatic scheduling.

    Test 1 manually assigns a Thread to each Pi computing Actor,
    Test 2 always starts with one worker and needs to scale up automatically once it detects actual load.


    (note example requires >= kontraktor2.0-beta-2, if the 'parkNanos' stuff is uncommented it scales up to 2-3 threads only)

    The test is run 8 times with increasing thread_max by one with each run.

    results:

    Kontraktor Autoscale (always start with 1 thread, then scale up to N threads)

    1 threads : 1527
    2 threads : 1273
    3 threads : 718
    4 threads : 630
    5 threads : 521
    6 threads : 576
    7 threads : 619
    8 threads : 668

    Kontraktor with dedicated assignment of threads to Actors (see commented line in source above)

    1 threads : 1520
    2 threads : 804
    3 threads : 571
    4 threads : 459
    5 threads : 457
    6 threads : 534
    7 threads : 615
    8 threads : 659


    Conclusion #2

    Differences in runtimes can be attributed mostly to the delay in scaling up. For deterministic load problems, prearranged Actor scheduling is of course more efficient. However, thinking of a server receiving varying request loads over time, automatic scaling is a valid option.

    Saturday, September 27, 2014

    Breaking the habit: Solving the Dining Philosopher's problem with Actors

    Concurrent programming using Actors is different to traditional mutex based approaches, therefore I'd like to show how the solution of a classical concurrency problem can be done using Actors. It requires some time to get used to it as future/actor based concurrency looks different and uses different patterns.

    Additionally I'll show how little effort is required to make this a distributed application, as asynchronous Actor-based concurrency is much more resilient regarding network-induced latency.

    I'll use my Kontraktor Actor library version 2.0 beta to also illustrate how the "message == asynchronous method call" approach of Kontraktor works out in "practice".

    Resources:

    Kontraktor lib: https://github.com/RuedigerMoeller/kontraktor.
    Full source of sample is here.

    Description of the philosophers dining problem:
    http://en.wikipedia.org/wiki/Dining_philosophers_problem
    (I am using a non-fair solution to keep things simple.)


    The Table

    One actor represents the table. For each fork, a list of philosophers waiting for a "fork is free" notification is kept.

    Short explanation: in Kontraktor, all public methods are translated behind the scenes into asynchronous messages put onto an actor's queue ("mailbox"). The actor's worker thread takes messages off the mailbox and calls the "real" implementation, so it is guaranteed that an actor executes single-threaded. To make asynchronous calls easy to identify, I always put a $ in front of public actor methods.


    Explanation: when a fork is requested and its queue of waiting philosophers is empty, a fulfilled promise is returned; otherwise an unfulfilled promise is added to the queue. The caller code then looks like:


    Once a fork is returned, the next Promise in the ArrayList (if any) is notified. signal() is equivalent to receive("dummy",null).

    Note that the "result" object is unused; I just need a signal from the returned future. That's why the fulfilled Promise gets "void" as a dummy result object (which then triggers immediate execution of the then-closure on the caller side).

    Note how I can use simple single-threaded collections and data structures, as the actor is guaranteed to execute single-threaded.
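    The Table code appears as a screenshot in the original post. As a rough plain-JDK analogue of the pattern (using CompletableFuture in place of Kontraktor's Promise — class and method names here are mine, not the library's), it might look like this:

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Plain-JDK sketch of the Table pattern: one waiter queue per fork.
// A fork is free iff its queue is empty; the head of the queue holds the fork.
class TableSketch {
    @SuppressWarnings("unchecked")
    final ArrayDeque<CompletableFuture<Void>>[] forks = new ArrayDeque[5];

    TableSketch() {
        for (int i = 0; i < forks.length; i++) forks[i] = new ArrayDeque<>();
    }

    // Returns an already-fulfilled future if the fork is free, else an
    // unfulfilled one that completes once the fork is returned.
    CompletableFuture<Void> grabFork(int num) {
        CompletableFuture<Void> res = new CompletableFuture<>();
        if (forks[num].isEmpty()) res.complete(null); // fork free: fulfilled promise
        forks[num].add(res);
        return res;
    }

    void returnFork(int num) {
        forks[num].poll();                             // current holder leaves
        CompletableFuture<Void> next = forks[num].peek();
        if (next != null) next.complete(null);         // notify next waiter
    }
}
```

    In the real actor all of this runs on the actor's single worker thread, which is why the unsynchronized ArrayDeque is safe.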

    The Philosopher

    A philosopher has a wonderful life which can be divided into 3 states:

    10 think randomTime
    20 be_hungry_and_try_to_grab_2_forks
    30 eat randomTime
    40 goto 10

    [rumors are of lost lines 15 and 35 related to indecorous social activities introduced after Epicurus happened to join the table]


    The meat is in the $live method:
    After thinking, the philosopher tries to grab one fork; the closure given to the "then" method executes once the fork becomes available. Once both forks have been grabbed, another "delayed" call keeps the "eat" state for a while; then both forks are returned and a $live message is put onto the actor's mailbox (= message queue) to start thinking again.

    Remarks:

    • "delayed" just schedules the given closure onto the actor's mailbox with a delay.
    • "self().$live()" puts a $live message onto the actor's mailbox. If "this.$live()" were called instead, $live() would execute directly [synchronously] like a normal method call. In this example that would not hurt, but in many cases it makes a difference.
    • Wait-free: the executing thread is never blocked. One could easily simulate thousands of philosophers on a single thread.
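    The $live code is also shown as a screenshot in the original. The following self-contained plain-JDK sketch mimics the callback chaining; a single-threaded scheduler stands in for the actor's mailbox, and forks are always free to keep it short. All names are mine, not the Kontraktor API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the think/grab/grab/eat loop in callback style. Re-submitting
// live() onto the mailbox corresponds to self().$live() in the original.
class PhilosopherLoopSketch {
    final ScheduledExecutorService mailbox = Executors.newSingleThreadScheduledExecutor();
    final CompletableFuture<Void> done = new CompletableFuture<>();
    final int maxMeals;
    volatile int meals = 0;

    PhilosopherLoopSketch(int maxMeals) { this.maxMeals = maxMeals; }

    // stand-in for the table's fork-grabbing; in the real sample this
    // future may complete much later, once the fork is free
    CompletableFuture<Void> grabFork(int num) {
        return CompletableFuture.completedFuture(null);
    }
    void returnFork(int num) { /* no contention in this sketch */ }

    // like "delayed": schedule a closure onto the mailbox after a pause
    void delayed(long ms, Runnable r) { mailbox.schedule(r, ms, TimeUnit.MILLISECONDS); }

    void live() {
        delayed(1, () ->                          // think for a while
            grabFork(0).thenRun(() ->             // closure runs once left fork is free
                grabFork(1).thenRun(() ->         // ... then grab the right fork
                    delayed(1, () -> {            // eat for a while
                        returnFork(0);
                        returnFork(1);
                        if (++meals >= maxMeals) done.complete(null);
                        else mailbox.execute(this::live); // like self().$live()
                    }))));
    }
}
```

    Note the wait-free structure: no step ever blocks the mailbox thread, it only enqueues the next closure.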

    Setup and run

    The main method to start the life of 5 philosophers looks like this:

    and

    Note: Hoarde is a utility to create and manage groups of Actors. "startReportingThread" just starts a thread dumping the state of all philosophers:


    "Is this still Java?" .. probably weird looking stuff for the untrained eye ..
    (note Philosopher.$getState is async and yield() is used to wait for all futures to complete before executing the then() closure printing the state)
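    Kontraktor's yield() (wait for a set of futures, then run a then-closure) has a close plain-JDK analogue in CompletableFuture.allOf. A minimal sketch of the reporting step under that assumption (names are mine):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Plain-JDK analogue of "yield() over all $getState futures": wait until
// every philosopher has answered, then build the report in one closure.
class ReportSketch {
    static String report(List<CompletableFuture<String>> states) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(states.toArray(new CompletableFuture[0]));
        // the thenApply closure runs only after all futures have completed,
        // so the join() calls inside never block
        return all.thenApply(v ->
            states.stream().map(CompletableFuture::join)
                  .collect(Collectors.joining(" "))).join();
    }
}
```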

    BTW: spacing is somewhat inconsistent as I started to reformat code partially during edit .. but won't do screenshots again ..

    Yay, done:



    Let's make it a distributed application

    The killer feature of actor-based, asynchronous, lock/wait-free programming is easy distributability. A solution using blocking mutexes would need a rewrite or would have to rely on synchronous remote calls, so the bigger the network latency, the lower the throughput. Actors do not suffer from that.
    We can just change the startup to make the table run remotely:


    Is this cool or what? I can now feed hoardes of philosophers with a single table server:





    Saturday, August 30, 2014

    Backpressure in async communication systems

    I recently saw this excellent presentation video by Mr. Kuhn .. and later on found out about the community standardization effort of Reactive Streams.

    As I have done many experiments dealing with backpressure issues in a high-volume async distributed system, I'd like to point out some possible pitfalls of the "flow control" mechanism proposed in the Reactive Streams spec and describe how I dealt with the problem (after having tried similar solutions).

    The problem of flow control arises in asynchronous communication if the sender sends messages at a higher pace than (one of the) receivers can process them.
    This problem is well known from networking, and TCP actually implements a scheme similar to the one proposed in the RStreams spec. Another, less well-known, flow control scheme is NAK, typically used in low-latency brokerless (multicast) messaging products.

    The "Reactive Streams" spec proposes an acknowledgement-based protocol (although it's not presented as such).

    Receiver sends "requestMore(numberOfMsg)" to Sender
    Sender sends "numberOfMsg" messages to Receiver

    By emitting "requestMore", a Receiver signals it has received and processed (or will soon have finished processing) previously received messages. It's an acknowledgement (ACK) message.
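    A minimal model of this ACK/pull protocol (illustrative names, not taken from the spec's API): "requestMore" grants send credits, each send consumes one, and the sender stalls at zero outstanding demand.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sender-side demand accounting for the requestMore protocol.
class DemandSketch {
    final AtomicLong demand = new AtomicLong();

    void requestMore(long n) { demand.addAndGet(n); } // receiver -> sender (ACK)

    // sender side: returns false when out of credits (sender must stall)
    boolean trySend() {
        long d;
        do {
            d = demand.get();
            if (d == 0) return false;
        } while (!demand.compareAndSet(d, d - 1));
        return true;
    }
}
```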

    Problematic areas:

    1. It works for point-to-point communication only. If you have a multicast situation (e.g. one sender multicasts to N receivers), this does not work well.
      Reason:
      In order to make this work, the sender needs to be aware of all receivers, wait for every receiver to have sent a "requestMore", compute the minimum over all "requestMore" messages, and only then send that amount of messages. Once a receiver dies, no "requestMore" is sent/received, so the sender can't send anything for an extended amount of time (until the receiver timeout). If one receiver pauses (e.g. for GC), a "requestMore" is missing, so the sender needs to stall all communication.
      You can see these effects in action with JGroups using a configuration with a credit-based flow control protocol. Once a cluster node terminates, throughput falls to zero until the node-leave timeout is reached (similar issues are observable with the ACK-based NAKACK reliable UDP transport of JGroups). ACK ftl :-)
    2. The actual throughput depends heavily on connection latency. As the ACK signal ("requestMore") takes longer to reach the sender, the receiver buffer must be enlarged depending on latency, and the size of the chunks requested by "requestMore" must be enlarged as well. If one does not do this, throughput drops as latency grows. In short:
      sizeOfBuffer needs to be >= eventsProcessable per roundTripLatency interval. This can be solved by runtime introspection, but in practice it can be tricky, especially with bursty traffic.
    3. As long as reliable communication channels (in-memory, TCP) are used: those transports already implement a flow control / backpressure mechanism, so "Reactive Streams" effectively duplicates functionality present at a lower and more efficient level. It's not too hard to make use of the backpressure signals of e.g. TCP (one solution is an "outbound" queue on the sender side whose size is observed). One can achieve the same flow control results without any need for explicit application-level ACK/pull messages like "requestMore". For in-memory message passing, one can simply inspect the queue size of the receiver. [You then need a queue implementation with a fast size() method, of course.]
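    The buffer-sizing rule from point 2 can be made concrete with a back-of-the-envelope helper (a sketch of mine, not from the original post): the receiver buffer (and the "requestMore" chunk size) must cover at least one round trip's worth of traffic.

```java
// Minimum buffer size so that throughput does not degrade with latency:
// events arriving during one round trip must fit into the buffer.
class BufferRule {
    static long minBuffer(long eventsPerSecond, double roundTripMillis) {
        return (long) Math.ceil(eventsPerSecond * roundTripMillis / 1000.0);
    }
}
```

    For example, at 500,000 events/s and a 2 ms round trip, the buffer must hold at least 1,000 events.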

    Alternative: NAK instead of ACK, adaptive rate limits

    The idea of NAK (= negative acknowledgement) is to raise backpressure signals from the receiver side only in case a component detects overload. If receivers are not in an overload situation, no backchannel message traffic is present at all.

    The algorithm outlined below worked for me in a high throughput distributed cluster:

    Each sender has a send rate limit which is increased stepwise over time. Usually the starting default is a high send rate (so it is effectively not limiting). Receivers observe the size of their inbound queue. Once the queue size grows past certain limits, receivers emit a NAK( QFillPercentage ) message, e.g. at 50%, 75%, 87%, 100%. The sender then reduces the send rate limit depending on the "strength" of the NAK message. To avoid permanent degradation, the sender increases the send rate stepwise again (the step size may depend on how long no NAK has been received).

    This works for 1:1 async communication patterns as well as for 1:N. In a 1:N situation the sender reacts to the highest-strength NAK message within a timing window (e.g. 50 ms). The 0-traffic-on-dying-receiver problem is not present. Additionally, the sender does not need to track the number of receivers/subscribers, which is a very important property in 1:N multicast messaging.

    Finding the right numbers for NAK triggers and sender speed-up steps can be tricky, as they depend on the volatility of the send rate and the latency of the transport link. However, it should be possible to find appropriate values adaptively at runtime. In practice I used empirical values found by experimenting with the given system on sample loads, which is of course not a viable solution for a general implementation.
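    A compact sketch of the NAK-driven rate limiter described above; all thresholds and step sizes are illustrative placeholders of mine, not values from the original system:

```java
// NAK-driven rate limiting: receivers map inbound queue fill to a NAK
// "strength"; the sender cuts its rate limit harder for stronger NAKs and
// recovers stepwise while no NAK arrives.
class NakRateLimiter {
    static final long MAX_RATE = 1_000_000;   // msgs/sec, start high (not limiting)
    static final long RECOVERY_STEP = 50_000; // msgs/sec regained per quiet interval
    long rateLimit = MAX_RATE;

    // receiver side: emit a NAK only once the inbound queue passes a threshold
    static int nakStrength(int queueFillPercent) {
        if (queueFillPercent >= 100) return 4;
        if (queueFillPercent >= 87)  return 3;
        if (queueFillPercent >= 75)  return 2;
        if (queueFillPercent >= 50)  return 1;
        return 0; // below 50%: no backchannel traffic at all
    }

    // sender side: apply the strongest NAK seen within the timing window (1:N case)
    void onNak(int strength) {
        if (strength > 0) rateLimit >>= strength; // halve once per strength level
    }

    // no NAK received during the interval: recover stepwise
    void onQuietInterval() {
        rateLimit = Math.min(MAX_RATE, rateLimit + RECOVERY_STEP);
    }
}
```

    The backpressure signal can then be applied blocking (pace the sending thread to rateLimit) or nonblocking, as discussed below.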

    The backpressure signal raised by the NAK messages can be applied to the sender either blocking or nonblocking. In the concrete system I just blocked the sending thread, which effectively puts a dynamically computed "cost" (derived from the current rate limit) on each async message send operation. Though this is blocking, it did not hurt performance, as there was no "other work" to do when a receiver can't keep up. However, backpressure signals can be applied in a nonblocking fashion as well ("if (actualSendRate < sendRateLimit) { send } else { do other stuff }").