StreamBase application design patterns to minimize queuing

Article ID: KB0079975

Products: TIBCO Streaming
Versions: -

Description

My application's data input rate is frequently faster than the application can process, leading to queuing and growing latency. How can I avoid queuing?

Issue/Introduction

Design guidance

Resolution

Measure latency, not queuing...

First, do not measure queuing directly; measure latency instead. Latency is much easier to measure, and when latency is low you know there is no queuing. To do this, store a nanotime() result in the message tuple immediately after input and compare it to a nanotime() result taken after that tuple has been "fully processed" and is about to be dropped or emitted. In a test environment, determine the minimum latency when the application is not under stress, then choose a latency threshold above which to take action. Watching the queue statistics stream and reacting to high internal queue values is the last choice, because that report lags the actual condition.
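
As an illustration only (plain Java rather than EventFlow, and the 5 ms threshold is an assumed placeholder to be replaced by your own measured baseline), the measurement amounts to stamping each message on input and checking the elapsed time just before it is emitted or dropped:

    // Illustrative only: stamp each message on input, measure just before emitting
    // or dropping it. The 5 ms threshold is a placeholder; derive your own from the
    // minimum latency observed in unstressed test runs.
    final class TimedMessage {
        final long inputNanos = System.nanoTime();      // the nanotime() stored at input
        final String payload;
        TimedMessage(String payload) { this.payload = payload; }
    }

    final class LatencyCheck {
        static final long THRESHOLD_NANOS = 5_000_000L; // assumed 5 ms action threshold

        static void onEmit(TimedMessage m) {
            long latencyNanos = System.nanoTime() - m.inputNanos;
            if (latencyNanos > THRESHOLD_NANOS) {
                // Above the action threshold: throttle input, raise the governor,
                // or shed load as described in the sections below.
            }
        }
    }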

There are several design options:
  • Add parallelism to use more CPU and speed up the application.
  • Throttle the input to a slower input rate.
  • Use a Query Table cache to prioritize and drop unneeded updates.

Best practice (higher CPU demand)...

The preferred solution is to make sure the consumer is always faster than the producer, or more generally, that all downstream processing has a higher throughput capacity than upstream. When this is true, there is no persistent queuing and none of its related problems. Increase the speed of the consumer by running more operations in parallel, each in its own thread (StreamBase Properties, Concurrency tab, "Run this component in a parallel region" setting; this is best applied to Module operators so that a single flow is grouped into one region), giving downstream processing access to more CPU capacity. Note that if the machine is already fully utilized, there is no more CPU available and this approach will not help.
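
As a rough sketch of the principle only (plain Java, not the StreamBase concurrency mechanism itself; all names here are illustrative), giving the downstream stage its own pool of worker threads helps only while the host still has idle CPU:

    import java.util.concurrent.*;

    // Sketch only: give the downstream stage its own worker threads so that, in
    // aggregate, the consumers can outrun the producer. This helps only while the
    // host still has idle CPU; on a saturated machine it adds nothing.
    final class ParallelConsumerStage {
        private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        private final ExecutorService pool;

        ParallelConsumerStage(int workers) {
            pool = Executors.newFixedThreadPool(workers);
            for (int i = 0; i < workers; i++) {
                pool.submit(this::consumeLoop);
            }
        }

        void accept(String msg) { channel.offer(msg); }    // called by the producer side

        private void consumeLoop() {
            try {
                while (true) {
                    String msg = channel.take();
                    // ... downstream processing of msg ...
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();        // allow a clean shutdown
            }
        }

        void stop() { pool.shutdownNow(); }
    }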

Simple throttling (higher latencies)...

Most adapters attempt to consume new messages from external providers as fast as possible while connected or subscribed. The simplest way to match the input rate to slower processing is to disconnect and reconnect the adapters at intervals, which puts back-pressure on the external provider and lets the slower processing catch up. Unless the external system drops messages, this approach leads to higher latency.
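
A hypothetical control loop makes the trade-off concrete (plain Java; AdapterControl is an assumed stand-in for whatever connect/disconnect or pause/resume hook a particular adapter actually provides):

    // Hypothetical sketch: pause the input adapter whenever measured latency is too
    // high, then resume once downstream has had time to catch up. AdapterControl is
    // a stand-in for whatever connect/disconnect or pause/resume hook your adapter
    // actually provides.
    interface AdapterControl {
        void disconnect();
        void reconnect();
    }

    final class SimpleThrottle {
        private final AdapterControl adapter;
        private final long pauseMillis;

        SimpleThrottle(AdapterControl adapter, long pauseMillis) {
            this.adapter = adapter;
            this.pauseMillis = pauseMillis;
        }

        // Call with the latency measured as described above.
        void onLatencySample(long latencyNanos, long thresholdNanos) throws InterruptedException {
            if (latencyNanos > thresholdNanos) {
                adapter.disconnect();              // back-pressure on the external provider
                Thread.sleep(pauseMillis);         // let the slower processing catch up
                adapter.reconnect();               // latency of pending messages rises while paused
            }
        }
    }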

Using a Query Table cache (filtered data)...

An alternative, for when data inputs are frequently too fast for the available CPU, is to drop some of the input data, but to do so intelligently. The approach is to write all messages into a Query Table and then emit them oldest-to-newest, like a queue, only as fast as the consumer can handle. Dropping data is handled by updating a row if the same index value is seen again before the row has been emitted and deleted (this is an atomic operation when using a Query Delete operator that emits the deleted "old" rows). The update is what preserves the most important values, such as the most recent value, minimums, maximums, and sums; when updating a record, do not change its oldest-to-newest position in the set of messages. Use a Metronome to delete and report rows from the table only as fast as the downstream consumer can handle, preventing any queuing from occurring. This can be further refined to assign greater priority to high-value messages or to trigger extra reads when downstream capacity improves.
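
A minimal plain-Java analogue of the pattern, assuming a single string key and a few example aggregates (a sketch of the idea, not the Query Table implementation itself), uses an insertion-ordered map so that an update to an existing key coalesces into its row without changing that row's oldest-to-newest position:

    import java.util.*;

    // Sketch of the coalescing idea behind the Query Table pattern:
    // - write() inserts a new row, or merges into the existing row for the same key
    //   without changing that row's oldest-to-newest position;
    // - drainOldest() removes and returns the oldest row, and is meant to be called
    //   by a timer only as fast as the downstream consumer can handle.
    final class CoalescingCache {
        static final class Row {
            long count;                                // example aggregate kept across updates
            double lastValue;                          // "most recent value"
            double max = Double.NEGATIVE_INFINITY;     // example maximum kept across updates
        }

        // LinkedHashMap preserves insertion order, i.e. oldest-to-newest.
        private final LinkedHashMap<String, Row> table = new LinkedHashMap<>();

        synchronized void write(String key, double value) {
            Row row = table.computeIfAbsent(key, k -> new Row()); // later updates keep the row's position
            row.count++;
            row.lastValue = value;
            row.max = Math.max(row.max, value);
        }

        // Analogue of the Metronome-driven Delete that emits the removed "old" row.
        synchronized Optional<Map.Entry<String, Row>> drainOldest() {
            Iterator<Map.Entry<String, Row>> it = table.entrySet().iterator();
            if (!it.hasNext()) return Optional.empty();
            Map.Entry<String, Row> oldest = it.next();
            Map.Entry<String, Row> copy = Map.entry(oldest.getKey(), oldest.getValue());
            it.remove();                               // remove and emit as one step under the lock
            return Optional.of(copy);
        }
    }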

Fully reactive rate throttling...

In the case where a consistent read rate is unacceptable because of changing system resources (the downstream application has periods of extreme slowness due to short-term demands on CPU), there is an additional option.

Since a Metronome's tick rate cannot be changed at runtime, set the Metronome to a fast tick and follow it with a Sequence operator and a Filter whose expression is "sequence_number%GOVERNOR=0", where GOVERNOR is a Dynamic Variable. If the normally correct query period is 1/10th of a second (100 ms), set the Metronome to tick every 10 ms and the GOVERNOR to 10, which lets through every 10th tick. When latency increases, add an offset to the GOVERNOR to slow down the queries. When latency is low again, reduce the GOVERNOR back to its default level. This gives you dynamic control tied directly to performance conditions.
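
Sketched outside EventFlow (plain Java, using the 10 ms tick and base governor of 10 from the example above), the gating is just a counter and a modulo test:

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.*;

    // Sketch of the Metronome + Sequence + Filter idea: tick every 10 ms and let only
    // every GOVERNOR-th tick through, so the effective query period is
    // tickPeriod * governor (100 ms with the values below) and can be changed at
    // runtime by adjusting the governor alone.
    final class GovernedTicker {
        private final AtomicLong sequence = new AtomicLong();    // plays the Sequence operator's role
        final AtomicInteger governor = new AtomicInteger(10);    // plays the GOVERNOR dynamic variable's role

        void start(Runnable query) {
            ScheduledExecutorService metronome = Executors.newSingleThreadScheduledExecutor();
            metronome.scheduleAtFixedRate(() -> {
                long n = sequence.incrementAndGet();
                if (n % governor.get() == 0) {                   // the Filter's modulo test
                    query.run();                                 // e.g. drain one row from the cache
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
        }
    }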

Here is an example of the Queue pattern with a governor:
[Figure: EventFlow pattern for using a priority cache]

This pattern assumes that Producer.sbapp and Consumer.sbapp are in separate parallel modules (so they run independently without blocking each other) and that the "Cache" Query Table is shared and used by both. The "Delete" Query uses a secondary index on the timestamp to report rows oldest-to-newest with a limit of one tuple at a time. The "Writer" writes the timestamp (the secondary index field) only on insert and leaves it alone on update, while updating the other values. This pattern may be simplified if the GOVERNOR is not used to adjust the effective Metronome rate.

How much and how frequently to change the GOVERNOR value will take some experimenting. Adjust the GOVERNOR incrementally to catch up to current conditions (if the measured latency is too high, add 1 to the governor value; if it is low, subtract 1, down to the base level) rather than forcing the rate high or low in response to rapidly changing performance conditions. Incremental changes avoid hysteresis, where the governor value swings rapidly between two extremes.
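
A small sketch of that adjustment rule (plain Java; the threshold and one-step size are assumptions to tune by experiment) nudges the governor one step per latency sample and never drops below the base level:

    // Sketch of incremental governor adjustment: move one step at a time toward
    // current conditions instead of jumping between extremes, which avoids the
    // governor oscillating rapidly. The threshold passed in is the action level
    // chosen from your own latency measurements.
    final class GovernorAdjuster {
        private final int baseGovernor;
        private int governor;

        GovernorAdjuster(int baseGovernor) {
            this.baseGovernor = baseGovernor;
            this.governor = baseGovernor;
        }

        // Call once per latency sample, e.g. from the measurement described earlier.
        int adjust(long latencyNanos, long thresholdNanos) {
            if (latencyNanos > thresholdNanos) {
                governor++;                            // slow the queries down
            } else if (governor > baseGovernor) {
                governor--;                            // ease back toward the base rate
            }
            return governor;
        }
    }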

Summary...

Use a Query Table as a cache to adjust the rate at which data is sent to slower consumers, measuring and responding to latency. This keeps queues low (avoiding memory issues) and latency as low as system resources allow, while processing the highest-quality data. When the consumer is as fast as the data inputs, the brief pause in the Query Table is negligible. Worst-case memory use is capped at the number of uniquely indexed records that might be stored in the Query Table at the same time.