Saturday, August 30, 2014

Backpressure in async communication systems

I recently watched this excellent presentation video by Mr Kuhn .. and later found out about the community standardization effort of Reactive Streams.

As I have done many experiments dealing with backpressure issues in a high volume async distributed system, I'd like to point out some possible pitfalls of the "flow control" mechanism proposed in the reactive streams spec and point out how I dealt with the problem (after having tried similar solutions).

The problem of flow control arises in asynchronous communication if the sender sends messages at a higher pace than (one of the) receivers can process them.
This problem is well known from networking, and TCP actually implements a scheme similar to the one proposed in the Reactive Streams spec. Another, lesser-known flow control scheme is NAK, which is typically used in low-latency brokerless (multicast) messaging products.

The "Reactive Streams" spec proposes an acknowledge-based protocol (although it's not presented as such).

Receiver sends "requestMore(numberOfMsg)" to Sender
Sender sends "numberOfMsg" messages to Receiver

By emitting "requestMore", a Receiver signals that it has received and processed (or will soon finish processing) the previously received messages. It's an acknowledgement message (ACK).
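The exchange above can be sketched as a small credit counter on the sender side. This is only an illustration of the idea, not the API of the Reactive Streams spec: the class `CreditSender` and its methods are made-up names, and everything happens on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sender that may only emit as many messages as the receiver asked for. */
final class CreditSender {
    private final Queue<String> pending = new ArrayDeque<>();
    private long credit; // messages requested by the receiver but not yet sent

    /** The application hands a message to the sender. */
    void enqueue(String msg) {
        pending.add(msg);
    }

    /** A "requestMore(n)" signal arrived from the receiver. */
    void requestMore(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        credit += n;
    }

    /** Returns the messages that may go out now; without credit nothing is sent. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (credit > 0 && !pending.isEmpty()) {
            out.add(pending.poll());
            credit--;
        }
        return out;
    }
}
```

Note how the sender is silent for as long as no "requestMore" arrives, no matter how many messages are pending. That is the property the points below are about.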

Problematic areas:

  1. It works for point-to-point communication only. If you have a multicast situation (e.g. one sender multicasts to N receivers), this does not work well.
    Reason:
    In order to make this work, the sender needs to be aware of all receivers, then wait for every receiver to have sent a "requestMore", then compute the minimum of all "requestMore" messages and finally send this number of messages. Once a receiver dies, no "requestMore" is sent/received, so the sender can't send anything for an extended amount of time (receiver-timeout). If one receiver pauses (e.g. for GC), a "requestMore" is missing, so the sender needs to stall all communication.
    You can see those effects in action with JGroups using a configuration with a credit-based flow control protocol. Once a cluster node terminates, throughput falls to zero until the node-leave timeout is reached (similar issues are observable with the ACK-based NAKACK reliable UDP transport of JGroups). ACK ftl :-)
  2. The actual throughput depends heavily on connection latency. The higher the latency, the longer the ACK signal ("requestMore") takes to reach the sender, so the receiver buffer must be enlarged with growing latency; additionally, the size of the chunks requested by "requestMore" must be enlarged. If one does not do this, throughput drops as latency grows. In short:
    sizeOfBuffer needs to be >= eventsProcessable per roundTripLatency interval. This can be solved by runtime introspection, however in practice this can be tricky, especially with bursty traffic.
  3. As long as reliable communication channels (in memory, TCP) are used: those transports already implement a flow control / backpressure mechanism, so "Reactive Streams" effectively duplicates functionality present at a lower and more efficient level. It's not too hard to make use of the backpressure signals of e.g. TCP (one solution is an "outbound" queue on the sender side, then observe its size). One can achieve the same flow control results without any need for explicit application-level ACK/Pull messages like "requestMore". For in-memory message passing, one can simply inspect the queue size of the receiver. [you need a queue implementation with a fast size() implementation then ofc]
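To put numbers on point 2, here is a small worked example of the "sizeOfBuffer >= eventsProcessable per roundTripLatency" rule. The class `BufferSizing` and the rates and latencies in `main` are assumptions picked for illustration, not measurements from a real system.

```java
/** Illustrative arithmetic for the buffer-size rule; all figures are made up. */
final class BufferSizing {
    private static final long MICROS_PER_SECOND = 1_000_000L;

    /** Smallest buffer (in messages) that keeps the receiver busy for one round trip. */
    static long minBufferSize(long messagesPerSecond, long roundTripMicros) {
        long inFlight = messagesPerSecond * roundTripMicros;
        return (inFlight + MICROS_PER_SECOND - 1) / MICROS_PER_SECOND; // round up
    }

    /** Throughput ceiling (msgs/sec) if at most one full buffer is delivered per round trip. */
    static long maxThroughput(long bufferSize, long roundTripMicros) {
        return bufferSize * MICROS_PER_SECOND / roundTripMicros;
    }

    public static void main(String[] args) {
        // A receiver handling 100,000 msgs/sec over a 100 microsecond round trip.
        System.out.println(minBufferSize(100_000, 100));
        // The same receiver over a 50 ms round trip.
        System.out.println(minBufferSize(100_000, 50_000));
        // Keeping the small buffer on the slow link caps the throughput.
        System.out.println(maxThroughput(10, 50_000));
    }
}
```

With these figures, a buffer of 10 messages is enough on the fast link, the 50 ms link needs 5,000, and keeping the 10-message buffer on the 50 ms link caps throughput at 200 msgs/sec even though the receiver could handle 100,000.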

Alternative: NAK instead of ACK, adaptive rate limits

The idea of NAK (= negative acknowledgement) is to raise backpressure signals from the receiver side only when a component detects overload. If receivers are not in an overload situation, there is no backchannel message traffic at all.

The algorithm outlined below worked for me in a high throughput distributed cluster:

Each sender has a send rate limit which is increased stepwise over time. Usually the starting default is a high send rate (so it's actually not limiting). Receivers observe the size of their inbound queue. Once the queue size grows beyond certain limits, receivers emit a NAK( QFillPercentage ) message, e.g. at 50%, 75%, 87%, 100%. The sender then lowers the send rate limit depending on the "strength" of the NAK message. To avoid permanent degradation, the sender raises the send rate again stepwise (the step size may depend on how long no NAK was received).
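A minimal sketch of this scheme in Java follows. The class names `NakRateLimit` and `NakTrigger`, the cut factors per NAK level and the fixed recovery step are assumptions chosen for illustration; they are not the empirical values of the system described here.

```java
/** Sender-side send rate limit driven by NAK messages. All constants are illustrative. */
final class NakRateLimit {
    private final long minRate;      // msgs/sec, lowest allowed send rate
    private final long maxRate;      // msgs/sec, the (practically non-limiting) default
    private final long recoveryStep; // msgs/sec regained per interval without a NAK
    private long currentRate;

    NakRateLimit(long minRate, long maxRate, long recoveryStep) {
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.recoveryStep = recoveryStep;
        this.currentRate = maxRate; // start fast
    }

    /** A receiver reported its inbound queue fill level; stronger NAKs cut harder. */
    void onNak(int queueFillPercent) {
        int keepPercent;
        if (queueFillPercent >= 100) keepPercent = 25;
        else if (queueFillPercent >= 87) keepPercent = 50;
        else if (queueFillPercent >= 75) keepPercent = 75;
        else keepPercent = 90;
        currentRate = Math.max(minRate, currentRate * keepPercent / 100);
    }

    /** Call once per interval in which no NAK arrived, to avoid permanent degradation. */
    void onQuietInterval() {
        currentRate = Math.min(maxRate, currentRate + recoveryStep);
    }

    long currentRate() {
        return currentRate;
    }
}

/** Receiver-side trigger: maps the inbound queue fill level to a NAK level. */
final class NakTrigger {
    private static final int[] LEVELS = {100, 87, 75, 50};

    /** Returns the highest level reached, or 0 if no NAK should be sent. */
    static int levelFor(int queueSize, int capacity) {
        int fillPercent = (int) (100L * queueSize / capacity);
        for (int level : LEVELS) {
            if (fillPercent >= level) return level;
        }
        return 0;
    }
}
```

In a real receiver one would presumably emit a NAK only when a level is newly crossed rather than on every enqueue, and the recovery step could grow with the time since the last NAK, as mentioned above.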

This works for 1:1 async communication patterns as well as for 1:N. In a 1:N situation the sender reacts to the highest-strength NAK message within a timing window (e.g. 50ms). The 0-traffic-on-dying-receiver problem is not present. Additionally, the sender does not need to track the number of receivers/subscribers, which is a very important property in 1:N multicast messaging.
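The 1:N handling can be sketched as a tiny aggregator that keeps only the strongest NAK per timing window. `NakWindow` is a hypothetical helper, and the clock is passed in explicitly so the example stays deterministic.

```java
/** Collapses the NAKs of any number of receivers into one signal per timing window. */
final class NakWindow {
    private final long windowMillis;
    private long windowStart;
    private int strongest; // highest queue fill percentage seen in the current window

    NakWindow(long windowMillis, long nowMillis) {
        this.windowMillis = windowMillis;
        this.windowStart = nowMillis;
    }

    /** Records a NAK from any receiver; the sender does not need to know who sent it. */
    void onNak(int queueFillPercent) {
        strongest = Math.max(strongest, queueFillPercent);
    }

    /**
     * Returns -1 while the window is still open. Once it has elapsed, returns the
     * strongest NAK seen (0 if none arrived) and starts a new window.
     */
    int poll(long nowMillis) {
        if (nowMillis - windowStart < windowMillis) return -1;
        int result = strongest;
        strongest = 0;
        windowStart = nowMillis;
        return result;
    }
}
```

A receiver that has died simply contributes no NAK, so a quiet window yields 0 and the sender keeps going. No list of subscribers is kept anywhere.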

Finding the right numbers for NAK-triggers, and sender speed up steps can be tricky as they depend on the volatility of sendrate and latency of transport link. However it should be possible to adaptively find appropriate values at runtime. In practice I used empirical values found by experimenting with the given system on sample loads, which is not a viable solution for a general implementation ofc.

The backpressure signal raised by the NAK messages can be applied to the sender either blocking or nonblocking. In the concrete system I just blocked the sending thread, which effectively puts a dynamically computed (current rate limit) "cost" on each async message send operation. Though this is blocking, it did not hurt performance, as there was no "other work" to do in case a receiver can't keep up. However, backpressure signals can be applied in a nonblocking fashion as well ("if (actualSendRate < sendRateLimit) { send } else { do other stuff }").
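Both variants can be sketched with one small pacer that spaces sends according to the current rate limit. `SendPacer` is a made-up name, the code assumes a single sending thread, and it only illustrates the idea rather than the implementation used in the system described above.

```java
import java.util.concurrent.locks.LockSupport;

/** Spaces out sends according to a rate limit; intended for a single sending thread. */
final class SendPacer {
    private long sendRateLimit; // msgs/sec, adjusted by the backpressure logic
    private long nextSendNanos; // earliest time the next send is allowed

    SendPacer(long sendRateLimit, long startNanos) {
        setSendRateLimit(sendRateLimit);
        this.nextSendNanos = startNanos;
    }

    void setSendRateLimit(long sendRateLimit) {
        if (sendRateLimit <= 0) throw new IllegalArgumentException("rate must be positive");
        this.sendRateLimit = sendRateLimit;
    }

    /** Nonblocking: returns true if a send is allowed now, else the caller does other stuff. */
    boolean trySend(long nowNanos) {
        if (nowNanos - nextSendNanos < 0) return false;
        nextSendNanos = nowNanos + 1_000_000_000L / sendRateLimit;
        return true;
    }

    /** Blocking: parks the calling thread until the next send slot is reached. */
    void awaitSend() {
        long now = System.nanoTime();
        while (now - nextSendNanos < 0) {
            LockSupport.parkNanos(nextSendNanos - now); // may wake early, hence the loop
            now = System.nanoTime();
        }
        nextSendNanos = now + 1_000_000_000L / sendRateLimit;
    }
}
```

A blocking sender would create the pacer with `System.nanoTime()` as the start time and call `awaitSend()` before each send. A nonblocking one would call `trySend(System.nanoTime())` and go on to other work whenever it returns false.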

11 comments:

  1. The main driver behind starting the Reactive Streams initiative was to transport data losslessly across an asynchronous boundary in-memory without requiring unbounded buffers. Your stipulation that “the in-memory transport” already implements this is clearly false, as has been evidenced by OOME questions from users of Akka (unbounded Actor mailbox), RxJava (unbounded cross-thread handoff), vert.x (unbounded buffering in the event bus) etc.

    Your arguments for preferring NAK over ACK certainly are applicable for network transports, especially if there is a retransmit mechanism that can achieve losslessness, but for the currently specified interfaces it would not improve the situation. We found that a NAK-based approach within a JVM cannot effectively remove unbounded buffer growth at the recipient unless dropping is allowed, and dropping can only be allowed if the sender buffers enough to be able to retransmit when needed—defeating the purpose of reducing buffering at the receiver side.

    Another way to describe the difference in philosophy is that we want to achieve bounded memory consumption while processing streams, which means that if dropping is not allowed then the stream throughput will suffer during periods of congestion. Your use-case of doing best-effort 1:N distribution can be modeled on top of Reactive Streams by adding explicit buffers (bounded, with suitable dropping strategy) per receiver after the fan-out element; that way no single slow consumer can affect the others.

    There is one small but important inaccuracy in your initial thesis: Reactive Streams do not in fact implement ACK-based flow control, because the `request(n)` call does not acknowledge receipt of previously received elements. It rather signals the receiver’s capacity to accept `n` more items in the future. Only if the receiver waits until all outstanding elements are delivered before asking for more does this degenerate into an ACK-based system. What a network transport model for Reactive Streams will look like has not yet been discussed.

  2. +1 for extensive response. I'll respond tomorrow as I need some time for thinking and word-smithing..

  3. First: I am aware you are tackling a very concrete issue while I am arguing from outer space ("bubbles never crash").

    1) Imho one of the most important properties of async communication is, that reactive applications are inherently distributable without the need to change the application structure.
    So I'd argue any flow control should work for the distributed case in the first place. If it works distributed, it will also work locally. This does not hold true vice versa. Same applies to 1:1 vs 1:N.

    2) If queues overflow to OOM, your system is overloaded (or has design/load balancing issues). If your system is overloaded, you need to reduce the rate of external input events (number of users, number of requests, number of prices per second). Your approach is to "backpropagate" this point2point all along the processing chain. However

    A ==> B ==> C (where "==>" is for backpressure/pull signal) is equivalent to
    A ==> C

    In a complex async system, this point 2 point backpressure propagation easily leads to stalling/bad throughput, as you need several backpressure hops until you reach the actual source of overload. An alternative is to have a kind of global supervisor (with system knowledge) directly limiting the system input rate once an actor/channel input queue grows beyond limits. Actually this is something a scheduler could do (Go does that indirectly as it has "virtual stacks", so it can "pseudo-block" without actually blocking a thread).

    3) Unbounded queues. If you think about it, your "request(n)" is a way for the receiver to tell the sender about its queue size. Why do you do that? Because you use ConcurrentLinkedQueue, and you cannot query "size()" on ConcurrentLinkedQueue in an efficient way. If the sender were able to ask for the queue size and MAX_QUEUE size, the sender would be able to generate "request(n)" messages by itself.
    I know bounded queues come at the price of deadlocks instead of OOM :-). However, as the JVM does not have continuations/virtual stacks such as Go, blocking is the most efficient way to backpressure. An intelligent scheduler can weed out blocked Threads and avoid scheduling other actors on that Thread.

    "
    Your arguments for preferring NAK over ACK certainly are applicable for network transports [..] We found that a NAK-based approach within a JVM cannot effectively remove unbounded buffer growth at the recipient unless dropping is allowed, and dropping can only be allowed if the sender buffers enough to be able to retransmit when needed—defeating the purpose of reducing buffering at the receiver side.
    "

    The equivalent of NAK inside a single process is blocking :-) aka bounded queues. The approach I outlined in the OP is just a refined version of blocking [Slowing down single send operations (by blocking) as the receiver queue approaches its limit].

    "
    There is one small but important inaccuracy in your initial thesis: Reactive Streams do not in fact implement ACK-based flow control, because the `request(n)` call does not acknowledge receipt of previously received elements. It rather signals the receiver’s capacity to accept `n` more items in the future. Only if the receiver waits until all outstanding elements are delivered before asking for more does this degenerate into an ACK-based system
    "

    I disagree here. You just change the semantics of the ACK signal, but it's still ACK. "Waiting for all elements" is not a determining property to make a protocol ACK (see TCP). I'd characterize ACK protocols more along the lines of "No ACK - no sending".

    Anyway: Given the prerequisites of the concrete issue you are tackling (unbounded queues, cross library, JVM), your approach seems like the most viable and practical approach. I don't think it can serve as a "general" approach to couple asynchronous systems.

    -Rüdiger

  4. There are lots of false assumptions you are making, I’m not sure we’ll reach a common language to have a discussion. Please consider that I deem the following to be true:

    1) distributed does not imply 1:N streaming
    2) mastering distributed flow control does not imply mastering in-process flow control
    3) point-to-point back-pressure is not equivalent to global back-pressure
    4) global supervisor with system knowledge cannot be constructed in practice
    5) signaling demand cannot be replaced by just knowing the queue size
    6) ConcurrentLinkedQueue is not the reason why the recipient manages its queue instead of the sender
    7) blocking is not the most efficient back-pressure mechanism, it does not scale
    8) intelligent schedulers will consume more resources for managing more threads
    9) bounded queues do NOT imply blocking
    10) acknowledging receipt of something is not the same thing as expressing demand for something

    I think we need to hash these out first before we can proceed to discussing how to control asynchronous communication in general. That discussion should then probably be had at https://github.com/reactive-streams/reactive-streams/issues/45.

  5. Whether the assumptions are false or not is debatable. However, +1 for putting up that list of differences.

    1) agreed. However 1:N streaming is essential in order to scale out for many application classes. TCP does not scale. So I'd always spend a thought if an algorithm will work with 1:N
    2) My view: memory is just a transport with very low latency. So why make a difference ? I mean you said that well in your talk ("multicore/multisocket is like a cluster in a box").
    3) Partially agree. Under ideal conditions (no disconnects, guaranteed bandwidth), any mesh of point 2 point backpressured actors can be "flow controlled" by directly limiting the "producing" actors or external input. Agreed, in practice conditions frequently are not ideal, so p2p is needed sometimes. However, regarding "in-memory" I'd argue conditions are ideal, same as inside a self-hosted LAN cluster.
    4) It can be constructed in many cases, but in many cases it's not possible. Agreed here, my view might be narrowed somewhat to the system I am working with. Examples?
    5) I mean "current Q fill size + current Q max size". If the queue fills up, obviously send rate is too high. If Q is near empty, no need to throttle sender. You can generate (simplified) "request(MAX_SIZE-CURRENT_SIZE)" from that. Can you give a (non-cornercase) example where this does not hold true ?
    6) ok, what's the reason then ?
    7) On current hardware + JVM it's the fastest way for the receiver to signal "stop, dude". As the VM has no continuations like Go, it's pretty expensive to implement backpressure block-free. When a sender "offer()'s" without success (because the receiver Q is full), the only way to preserve the sender's state+call stack is to halt the thread. Go can preserve the stack and use the current thread for other stuff. Inside the JVM we cannot do that. That's the basic reason many libs fall back to unbounded queues (thereby throwing away "natural" backpressure signals), because with bounded queues they get deadlocks [as threads must be halted or stalled somehow].
    8) true
    9) I use "blocking" and "bounded" synonymously, my bad. However, the sender's thread has state and a callstack; I am curious how to handle the case of receiver-Q-full (~"blocked" sender) without somehow "halting" the sender thread? I'd really be interested in further explanation of your (9) statement.
    10) I think you are putting lipstick on the pig here. It will have exactly the characteristics of an (async) ACK protocol. A standard by-the-book ACK always implicitly expresses "I am free for the next X operations".

    Oops (haven't read last sentence) .. agree on moving discussion. Public places are not the best place to weed out things [however traffic on this blog is not that high ;) ]. I am off one week to Mumbai Stock Exchange support (they use T7). Maybe you can give a quick shot on (9) ?

  6. Maybe my first sentence was a bit harsh, thanks for seeing past it!

    1) 1:N fanout (replicating the streams) is indeed problematic but not needed for scalability; routing partitions of the streams is necessary but not problematic since a slow consumer can just be ignored
    2) My statement was about considering a single network host as internally distributed, but that does not mean that the optimal flow-control solution is the same in both external and internal networks; this is due to differences in latency profiles and loss probabilities.
    3) What you are not considering is that global back-pressure can only work if you can make assumptions about the inner workings of your network. You use this in order to save yourself the hassle of propagating back-pressure locally which means unbounded buffering or synchronous processing. Therefore p2p is the only thing that you can make “just work” in a PnP fashion.
    4) Very close to my previous paragraph: you’ll make assumptions about the processing and if they don’t hold (systematically or temporarily) then your machinery will fail. There is no generalized solution, it always needs to be tailored with specific knowledge.
    5) counter-examples: queue is remote, recipient deliberately does not want to fill it up right now (i.e. dynamic MAX_SIZE); you’ll need communication anyway, so just do it right to begin with
    6) The recipient autonomously decides when to process how much, independent of the queue implementation.
    7) Try managing 1 million TCP connections that way, it will not work. Another question is whether you want low-latency throttling, low-latency wake-up or high-throughput scheduling. Microbenchmarking one particular detail does not help. This is about CPU instructions much more than about any language or VM technology, co-routines etc. don’t play into it.
    9) Continuation-passing style or Actor Model come to mind. Threads and call stacks are a machine abstraction, but a very low-level one. When downstream demand reaches zero our Akka Streams actors will simply not pull more from their own upstream, and since they are message-driven they will simply not be scheduled until more demand arrives from the very downstream. No threads need to be blocked for this and no call stacks preserved, but the actor remembers where it was in the stream (and might hold an input buffer etc.).
    10) As Viktor also argued on github: demand can be signaled without having received any data, hence it is not an acknowledgement. I get your point that a stable running system will have a relationship that turns out similar between Publisher and Subscriber but that does not change this semantic difference. Also, an ACK does not by itself imply capacity for more input, that just happens to be piggy-backed onto it in certain protocols (like TCP). To make it clear: in Reactive Streams there is no way for a Publisher to know when a certain element has been processed, there is no acknowledgement signal.

  7. "Maybe my first sentence was a bit harsh, thanks for seeing past it!"

    being right > raging + being right > being wrong > raging + being wrong

    1) Fanout. It gets problematic in a network when using multicast messaging. Sharding/partitioning is not always applicable. For a single server system it's probably unimportant.
    2) Hm .. I'd try to approach solutions which do not depend on latency. One of the major advantages of async actors is their resilience regarding latency in my opinion. Packet loss is an issue in WANs only, but this can be solved at transport level, not in the application layer. Let's disagree here :-)
    3) Agree PnP requires p2p flow control. However, in my experience the effort/reward ratio is in practice still better for handcrafted global throttling, as large p2p actor networks tend to show nasty nondeterministic effects (resonance, corner cases in load). We'll see. You are somewhat transforming a "push model" into a "pull model", or better: it's a mixture of both. Anyway, trying to solve the flow control problem in general is a valuable effort, though I am not convinced this one will be successful.
    4) Yep. There have been other efforts solving the backpressure problem of async messaging using bounded blocking queues. A global supervisor then modifies queue sizes to resolve the deadlock, e.g. http://www.google.com/patents/US8595391. What's nice with this kind of approach is the fact that no "backtalk" is going on. [btw: strange idea to patent resizing queues ...]
    7) You misunderstood me, I am not talking about thread-per-connection/request blocking style. I am talking about bounded queues which block the calling actor (not Thread!) when reaching the Q limit. This is a quite effective way to implement single-machine backpressure. The downside is that one can get deadlocks this way. To avoid deadlocks, one would have to introduce non-determinism regarding message processing order, which is a no-go imo. These constraints have been proven formally (so it's not my personal invention or something :-) ).
    9) see 7. I am talking the fundamental actor dilemma:

    Either accept

    * OOM with unbounded queues or
    * accept deadlocks with bounded blocking queues or
    * accept non-determinism in message processing.

    this can be (has been) formally proved.

    10) Hm .. it will be interesting to see how this turns out for real-world large systems. I understand your intention to reduce queue sizes; on the other hand, just using offer() semantics on bounded queues would achieve similar results without the backtalking. What somehow looks strange to me is the fact that the load coming into a system is not "pulled". Server applications do not signal a client browser "you are allowed to send a request now". So at least at the inbound side of your system you need a kind of queue to buffer input messages (e.g. requests), and you will need to block (reject) incoming requests once this queue reaches a certain size. This also applies to modern stream-oriented (big) data sources. Usually these just blast out query results as fast as possible to reduce memory requirements and consistency/locking issues. So you are employing a pull model in a "push" world.
    My gut tells me intra-thread, intra-core, intra-socket, intra-machine, intra-system messaging should be self-similar .. anyway, intuition can be arbitrarily wrong in IT ;-)

  8. The dilemma that you mention is indeed well-known: in the absence of a silver bullet everybody will have to pick their poison. With Actors we already have support for “OOME with unbounded queues” and “non-determinism in message processing”, so Reactive Streams provide the missing third option. I am also curious to see how this evolves when used in real-world large applications, but the feedback I currently get is very encouraging that we are on the right track.

  9. Hi Rüdiger,
    I read your article and have the impression that if there is a slow receiver, the send rate will be limited by this bad guy.
    Is this a problem?

    Replies
    1. Well, that's the point of backpressure: reduce the send rate in case a receiver cannot keep up. Of course there is a lower limit to the send rate. Once a sender receives backpressure "NAK" messages and already is at the lowest allowed send rate, the NAK will be ignored. So it accepts slow receivers up to a certain limit. This could be extended by adding a time interval (e.g. accept a very low send rate for, say, 100ms only).

  10. Currently implementing the Reactive Streams spec for kontraktor actors .. works like a charm even for remote streams (higher latency can be an issue with requestNext()). Well thought out; my concerns were probably wrong (except for multicast, however this is kind of a niche).

    Good Work !
